diff --git a/internal/metricRouter/metricAggregator.go b/internal/metricRouter/metricAggregator.go
new file mode 100644
index 0000000..41c5276
--- /dev/null
+++ b/internal/metricRouter/metricAggregator.go
@@ -0,0 +1,291 @@
+package metricRouter
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strings"
+	"time"
+
+	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
+
+	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
+	topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"
+
+	"github.com/PaesslerAG/gval"
+)
+
+type metricAggregatorIntervalConfig struct {
+	Name      string            `json:"name"`     // Metric name for the new metric
+	Function  string            `json:"function"` // Function to apply on the metric
+	Condition string            `json:"if"`       // Condition for applying function
+	Tags      map[string]string `json:"tags"`     // Tags for the new metric
+	Meta      map[string]string `json:"meta"`     // Meta information for the new metric
+	gvalCond  gval.Evaluable
+	gvalFunc  gval.Evaluable
+}
+
+type metricAggregator struct {
+	functions []*metricAggregatorIntervalConfig
+	constants map[string]interface{}
+	language  gval.Language
+	output    chan lp.CCMetric
+}
+
+type MetricAggregator interface {
+	AddAggregation(name, function, condition string, tags, meta map[string]string) error
+	DeleteAggregation(name string) error
+	Init(output chan lp.CCMetric) error
+	Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric)
+}
+
+var metricCacheLanguage = gval.NewLanguage(
+	gval.Base(),
+	gval.Function("sum", sumfunc),
+	gval.Function("min", minfunc),
+	gval.Function("avg", avgfunc),
+	gval.Function("mean", avgfunc),
+	gval.Function("max", maxfunc),
+	gval.Function("len", lenfunc),
+	gval.Function("median", medianfunc),
+	gval.InfixOperator("in", infunc),
+	gval.Function("match", matchfunc),
+	gval.Function("getCpuCore", getCpuCoreFunc),
+	gval.Function("getCpuSocket", getCpuSocketFunc),
+	gval.Function("getCpuNuma", getCpuNumaDomainFunc),
+	gval.Function("getCpuDie", getCpuDieFunc),
+	gval.Function("getSockCpuList", getCpuListOfSocketFunc),
+	gval.Function("getNumaCpuList", getCpuListOfNumaDomainFunc),
+	gval.Function("getDieCpuList", getCpuListOfDieFunc),
+	gval.Function("getCoreCpuList", getCpuListOfCoreFunc),
+	gval.Function("getCpuList", getCpuListOfNode),
+	gval.Function("getCpuListOfType", getCpuListOfType),
+)
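+
+// Illustrative examples of expressions this language accepts (see metricAggregatorFunctions.go
+// for the function implementations; the metric name is only an example):
+//   avg(values)                            -> average over the cached values of an interval
+//   match("temp_core_%d+", metric.Name())  -> regex match, '%' stands in for '\' (see matchfunc)
+//   3 in getCpuList()                      -> true if the node has a hardware thread with id 3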
+
+func (c *metricAggregator) Init(output chan lp.CCMetric) error {
+	c.output = output
+	c.functions = make([]*metricAggregatorIntervalConfig, 0)
+	c.constants = make(map[string]interface{})
+
+	// Add constants like hostname, numSockets, ... to the constants list
+	// Set hostname
+	hostname, err := os.Hostname()
+	if err != nil {
+		cclog.Error(err.Error())
+		return err
+	}
+	// Drop domain part of host name
+	c.constants["hostname"] = strings.SplitN(hostname, `.`, 2)[0]
+	cinfo := topo.CpuInfo()
+	c.constants["numHWThreads"] = cinfo.NumHWthreads
+	c.constants["numSockets"] = cinfo.NumSockets
+	c.constants["numNumaDomains"] = cinfo.NumNumaDomains
+	c.constants["numDies"] = cinfo.NumDies
+	c.constants["smtWidth"] = cinfo.SMTWidth
+
+	c.language = gval.NewLanguage(
+		gval.Base(),
+		metricCacheLanguage,
+	)
+
+	// Example aggregation function
+	// var f metricCacheFunctionConfig
+	// f.Name = "temp_cores_avg"
+	// //f.Condition = `"temp_core_" in name`
+	// f.Condition = `match("temp_core_%d+", metric.Name())`
+	// f.Function = `avg(values)`
+	// f.Tags = map[string]string{"type": "node"}
+	// f.Meta = map[string]string{"group": "IPMI", "unit": "degC", "source": "TempCollector"}
+	// c.functions = append(c.functions, &f)
+	return nil
+}
+
+func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric) {
+	vars := make(map[string]interface{})
+	for k, v := range c.constants {
+		vars[k] = v
+	}
+	vars["starttime"] = starttime
+	vars["endtime"] = endtime
+	for _, f := range c.functions {
+		cclog.ComponentDebug("MetricCache", "COLLECT", f.Name, "COND", f.Condition)
+		values := make([]float64, 0)
+		matches := make([]lp.CCMetric, 0)
+		for _, m := range metrics {
+			vars["metric"] = m
+			//value, err := gval.Evaluate(f.Condition, vars, c.language)
+			value, err := f.gvalCond.EvalBool(context.Background(), vars)
+			if err != nil {
+				cclog.ComponentError("MetricCache", "COLLECT", f.Name, "COND", f.Condition, ":", err.Error())
+				continue
+			}
+			if value {
+				v, valid := m.GetField("value")
+				if valid {
+					// Collect all values as float64; non-float types are converted
+					switch x := v.(type) {
+					case float64:
+						values = append(values, x)
+					case float32:
+						values = append(values, float64(x))
+					case int:
+						values = append(values, float64(x))
+					case int64:
+						values = append(values, float64(x))
+					case bool:
+						if x {
+							values = append(values, float64(1.0))
+						} else {
+							values = append(values, float64(0.0))
+						}
+					default:
+						cclog.ComponentError("MetricCache", "COLLECT ADD VALUE", v, "FAILED")
+					}
+				}
+				matches = append(matches, m)
+			}
+		}
+		delete(vars, "metric")
+		cclog.ComponentDebug("MetricCache", "EVALUATE", f.Name, "METRICS", len(values), "CALC", f.Function)
+		vars["values"] = values
+		vars["metrics"] = matches
+		if len(values) > 0 {
+			value, err := gval.Evaluate(f.Function, vars, c.language)
+			if err != nil {
+				cclog.ComponentError("MetricCache", "EVALUATE", f.Name, "METRICS", len(values), "CALC", f.Function, ":", err.Error())
+				break
+			}
+
+			// An empty tag/meta value in the configuration means: copy that tag/meta
+			// information from the matched metrics
+			copy_tags := func(tags map[string]string, metrics []lp.CCMetric) map[string]string {
+				out := make(map[string]string)
+				for key, value := range tags {
+					switch value {
+					case "":
+						for _, m := range metrics {
+							v, ok := m.GetTag(key)
+							if ok {
+								out[key] = v
+							}
+						}
+					default:
+						out[key] = value
+					}
+				}
+				return out
+			}
+			copy_meta := func(meta map[string]string, metrics []lp.CCMetric) map[string]string {
+				out := make(map[string]string)
+				for key, value := range meta {
+					switch value {
+					case "":
+						for _, m := range metrics {
+							v, ok := m.GetMeta(key)
+							if ok {
+								out[key] = v
+							}
+						}
+					default:
+						out[key] = value
+					}
+				}
+				return out
+			}
+			tags := copy_tags(f.Tags, matches)
+			meta := copy_meta(f.Meta, matches)
+
+			var m lp.CCMetric
+			switch t := value.(type) {
+			case float64:
+				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
+			case float32:
+				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
+			case int:
+				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
+			case int64:
+				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
+			case string:
+				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
+			default:
+				cclog.ComponentError("MetricCache", "Gval returned invalid type", t, "skipping metric", f.Name)
+			}
+			if err != nil {
+				cclog.ComponentError("MetricCache", "Cannot create metric from Gval result", value, ":", err.Error())
+			}
+			// Only send out a metric if one was actually created
+			if m != nil {
+				cclog.ComponentDebug("MetricCache", "SEND", m)
+				select {
+				case c.output <- m:
+				default:
+				}
+			}
+		}
+	}
+}
+
+func (c *metricAggregator) AddAggregation(name, function, condition string, tags, meta map[string]string) error {
+	// Double quotes cannot be used inside JSON strings without escaping, so the configuration
+	// uses single quotes ('') instead. gval, however, expects double quotes (""), so they are
+	// replaced here.
+	newfunc := strings.ReplaceAll(function, "'", "\"")
+	newcond := strings.ReplaceAll(condition, "'", "\"")
+	gvalCond, err := gval.Full(metricCacheLanguage).NewEvaluable(newcond)
+	if err != nil {
+		cclog.ComponentError("MetricAggregator", "Cannot add aggregation, invalid if condition", newcond, ":", err.Error())
+		return err
+	}
+	gvalFunc, err := gval.Full(metricCacheLanguage).NewEvaluable(newfunc)
+	if err != nil {
+		cclog.ComponentError("MetricAggregator", "Cannot add aggregation, invalid function", newfunc, ":", err.Error())
+		return err
+	}
+	// Update an existing aggregation with the same name, otherwise append a new one
+	for _, agg := range c.functions {
+		if agg.Name == name {
+			agg.Name = name
+			agg.Condition = newcond
+			agg.Function = newfunc
+			agg.Tags = tags
+			agg.Meta = meta
+			agg.gvalCond = gvalCond
+			agg.gvalFunc = gvalFunc
+			return nil
+		}
+	}
+	var agg metricAggregatorIntervalConfig
+	agg.Name = name
+	agg.Condition = newcond
+	agg.gvalCond = gvalCond
+	agg.Function = newfunc
+	agg.gvalFunc = gvalFunc
+	agg.Tags = tags
+	agg.Meta = meta
+	c.functions = append(c.functions, &agg)
+	return nil
+}
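+
+// Example call (illustrative), mirroring the commented-out aggregation in Init():
+//   c.AddAggregation("temp_cores_avg", "avg(values)", "match('temp_core_%d+', metric.Name())",
+//       map[string]string{"type": "node"},
+//       map[string]string{"group": "IPMI", "unit": "degC", "source": "TempCollector"})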
+
+func (c *metricAggregator) DeleteAggregation(name string) error {
+	for i, agg := range c.functions {
+		if agg.Name == name {
+			copy(c.functions[i:], c.functions[i+1:])
+			c.functions[len(c.functions)-1] = nil
+			c.functions = c.functions[:len(c.functions)-1]
+			return nil
+		}
+	}
+	return fmt.Errorf("no aggregation for metric name %s", name)
+}
+
+func (c *metricAggregator) AddConstant(name string, value interface{}) {
+	c.constants[name] = value
+}
+
+func (c *metricAggregator) DelConstant(name string) {
+	delete(c.constants, name)
+}
+
+func (c *metricAggregator) AddFunction(name string, function func(args ...interface{}) (interface{}, error)) {
+	c.language = gval.NewLanguage(c.language, gval.Function(name, function))
+}
+
+func NewAggregator(output chan lp.CCMetric) (MetricAggregator, error) {
+	a := new(metricAggregator)
+	err := a.Init(output)
+	if err != nil {
+		return nil, err
+	}
+	return a, err
+}
diff --git a/internal/metricRouter/metricAggregatorFunctions.go b/internal/metricRouter/metricAggregatorFunctions.go
new file mode 100644
index 0000000..4133a4b
--- /dev/null
+++ b/internal/metricRouter/metricAggregatorFunctions.go
@@ -0,0 +1,376 @@
+package metricRouter
+
+import (
+	"errors"
+	"fmt"
+	"math"
+	"regexp"
+	"sort"
+	"strings"
+
+	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
+	topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"
+)
+
+/*
+ * Arithmetic functions on value arrays
+ */
+
+// Sum up values
+func sumfunc(args ...interface{}) (interface{}, error) {
+	s := 0.0
+	values, ok := args[0].([]float64)
+	if ok {
+		cclog.ComponentDebug("MetricCache", "SUM FUNC START")
+		for _, x := range values {
+			s += x
+		}
+		cclog.ComponentDebug("MetricCache", "SUM FUNC END", s)
+	} else {
+		cclog.ComponentDebug("MetricCache", "SUM FUNC CAST FAILED")
+	}
+	return s, nil
+}
+
+// Get the minimum value
+func minfunc(args ...interface{}) (interface{}, error) {
+	var err error = nil
+	switch values := args[0].(type) {
+	case []float64:
+		var s float64 = math.MaxFloat64
+		for _, x := range values {
+			if x < s {
+				s = x
+			}
+		}
+		return s, nil
+	case []float32:
+		var s float32 = math.MaxFloat32
+		for _, x := range values {
+			if x < s {
+				s = x
+			}
+		}
+		return s, nil
+	case []int:
+		var s int = math.MaxInt
+		for _, x := range values {
+			if x < s {
+				s = x
+			}
+		}
+		return s, nil
+	case []int64:
+		var s int64 = math.MaxInt64
+		for _, x := range values {
+			if x < s {
+				s = x
+			}
+		}
+		return s, nil
+	case []int32:
+		var s int32 = math.MaxInt32
+		for _, x := range values {
+			if x < s {
+				s = x
+			}
+		}
+		return s, nil
+	default:
+		err = errors.New("function 'min' only on list of values (float64, float32, int, int32, int64)")
+	}
+
+	return 0.0, err
+}
+
+// Get the average or mean value
+func avgfunc(args ...interface{}) (interface{}, error) {
+	switch values := args[0].(type) {
+	case []float64:
+		var s float64 = 0
+		for _, x := range values {
+			s += x
+		}
+		return s / float64(len(values)), nil
+	case []float32:
+		var s float32 = 0
+		for _, x := range values {
+			s += x
+		}
+		return s / float32(len(values)), nil
+	case []int:
+		var s int = 0
+		for _, x := range values {
+			s += x
+		}
+		return s / len(values), nil
+	case []int64:
+		var s int64 = 0
+		for _, x := range values {
+			s += x
+		}
+		return s / int64(len(values)), nil
+	}
+	return 0.0, errors.New("function 'avg' only on list of values (float64, float32, int, int64)")
+}
+
+// Get the maximum value
+func maxfunc(args ...interface{}) (interface{}, error) {
+	s := 0.0
+	values, ok := args[0].([]float64)
+	if ok && len(values) > 0 {
+		// Start from the first element so that lists of negative values work as well
+		s = values[0]
+		for _, x := range values {
+			if x > s {
+				s = x
+			}
+		}
+	}
+	return s, nil
+}
+
+// Get the median value
+func medianfunc(args ...interface{}) (interface{}, error) {
+	switch values := args[0].(type) {
+	case []float64:
+		sort.Float64s(values)
+		return values[len(values)/2], nil
+	// case []float32:
+	// 	sort.Float64s(values)
+	// 	return values[len(values)/2], nil
+	case []int:
+		sort.Ints(values)
+		return values[len(values)/2], nil
+
+	// case []int64:
+	// 	sort.Ints(values)
+	// 	return values[len(values)/2], nil
+	// case []int32:
+	// 	sort.Ints(values)
+	// 	return values[len(values)/2], nil
+	}
+	return 0.0, errors.New("function 'median()' only on lists of type float64 and int")
+}
+
+/*
+ * Get the number of values in a list. Always returns an int
+ */
+
+func lenfunc(args ...interface{}) (interface{}, error) {
+	var err error = nil
+	var length int = 0
+	switch values := args[0].(type) {
+	case []float64:
+		length = len(values)
+	case []float32:
+		length = len(values)
+	case []int:
+		length = len(values)
+	case []int64:
+		length = len(values)
+	case []int32:
+		length = len(values)
+	case float64:
+		err = errors.New("function 'len' can only be applied on arrays and strings")
+	case float32:
+		err = errors.New("function 'len' can only be applied on arrays and strings")
+	case int:
+		err = errors.New("function 'len' can only be applied on arrays and strings")
+	case int64:
+		err = errors.New("function 'len' can only be applied on arrays and strings")
+	case string:
+		length = len(values)
+	}
+	return length, err
+}
+
+/*
+ * Check if a value is in a list or string
+ * In contrast to most of the other functions, this one is an infix operator for
+ * - substring matching: `"abc" in "abcdef"` -> true
+ * - substring matching with int casting: `3 in "abd3"` -> true
+ * - search for an int in an int list: `3 in getCpuList()` -> true (if you have more than 4 CPU hardware threads)
+ */
+
+func infunc(a interface{}, b interface{}) (interface{}, error) {
+	switch match := a.(type) {
+	case string:
+		switch total := b.(type) {
+		case string:
+			return strings.Contains(total, match), nil
+		}
+	case int:
+		switch total := b.(type) {
+		case []int:
+			for _, x := range total {
+				if x == match {
+					return true, nil
+				}
+			}
+		case string:
+			smatch := fmt.Sprintf("%d", match)
+			return strings.Contains(total, smatch), nil
+		}
+
+	}
+	return false, nil
+}
+
+/*
+ * Regex matching of strings (metric name, tag keys, tag values, meta keys, meta values)
+ * Since we cannot use \ inside JSON strings without escaping, we use % instead for the
+ * format keys: \d = %d, \w = %w, ... Not sure how to fix this
+ */
+
+func matchfunc(args ...interface{}) (interface{}, error) {
+	switch match := args[0].(type) {
+	case string:
+		switch total := args[1].(type) {
+		case string:
+			smatch := strings.Replace(match, "%", "\\", -1)
+			regex, err := regexp.Compile(smatch)
+			if err != nil {
+				return false, err
+			}
+			s := regex.Find([]byte(total))
+			return s != nil, nil
+		}
+	}
+	return false, nil
+}
+
+/*
+ * System topology getter functions
+ */
+
+// for a given cpuid, it returns the core id
+func getCpuCoreFunc(args ...interface{}) (interface{}, error) {
+	switch cpuid := args[0].(type) {
+	case int:
+		return topo.GetCpuCore(cpuid), nil
+	}
+	return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid")
+}
+
+// for a given cpuid, it returns the socket id
+func getCpuSocketFunc(args ...interface{}) (interface{}, error) {
+	switch cpuid := args[0].(type) {
+	case int:
+		return topo.GetCpuSocket(cpuid), nil
+	}
+	return -1, errors.New("function 'getCpuSocket' accepts only an 'int' cpuid")
+}
+
+// for a given cpuid, it returns the id of the NUMA node
+func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) {
+	switch cpuid := args[0].(type) {
+	case int:
+		return topo.GetCpuNumaDomain(cpuid), nil
+	}
+	return -1, errors.New("function 'getCpuNuma' accepts only an 'int' cpuid")
+}
+
+// for a given cpuid, it returns the id of the CPU die
+func getCpuDieFunc(args ...interface{}) (interface{}, error) {
+	switch cpuid := args[0].(type) {
+	case int:
+		return topo.GetCpuDie(cpuid), nil
+	}
+	return -1, errors.New("function 'getCpuDie' accepts only an 'int' cpuid")
+}
+
+// for a given core id, it returns the list of cpuids
+func getCpuListOfCoreFunc(args ...interface{}) (interface{}, error) {
+	cpulist := make([]int, 0)
+	switch in := args[0].(type) {
+	case int:
+		for _, c := range topo.CpuData() {
+			if c.Core == in {
+				cpulist = append(cpulist, c.Cpuid)
+			}
+		}
+	}
+	return cpulist, nil
+}
+
+// for a given socket id, it returns the list of cpuids
+func getCpuListOfSocketFunc(args ...interface{}) (interface{}, error) {
+	cpulist := make([]int, 0)
+	switch in := args[0].(type) {
+	case int:
+		for _, c := range topo.CpuData() {
+			if c.Socket == in {
+				cpulist = append(cpulist, c.Cpuid)
+			}
+		}
+	}
+	return cpulist, nil
+}
+
+// for a given id of a NUMA domain, it returns the list of cpuids
+func getCpuListOfNumaDomainFunc(args ...interface{}) (interface{}, error) {
+	cpulist := make([]int, 0)
+	switch in := args[0].(type) {
+	case int:
+		for _, c := range topo.CpuData() {
+			if c.Numadomain == in {
+				cpulist = append(cpulist, c.Cpuid)
+			}
+		}
+	}
+	return cpulist, nil
+}
+
+// for a given CPU die id, it returns the list of cpuids
+func getCpuListOfDieFunc(args ...interface{}) (interface{}, error) {
+	cpulist := make([]int, 0)
+	switch in := args[0].(type) {
+	case int:
+		for _, c := range topo.CpuData() {
+			if c.Die == in {
+				cpulist = append(cpulist, c.Cpuid)
+			}
+		}
+	}
+	return cpulist, nil
+}
+
+// wrapper function to get a list of all cpuids of the node
+func getCpuListOfNode(args ...interface{}) (interface{}, error) {
+	return topo.CpuList(), nil
+}
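+
+// Note: the topology helpers in this file are registered in metricCacheLanguage
+// (metricAggregator.go) under the names getCpuCore, getCpuSocket, getCpuNuma, getCpuDie,
+// getCoreCpuList, getSockCpuList, getNumaCpuList, getDieCpuList, getCpuList and getCpuListOfType.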
"socket": + return getCpuListOfSocketFunc(args[1]) + case "numadomain": + return getCpuListOfNumaDomainFunc(args[1]) + case "core": + return getCpuListOfCoreFunc(args[1]) + case "cpu": + var cpu int + + switch id := args[1].(type) { + case string: + _, err := fmt.Scanf(id, "%d", &cpu) + if err == nil { + cpulist = append(cpulist, cpu) + } + case int: + cpulist = append(cpulist, id) + case int64: + cpulist = append(cpulist, int(id)) + } + + } + } + return cpulist, errors.New("no valid args type and type-id") +} diff --git a/internal/metricRouter/metricCache.go b/internal/metricRouter/metricCache.go new file mode 100644 index 0000000..1cfd8c3 --- /dev/null +++ b/internal/metricRouter/metricCache.go @@ -0,0 +1,176 @@ +package metricRouter + +import ( + "sync" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" +) + +type metricCachePeriod struct { + startstamp time.Time + stopstamp time.Time + numMetrics int + sizeMetrics int + metrics []lp.CCMetric +} + +// Metric cache data structure +type metricCache struct { + numPeriods int + curPeriod int + intervals []*metricCachePeriod + wg *sync.WaitGroup + ticker mct.MultiChanTicker + tickchan chan time.Time + done chan bool + output chan lp.CCMetric + aggEngine MetricAggregator +} + +type MetricCache interface { + Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error + Start() + Add(metric lp.CCMetric) + GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) + AddAggregation(name, function, condition string, tags, meta map[string]string) error + DeleteAggregation(name string) error + Close() +} + +func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error { + var err error = nil + c.done = make(chan bool) + c.wg = wg + c.ticker = ticker + c.numPeriods = numPeriods + c.output = output + c.intervals = make([]*metricCachePeriod, 0) + for i := 0; i < c.numPeriods+1; i++ { + p := new(metricCachePeriod) + p.numMetrics = 0 + p.sizeMetrics = 0 + p.metrics = make([]lp.CCMetric, 0) + c.intervals = append(c.intervals, p) + } + + // Create a new aggregation engine. 
+	// Create a new aggregation engine. No separate goroutine at the moment;
+	// the code is executed by the MetricCache goroutine
+	c.aggEngine, err = NewAggregator(c.output)
+	if err != nil {
+		cclog.ComponentError("MetricCache", "Cannot create aggregator")
+		return err
+	}
+
+	return nil
+}
+
+// Start starts the metric cache
+func (c *metricCache) Start() {
+
+	c.tickchan = make(chan time.Time)
+	c.ticker.AddChannel(c.tickchan)
+	// Router cache is done
+	done := func() {
+		cclog.ComponentDebug("MetricCache", "DONE")
+		close(c.done)
+	}
+
+	// Rotate cache interval
+	rotate := func(timestamp time.Time) int {
+		oldPeriod := c.curPeriod
+		c.curPeriod = oldPeriod + 1
+		if c.curPeriod >= c.numPeriods {
+			c.curPeriod = 0
+		}
+		c.intervals[oldPeriod].numMetrics = 0
+		c.intervals[oldPeriod].stopstamp = timestamp
+		c.intervals[c.curPeriod].startstamp = timestamp
+		c.intervals[c.curPeriod].stopstamp = timestamp
+		return oldPeriod
+	}
+
+	c.wg.Add(1)
+	go func() {
+		defer c.wg.Done()
+		for {
+			select {
+			case <-c.done:
+				done()
+				return
+			case tick := <-c.tickchan:
+				old := rotate(tick)
+				// Get the last period and evaluate aggregation metrics
+				starttime, endtime, metrics := c.GetPeriod(old)
+				if len(metrics) > 0 {
+					c.aggEngine.Eval(starttime, endtime, metrics)
+				} else {
+					// This message is also printed in the first interval after startup
+					cclog.ComponentDebug("MetricCache", "EMPTY INTERVAL?")
+				}
+			}
+		}
+	}()
+	cclog.ComponentDebug("MetricCache", "START")
+}
+
+// Add a metric to the cache. The interval is defined by the global timer (rotate() in Start()).
+// The intervals list is used as a round-robin buffer; the metric list of each interval grows
+// dynamically but is reused across rotations to avoid reallocations
+func (c *metricCache) Add(metric lp.CCMetric) {
+	if c.curPeriod >= 0 && c.curPeriod < c.numPeriods {
+		p := c.intervals[c.curPeriod]
+		if p.numMetrics < p.sizeMetrics {
+			p.metrics[p.numMetrics] = metric
+			p.numMetrics = p.numMetrics + 1
+			p.stopstamp = metric.Time()
+		} else {
+			p.metrics = append(p.metrics, metric)
+			p.numMetrics = p.numMetrics + 1
+			p.sizeMetrics = p.sizeMetrics + 1
+			p.stopstamp = metric.Time()
+		}
+	}
+}
+
+func (c *metricCache) AddAggregation(name, function, condition string, tags, meta map[string]string) error {
+	return c.aggEngine.AddAggregation(name, function, condition, tags, meta)
+}
+
+func (c *metricCache) DeleteAggregation(name string) error {
+	return c.aggEngine.DeleteAggregation(name)
+}
+
+// Get all metrics of an interval. The index is the difference to the current interval, so index=0
+// is the current one, index=1 the last interval and so on. Returns an empty array if a wrong index
+// is given (negative index, index larger than the configured number of total intervals, ...)
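+// Illustrative example: with numPeriods=4 and curPeriod=1, GetPeriod(3) wraps around and returns
+// the metrics stored in slot 2, i.e. the interval three rotations back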
+func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) {
+	if index >= 0 && index < c.numPeriods {
+		pindex := c.curPeriod - index
+		if pindex < 0 {
+			// Wrap around the round-robin buffer
+			pindex = c.numPeriods + pindex
+		}
+		if pindex >= 0 && pindex < c.numPeriods {
+			return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics
+		}
+	}
+	return time.Now(), time.Now(), make([]lp.CCMetric, 0)
+}
+
+// Close finishes / stops the metric cache
+func (c *metricCache) Close() {
+	cclog.ComponentDebug("MetricCache", "CLOSE")
+	c.done <- true
+}
+
+func NewCache(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) {
+	c := new(metricCache)
+	err := c.Init(output, ticker, wg, numPeriods)
+	if err != nil {
+		return nil, err
+	}
+	return c, err
+}
diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go
index 96a2f05..870af02 100644
--- a/internal/metricRouter/metricRouter.go
+++ b/internal/metricRouter/metricRouter.go
@@ -23,23 +23,28 @@ type metricRouterTagConfig struct {
 
 // Metric router configuration
 type metricRouterConfig struct {
-	AddTags       []metricRouterTagConfig `json:"add_tags"`           // List of tags that are added when the condition is met
-	DelTags       []metricRouterTagConfig `json:"delete_tags"`        // List of tags that are removed when the condition is met
-	IntervalStamp bool                    `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval?
+	AddTags           []metricRouterTagConfig          `json:"add_tags"`            // List of tags that are added when the condition is met
+	DelTags           []metricRouterTagConfig          `json:"delete_tags"`         // List of tags that are removed when the condition is met
+	IntervalAgg       []metricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation functions processed at the end of an interval
+	IntervalStamp     bool                             `json:"interval_timestamp"`  // Update timestamp periodically by ticker each interval?
+	NumCacheIntervals int                              `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation
 }
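+
+// Example entry for "interval_aggregates" (illustrative; field names follow
+// metricAggregatorIntervalConfig, values taken from the example in metricAggregator.go):
+//   { "name": "temp_cores_avg",
+//     "if": "match('temp_core_%d+', metric.Name())",
+//     "function": "avg(values)",
+//     "tags": {"type": "node"},
+//     "meta": {"group": "IPMI", "unit": "degC", "source": "TempCollector"} }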
 
 // Metric router data structure
 type metricRouter struct {
-	hostname   string              // Hostname used in tags
-	coll_input chan lp.CCMetric    // Input channel from CollectorManager
-	recv_input chan lp.CCMetric    // Input channel from ReceiveManager
-	outputs    []chan lp.CCMetric  // List of all output channels
-	done       chan bool           // channel to finish / stop metric router
-	wg         *sync.WaitGroup     // wait group for all goroutines in cc-metric-collector
-	timestamp  time.Time           // timestamp periodically updated by ticker each interval
-	timerdone  chan bool           // channel to finish / stop timestamp updater
-	ticker     mct.MultiChanTicker // periodically ticking once each interval
-	config     metricRouterConfig  // json encoded config for metric router
+	hostname    string              // Hostname used in tags
+	coll_input  chan lp.CCMetric    // Input channel from CollectorManager
+	recv_input  chan lp.CCMetric    // Input channel from ReceiveManager
+	cache_input chan lp.CCMetric    // Input channel from MetricCache
+	outputs     []chan lp.CCMetric  // List of all output channels
+	done        chan bool           // channel to finish / stop metric router
+	wg          *sync.WaitGroup     // wait group for all goroutines in cc-metric-collector
+	timestamp   time.Time           // timestamp periodically updated by ticker each interval
+	timerdone   chan bool           // channel to finish / stop timestamp updater
+	ticker      mct.MultiChanTicker // periodically ticking once each interval
+	config      metricRouterConfig  // json encoded config for metric router
+	cache       MetricCache         // pointer to MetricCache
+	cachewg     sync.WaitGroup      // wait group for MetricCache
 }
 
 // MetricRouter access functions
@@ -61,6 +66,7 @@ type MetricRouter interface {
 func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error {
 	r.outputs = make([]chan lp.CCMetric, 0)
 	r.done = make(chan bool)
+	r.cache_input = make(chan lp.CCMetric)
 	r.wg = wg
 	r.ticker = ticker
 
@@ -86,6 +92,18 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 		cclog.ComponentError("MetricRouter", err.Error())
 		return err
 	}
+	numIntervals := r.config.NumCacheIntervals
+	if numIntervals <= 0 {
+		numIntervals = 1
+	}
+	r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, numIntervals)
+	if err != nil {
+		cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error())
+		return err
+	}
+	for _, agg := range r.config.IntervalAgg {
+		r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
+	}
 	return nil
 }
 
@@ -211,6 +229,9 @@ func (r *metricRouter) Start() {
 		}
 	}
 
+	// Start Metric Cache
+	r.cache.Start()
+
 	r.wg.Add(1)
 	go func() {
 		defer r.wg.Done()
@@ -227,6 +248,7 @@ func (r *metricRouter) Start() {
 					p.SetTime(r.timestamp)
 				}
 				forward(p)
+				r.cache.Add(p)
 
 			case p := <-r.recv_input:
 				// receive from receive manager
@@ -234,6 +256,11 @@ func (r *metricRouter) Start() {
 					p.SetTime(r.timestamp)
 				}
 				forward(p)
+
+			case p := <-r.cache_input:
+				// receive from metric cache
+				p.AddTag("hostname", r.hostname)
+				forward(p)
 			}
 		}
 	}()
@@ -267,6 +294,8 @@ func (r *metricRouter) Close() {
 		// wait for close of channel r.timerdone
 		<-r.timerdone
 	}
+	r.cache.Close()
+	r.cachewg.Wait()
 }
 
 // New creates a new initialized metric router