diff --git a/.github/ci-collectors.json b/.github/ci-collectors.json new file mode 100644 index 0000000..3497fc0 --- /dev/null +++ b/.github/ci-collectors.json @@ -0,0 +1,6 @@ +{ + "tempstat": {}, + "diskstat": {}, + "memstat": {}, + "cpustat": {} +} diff --git a/.github/ci-config.json b/.github/ci-config.json index 402388d..15b2e6f 100644 --- a/.github/ci-config.json +++ b/.github/ci-config.json @@ -1,52 +1,8 @@ { - "sink": { - "user": "testuser", - "password": "testpass", - "host": "127.0.0.1", - "port": "9090", - "database": "testdb", - "organization": "testorg", - "type": "stdout" - }, - "interval": 3, - "duration": 1, - "collectors": [ - "tempstat", - "loadavg", - "memstat", - "netstat", - "ibstat", - "lustrestat", - "cpustat", - "topprocs", - "nvidia", - "diskstat", - "ipmistat", - "gpfs", - "cpufreq", - "cpufreq_cpuinfo" - ], - "default_tags": { - "cluster": "testcluster" - }, - "receiver": { - "type": "none" - }, - "collect_config": { - "topprocs": { - "num_procs": 2 - }, - "tempstat": { - "tag_override": { - "hwmon0": { - "type": "socket", - "type-id": "0" - }, - "hwmon1": { - "type": "socket", - "type-id": "1" - } - } - } - } -} \ No newline at end of file + "sinks": ".github/ci-sinks.json", + "collectors" : ".github/ci-collectors.json", + "receivers" : ".github/ci-receivers.json", + "router" : ".github/ci-router.json", + "interval": 5, + "duration": 1 +} diff --git a/.github/ci-receivers.json b/.github/ci-receivers.json new file mode 100644 index 0000000..fe51488 --- /dev/null +++ b/.github/ci-receivers.json @@ -0,0 +1 @@ +[] diff --git a/.github/ci-router.json b/.github/ci-router.json new file mode 100644 index 0000000..0146768 --- /dev/null +++ b/.github/ci-router.json @@ -0,0 +1,37 @@ +{ + "add_tags": [ + { + "key": "cluster", + "value": "testcluster", + "if": "*" + }, + { + "key": "test", + "value": "testing", + "if": "name == 'temp_package_id_0'" + } + ], + "delete_tags": [ + { + "key": "unit", + "value": "*", + "if": "*" + } + ], + 
"interval_aggregates": [ + { + "name": "temp_cores_avg", + "function": "avg(values)", + "if": "match('temp_core_%d+', metric.Name())", + "tags": { + "type": "node" + }, + "meta": { + "group": "", + "unit": "", + "source": "MetricAggregator" + } + } + ], + "interval_timestamp": true +} diff --git a/.github/ci-sinks.json b/.github/ci-sinks.json new file mode 100644 index 0000000..d304018 --- /dev/null +++ b/.github/ci-sinks.json @@ -0,0 +1,6 @@ +[ + { + "type" : "stdout", + "meta_as_tags" : true + } +] diff --git a/.github/workflows/rpmbuild.yml b/.github/workflows/rpmbuild.yml index 3e121d0..a7aee22 100644 --- a/.github/workflows/rpmbuild.yml +++ b/.github/workflows/rpmbuild.yml @@ -2,7 +2,7 @@ name: Run RPM Build on: push jobs: - build: + build-centos8: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -21,3 +21,41 @@ jobs: with: name: cc-metric-collector SRPM CentOS8 path: ${{ steps.rpm.outputs.source_rpm_path }} + build-centos-latest: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: TomTheBear/rpmbuild@centos_latest + id: rpm + name: Build RPM package on CentOS 'Latest' + with: + spec_file: "./scripts/cc-metric-collector.spec" + - name: Save RPM as artifact + uses: actions/upload-artifact@v1.0.0 + with: + name: cc-metric-collector RPM CentOS 'Latest' + path: ${{ steps.rpm.outputs.rpm_dir_path }} + - name: Save SRPM as artifact + uses: actions/upload-artifact@v1.0.0 + with: + name: cc-metric-collector SRPM CentOS 'Latest' + path: ${{ steps.rpm.outputs.source_rpm_path }} + build-alma-8_5: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: TomTheBear/rpmbuild@alma8.5 + id: rpm + name: Build RPM package on AlmaLinux 8.5 + with: + spec_file: "./scripts/cc-metric-collector.spec" + - name: Save RPM as artifact + uses: actions/upload-artifact@v1.0.0 + with: + name: cc-metric-collector RPM AlmaLinux 8.5 + path: ${{ steps.rpm.outputs.rpm_dir_path }} + - name: Save SRPM as artifact + uses: 
actions/upload-artifact@v1.0.0 + with: + name: cc-metric-collector SRPM AlmaLinux 8.5 + path: ${{ steps.rpm.outputs.source_rpm_path }} diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 7b0a9b7..f91db20 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -34,17 +34,18 @@ var AvailableCollectors = map[string]MetricCollector{ "nfsstat": new(NfsCollector), } +// Metric collector manager data structure type collectorManager struct { - collectors []MetricCollector - output chan lp.CCMetric // List of all output channels - done chan bool // channel to finish / stop metric collector manager - ticker mct.MultiChanTicker - duration time.Duration - wg *sync.WaitGroup - config map[string]json.RawMessage + collectors []MetricCollector // List of metric collectors to use + output chan lp.CCMetric // Output channels + done chan bool // channel to finish / stop metric collector manager + ticker mct.MultiChanTicker // periodically ticking once each interval + duration time.Duration // duration (for metrics that measure over a given duration) + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + config map[string]json.RawMessage // json encoded config for collector manager } -// Metric collector access functions +// Metric collector manager access functions type CollectorManager interface { Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error AddOutput(output chan lp.CCMetric) @@ -53,9 +54,9 @@ type CollectorManager interface { } // Init initializes a new metric collector manager by setting up: -// * output channels +// * output channel // * done channel -// * wait group synchronization (from variable wg) +// * wait group synchronization for goroutines (from variable wg) // * ticker (from variable ticker) // * configuration (read from config file in variable collectConfigFile) // Initialization is done for all configured collectors @@ 
-82,20 +83,20 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat } // Initialize configured collectors - for k, cfg := range cm.config { - if _, found := AvailableCollectors[k]; !found { - cclog.ComponentError("CollectorManager", "SKIP unknown collector", k) + for collectorName, collectorCfg := range cm.config { + if _, found := AvailableCollectors[collectorName]; !found { + cclog.ComponentError("CollectorManager", "SKIP unknown collector", collectorName) continue } - c := AvailableCollectors[k] + collector := AvailableCollectors[collectorName] - err = c.Init(cfg) + err = collector.Init(collectorCfg) if err != nil { - cclog.ComponentError("CollectorManager", "Collector", k, "initialization failed:", err.Error()) + cclog.ComponentError("CollectorManager", "Collector", collectorName, "initialization failed:", err.Error()) continue } - cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", c.Name()) - cm.collectors = append(cm.collectors, c) + cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) + cm.collectors = append(cm.collectors, collector) } return nil } @@ -114,6 +115,7 @@ func (cm *collectorManager) Start() { for _, c := range cm.collectors { c.Close() } + close(cm.done) cclog.ComponentDebug("CollectorManager", "DONE") } @@ -153,11 +155,13 @@ func (cm *collectorManager) AddOutput(output chan lp.CCMetric) { func (cm *collectorManager) Close() { cclog.ComponentDebug("CollectorManager", "CLOSE") cm.done <- true + // wait for close of channel cm.done + <-cm.done } // New creates a new initialized metric collector manager func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) { - cm := &collectorManager{} + cm := new(collectorManager) err := cm.Init(ticker, duration, wg, collectConfigFile) if err != nil { return nil, err diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index bc1852b..53db1c2 100644 --- 
a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -130,14 +130,21 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { continue } - timestampInt, err := strconv.ParseInt(key_value["_t_"]+key_value["_tu_"], 10, 64) - timestamp := time.UnixMicro(timestampInt) + sec, err := strconv.ParseInt(key_value["_t_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, - "GpfsCollector.Read(): Failed to convert time stamp '%s': %s\n", - key_value["_t_"]+key_value["_tu_"], err.Error()) + "GpfsCollector.Read(): Failed to convert seconds to int '%s': %v\n", + key_value["_t_"], err) continue } + msec, err := strconv.ParseInt(key_value["_tu_"], 10, 64) + if err != nil { + fmt.Fprintf(os.Stderr, + "GpfsCollector.Read(): Failed to convert micro seconds to int '%s': %v\n", + key_value["_tu_"], err) + continue + } + timestamp := time.Unix(sec, msec*1000) // bytes read bytesRead, err := strconv.ParseInt(key_value["_br_"], 10, 64) diff --git a/go.mod b/go.mod index da4f3ea..0789f7e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ClusterCockpit/cc-metric-collector -go 1.17 +go 1.16 require ( github.com/NVIDIA/go-nvml v0.11.1-0 @@ -12,14 +12,7 @@ require ( ) require ( - github.com/deepmap/oapi-codegen v1.8.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/nats-io/nats-server/v2 v2.7.0 // indirect - github.com/nats-io/nkeys v0.3.0 // indirect - github.com/nats-io/nuid v1.0.1 // indirect - github.com/pkg/errors v0.9.1 // indirect - golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect - golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect google.golang.org/protobuf v1.27.1 // indirect - gopkg.in/yaml.v2 v2.3.0 // indirect ) diff --git a/internal/ccLogger/cclogger.go b/internal/ccLogger/cclogger.go index 38e7e6b..5135780 100644 --- a/internal/ccLogger/cclogger.go +++ b/internal/ccLogger/cclogger.go @@ -38,7 +38,7 @@ func initLogger() { func Print(e ...interface{}) { initLogger() - 
defaultLog.Print(e) + defaultLog.Print(e...) } func ComponentPrint(component string, e ...interface{}) { @@ -48,7 +48,7 @@ func ComponentPrint(component string, e ...interface{}) { func Info(e ...interface{}) { initLogger() - infoLog.Print(e) + infoLog.Print(e...) } func ComponentInfo(component string, e ...interface{}) { @@ -58,14 +58,14 @@ func ComponentInfo(component string, e ...interface{}) { func Debug(e ...interface{}) { initLogger() - if globalDebug == true { - debugLog.Print(e) + if globalDebug { + debugLog.Print(e...) } } func ComponentDebug(component string, e ...interface{}) { initLogger() - if globalDebug == true && debugLog != nil { + if globalDebug && debugLog != nil { //CCComponentPrint(debugLog, component, e) debugLog.Print(fmt.Sprintf("[%s] ", component), e) } diff --git a/internal/ccMetric/ccMetric.go b/internal/ccMetric/ccMetric.go index 6b6bda9..05f81ff 100644 --- a/internal/ccMetric/ccMetric.go +++ b/internal/ccMetric/ccMetric.go @@ -2,9 +2,10 @@ package ccmetric import ( "fmt" - lp "github.com/influxdata/line-protocol" // MIT license "sort" "time" + + lp "github.com/influxdata/line-protocol" // MIT license ) // Most functions are derived from github.com/influxdata/line-protocol/metric.go @@ -24,6 +25,11 @@ type CCMetric interface { AddMeta(key, value string) MetaList() []*lp.Tag RemoveTag(key string) + GetTag(key string) (string, bool) + GetMeta(key string) (string, bool) + GetField(key string) (interface{}, bool) + HasField(key string) bool + RemoveField(key string) } func (m *ccMetric) Meta() map[string]string { @@ -187,6 +193,35 @@ func (m *ccMetric) AddField(key string, value interface{}) { m.fields = append(m.fields, &lp.Field{Key: key, Value: convertField(value)}) } +func (m *ccMetric) GetField(key string) (interface{}, bool) { + for _, field := range m.fields { + if field.Key == key { + return field.Value, true + } + } + return "", false +} + +func (m *ccMetric) HasField(key string) bool { + for _, field := range m.fields { + if 
field.Key == key { + return true + } + } + return false +} + +func (m *ccMetric) RemoveField(key string) { + for i, field := range m.fields { + if field.Key == key { + copy(m.fields[i:], m.fields[i+1:]) + m.fields[len(m.fields)-1] = nil + m.fields = m.fields[:len(m.fields)-1] + return + } + } +} + func New( name string, tags map[string]string, diff --git a/internal/ccTopology/ccTopology.go b/internal/ccTopology/ccTopology.go new file mode 100644 index 0000000..8d53b05 --- /dev/null +++ b/internal/ccTopology/ccTopology.go @@ -0,0 +1,277 @@ +package ccTopology + +import ( + "fmt" + "io/ioutil" + "log" + "os" + "path/filepath" + "strconv" + "strings" + + cclogger "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" +) + +// intArrayContains scans an array of ints if the value str is present in the array +// If the specified value is found, the corresponding array index is returned. +// The bool value is used to signal success or failure +func intArrayContains(array []int, str int) (int, bool) { + for i, a := range array { + if a == str { + return i, true + } + } + return -1, false +} + +// stringArrayContains scans an array of strings if the value str is present in the array +// If the specified value is found, the corresponding array index is returned. 
+// The bool value is used to signal success or failure +// func stringArrayContains(array []string, str string) (int, bool) { +// for i, a := range array { +// if a == str { +// return i, true +// } +// } +// return -1, false +// } + +func SocketList() []int { + buffer, err := ioutil.ReadFile("/proc/cpuinfo") + if err != nil { + log.Print(err) + return nil + } + ll := strings.Split(string(buffer), "\n") + var packs []int + for _, line := range ll { + if strings.HasPrefix(line, "physical id") { + lv := strings.Fields(line) + id, err := strconv.ParseInt(lv[3], 10, 32) + if err != nil { + log.Print(err) + return packs + } + _, found := intArrayContains(packs, int(id)) + if !found { + packs = append(packs, int(id)) + } + } + } + return packs +} + +func CpuList() []int { + buffer, err := ioutil.ReadFile("/proc/cpuinfo") + if err != nil { + log.Print(err) + return nil + } + ll := strings.Split(string(buffer), "\n") + var cpulist []int + for _, line := range ll { + if strings.HasPrefix(line, "processor") { + lv := strings.Fields(line) + id, err := strconv.ParseInt(lv[2], 10, 32) + if err != nil { + log.Print(err) + return cpulist + } + _, found := intArrayContains(cpulist, int(id)) + if !found { + cpulist = append(cpulist, int(id)) + } + } + } + return cpulist +} + +type CpuEntry struct { + Cpuid int + SMT int + Core int + Socket int + Numadomain int + Die int +} + +func CpuData() []CpuEntry { + + fileToInt := func(path string) int { + buffer, err := ioutil.ReadFile(path) + if err != nil { + log.Print(err) + cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error()) + return -1 + } + sbuffer := strings.Replace(string(buffer), "\n", "", -1) + var id int64 + //_, err = fmt.Scanf("%d", sbuffer, &id) + id, err = strconv.ParseInt(sbuffer, 10, 32) + if err != nil { + cclogger.ComponentError("ccTopology", "Parsing", path, ":", sbuffer, err.Error()) + return -1 + } + return int(id) + } + getCore := func(basepath string) int { + return 
fileToInt(fmt.Sprintf("%s/core_id", basepath)) + } + + getSocket := func(basepath string) int { + return fileToInt(fmt.Sprintf("%s/physical_package_id", basepath)) + } + + getDie := func(basepath string) int { + return fileToInt(fmt.Sprintf("%s/die_id", basepath)) + } + + getSMT := func(cpuid int, basepath string) int { + buffer, err := ioutil.ReadFile(fmt.Sprintf("%s/thread_siblings_list", basepath)) + if err != nil { + log.Print(err) + } + threadlist := make([]int, 0) + sbuffer := strings.Replace(string(buffer), "\n", "", -1) + for _, x := range strings.Split(sbuffer, ",") { + id, err := strconv.ParseInt(x, 10, 32) + if err != nil { + log.Print(err) + } + threadlist = append(threadlist, int(id)) + } + for i, x := range threadlist { + if x == cpuid { + return i + } + } + return 1 + } + + getNumaDomain := func(basepath string) int { + files, err := filepath.Glob(fmt.Sprintf("%s/node*", basepath)) + if err != nil { + log.Print(err) + } + for _, f := range files { + finfo, err := os.Lstat(f) + if err == nil && (finfo.IsDir() || finfo.Mode()&os.ModeSymlink != 0) { + var id int + parts := strings.Split(f, "/") + _, err = fmt.Scanf("node%d", parts[len(parts)-1], &id) + if err == nil { + return id + } + } + } + return 0 + } + + clist := make([]CpuEntry, 0) + for _, c := range CpuList() { + clist = append(clist, CpuEntry{Cpuid: c}) + } + for _, centry := range clist { + centry.Socket = -1 + centry.Numadomain = -1 + centry.Die = -1 + centry.Core = -1 + // Set base directory for topology lookup + base := fmt.Sprintf("/sys/devices/system/cpu/cpu%d/topology", centry.Cpuid) + + // Lookup CPU core id + centry.Core = getCore(base) + + // Lookup CPU socket id + centry.Socket = getSocket(base) + + // Lookup CPU die id + centry.Die = getDie(base) + + // Lookup SMT thread id + centry.SMT = getSMT(centry.Cpuid, base) + + // Lookup NUMA domain id + centry.Numadomain = getNumaDomain(base) + + } + return clist +} + +type CpuInformation struct { + NumHWthreads int + SMTWidth int + 
NumSockets int + NumDies int + NumNumaDomains int +} + +func CpuInfo() CpuInformation { + var c CpuInformation + + smt := 0 + numa := 0 + die := 0 + socket := 0 + cdata := CpuData() + for _, d := range cdata { + if d.SMT > smt { + smt = d.SMT + } + if d.Numadomain > numa { + numa = d.Numadomain + } + if d.Die > die { + die = d.Die + } + if d.Socket > socket { + socket = d.Socket + } + } + c.NumNumaDomains = numa + 1 + c.SMTWidth = smt + 1 + c.NumDies = die + 1 + c.NumSockets = socket + 1 + c.NumHWthreads = len(cdata) + return c +} + +func GetCpuSocket(cpuid int) int { + cdata := CpuData() + for _, d := range cdata { + if d.Cpuid == cpuid { + return d.Socket + } + } + return -1 +} + +func GetCpuNumaDomain(cpuid int) int { + cdata := CpuData() + for _, d := range cdata { + if d.Cpuid == cpuid { + return d.Numadomain + } + } + return -1 +} + +func GetCpuDie(cpuid int) int { + cdata := CpuData() + for _, d := range cdata { + if d.Cpuid == cpuid { + return d.Die + } + } + return -1 +} + +func GetCpuCore(cpuid int) int { + cdata := CpuData() + for _, d := range cdata { + if d.Cpuid == cpuid { + return d.Core + } + } + return -1 +} diff --git a/internal/metricRouter/metricAggregator.go b/internal/metricRouter/metricAggregator.go new file mode 100644 index 0000000..41c5276 --- /dev/null +++ b/internal/metricRouter/metricAggregator.go @@ -0,0 +1,291 @@ +package metricRouter + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" + + "github.com/PaesslerAG/gval" +) + +type metricAggregatorIntervalConfig struct { + Name string `json:"name"` // Metric name for the new metric + Function string `json:"function"` // Function to apply on the metric + Condition string `json:"if"` // Condition for applying function + Tags map[string]string `json:"tags"` // 
Tags for the new metric + Meta map[string]string `json:"meta"` // Meta information for the new metric + gvalCond gval.Evaluable + gvalFunc gval.Evaluable +} + +type metricAggregator struct { + functions []*metricAggregatorIntervalConfig + constants map[string]interface{} + language gval.Language + output chan lp.CCMetric +} + +type MetricAggregator interface { + AddAggregation(name, function, condition string, tags, meta map[string]string) error + DeleteAggregation(name string) error + Init(output chan lp.CCMetric) error + Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric) +} + +var metricCacheLanguage = gval.NewLanguage( + gval.Base(), + gval.Function("sum", sumfunc), + gval.Function("min", minfunc), + gval.Function("avg", avgfunc), + gval.Function("mean", avgfunc), + gval.Function("max", maxfunc), + gval.Function("len", lenfunc), + gval.Function("median", medianfunc), + gval.InfixOperator("in", infunc), + gval.Function("match", matchfunc), + gval.Function("getCpuCore", getCpuCoreFunc), + gval.Function("getCpuSocket", getCpuSocketFunc), + gval.Function("getCpuNuma", getCpuNumaDomainFunc), + gval.Function("getCpuDie", getCpuDieFunc), + gval.Function("getSockCpuList", getCpuListOfSocketFunc), + gval.Function("getNumaCpuList", getCpuListOfNumaDomainFunc), + gval.Function("getDieCpuList", getCpuListOfDieFunc), + gval.Function("getCoreCpuList", getCpuListOfCoreFunc), + gval.Function("getCpuList", getCpuListOfNode), + gval.Function("getCpuListOfType", getCpuListOfType), +) + +func (c *metricAggregator) Init(output chan lp.CCMetric) error { + c.output = output + c.functions = make([]*metricAggregatorIntervalConfig, 0) + c.constants = make(map[string]interface{}) + + // add constants like hostname, numSockets, ... 
to constants list + // Set hostname + hostname, err := os.Hostname() + if err != nil { + cclog.Error(err.Error()) + return err + } + // Drop domain part of host name + c.constants["hostname"] = strings.SplitN(hostname, `.`, 2)[0] + cinfo := topo.CpuInfo() + c.constants["numHWThreads"] = cinfo.NumHWthreads + c.constants["numSockets"] = cinfo.NumSockets + c.constants["numNumaDomains"] = cinfo.NumNumaDomains + c.constants["numDies"] = cinfo.NumDies + c.constants["smtWidth"] = cinfo.SMTWidth + + c.language = gval.NewLanguage( + gval.Base(), + metricCacheLanguage, + ) + + // Example aggregation function + // var f metricCacheFunctionConfig + // f.Name = "temp_cores_avg" + // //f.Condition = `"temp_core_" in name` + // f.Condition = `match("temp_core_%d+", metric.Name())` + // f.Function = `avg(values)` + // f.Tags = map[string]string{"type": "node"} + // f.Meta = map[string]string{"group": "IPMI", "unit": "degC", "source": "TempCollector"} + // c.functions = append(c.functions, &f) + return nil +} + +func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric) { + vars := make(map[string]interface{}) + for k, v := range c.constants { + vars[k] = v + } + vars["starttime"] = starttime + vars["endtime"] = endtime + for _, f := range c.functions { + cclog.ComponentDebug("MetricCache", "COLLECT", f.Name, "COND", f.Condition) + values := make([]float64, 0) + matches := make([]lp.CCMetric, 0) + for _, m := range metrics { + vars["metric"] = m + //value, err := gval.Evaluate(f.Condition, vars, c.language) + value, err := f.gvalCond.EvalBool(context.Background(), vars) + if err != nil { + cclog.ComponentError("MetricCache", "COLLECT", f.Name, "COND", f.Condition, ":", err.Error()) + continue + } + if value { + v, valid := m.GetField("value") + if valid { + switch x := v.(type) { + case float64: + values = append(values, x) + case float32: + case int: + case int64: + values = append(values, float64(x)) + case bool: + if x { + values = 
append(values, float64(1.0)) + } else { + values = append(values, float64(0.0)) + } + default: + cclog.ComponentError("MetricCache", "COLLECT ADD VALUE", v, "FAILED") + } + } + matches = append(matches, m) + } + } + delete(vars, "metric") + cclog.ComponentDebug("MetricCache", "EVALUATE", f.Name, "METRICS", len(values), "CALC", f.Function) + vars["values"] = values + vars["metrics"] = matches + if len(values) > 0 { + value, err := gval.Evaluate(f.Function, vars, c.language) + if err != nil { + cclog.ComponentError("MetricCache", "EVALUATE", f.Name, "METRICS", len(values), "CALC", f.Function, ":", err.Error()) + break + } + + copy_tags := func(tags map[string]string, metrics []lp.CCMetric) map[string]string { + out := make(map[string]string) + for key, value := range tags { + switch value { + case "": + for _, m := range metrics { + v, err := m.GetTag(key) + if err { + out[key] = v + } + } + default: + out[key] = value + } + } + return out + } + copy_meta := func(meta map[string]string, metrics []lp.CCMetric) map[string]string { + out := make(map[string]string) + for key, value := range meta { + switch value { + case "": + for _, m := range metrics { + v, err := m.GetMeta(key) + if err { + out[key] = v + } + } + default: + out[key] = value + } + } + return out + } + tags := copy_tags(f.Tags, matches) + meta := copy_meta(f.Meta, matches) + + var m lp.CCMetric + switch t := value.(type) { + case float64: + m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime) + case float32: + m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime) + case int: + m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime) + case int64: + m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime) + case string: + m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime) + default: + cclog.ComponentError("MetricCache", "Gval returned invalid type", t, "skipping 
metric", f.Name) + } + if err != nil { + cclog.ComponentError("MetricCache", "Cannot create metric from Gval result", value, ":", err.Error()) + } + cclog.ComponentDebug("MetricCache", "SEND", m) + select { + case c.output <- m: + default: + } + + } + } +} + +func (c *metricAggregator) AddAggregation(name, function, condition string, tags, meta map[string]string) error { + // Since "" cannot be used inside of JSON strings, we use '' and replace them here because gval does not like '' + // but wants "" + newfunc := strings.ReplaceAll(function, "'", "\"") + newcond := strings.ReplaceAll(condition, "'", "\"") + gvalCond, err := gval.Full(metricCacheLanguage).NewEvaluable(newcond) + if err != nil { + cclog.ComponentError("MetricAggregator", "Cannot add aggregation, invalid if condition", newcond, ":", err.Error()) + return err + } + gvalFunc, err := gval.Full(metricCacheLanguage).NewEvaluable(newfunc) + if err != nil { + cclog.ComponentError("MetricAggregator", "Cannot add aggregation, invalid function condition", newfunc, ":", err.Error()) + return err + } + for _, agg := range c.functions { + if agg.Name == name { + agg.Name = name + agg.Condition = newcond + agg.Function = newfunc + agg.Tags = tags + agg.Meta = meta + agg.gvalCond = gvalCond + agg.gvalFunc = gvalFunc + return nil + } + } + var agg metricAggregatorIntervalConfig + agg.Name = name + agg.Condition = newcond + agg.gvalCond = gvalCond + agg.Function = newfunc + agg.gvalFunc = gvalFunc + agg.Tags = tags + agg.Meta = meta + c.functions = append(c.functions, &agg) + return nil +} + +func (c *metricAggregator) DeleteAggregation(name string) error { + for i, agg := range c.functions { + if agg.Name == name { + copy(c.functions[i:], c.functions[i+1:]) + c.functions[len(c.functions)-1] = nil + c.functions = c.functions[:len(c.functions)-1] + return nil + } + } + return fmt.Errorf("no aggregation for metric name %s", name) +} + +func (c *metricAggregator) AddConstant(name string, value interface{}) { + 
c.constants[name] = value +} + +func (c *metricAggregator) DelConstant(name string) { + delete(c.constants, name) +} + +func (c *metricAggregator) AddFunction(name string, function func(args ...interface{}) (interface{}, error)) { + c.language = gval.NewLanguage(c.language, gval.Function(name, function)) +} + +func NewAggregator(output chan lp.CCMetric) (MetricAggregator, error) { + a := new(metricAggregator) + err := a.Init(output) + if err != nil { + return nil, err + } + return a, err +} diff --git a/internal/metricRouter/metricAggregatorFunctions.go b/internal/metricRouter/metricAggregatorFunctions.go new file mode 100644 index 0000000..4133a4b --- /dev/null +++ b/internal/metricRouter/metricAggregatorFunctions.go @@ -0,0 +1,376 @@ +package metricRouter + +import ( + "errors" + "fmt" + "math" + "regexp" + "sort" + "strings" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" +) + +/* + * Arithmetic functions on value arrays + */ + +// Sum up values +func sumfunc(args ...interface{}) (interface{}, error) { + s := 0.0 + values, ok := args[0].([]float64) + if ok { + cclog.ComponentDebug("MetricCache", "SUM FUNC START") + for _, x := range values { + s += x + } + cclog.ComponentDebug("MetricCache", "SUM FUNC END", s) + } else { + cclog.ComponentDebug("MetricCache", "SUM FUNC CAST FAILED") + } + return s, nil +} + +// Get the minimum value +func minfunc(args ...interface{}) (interface{}, error) { + var err error = nil + switch values := args[0].(type) { + case []float64: + var s float64 = math.MaxFloat64 + for _, x := range values { + if x < s { + s = x + } + } + return s, nil + case []float32: + var s float32 = math.MaxFloat32 + for _, x := range values { + if x < s { + s = x + } + } + return s, nil + case []int: + var s int = math.MaxInt + for _, x := range values { + if x < s { + s = x + } + } + return s, nil + case []int64: + var s int64 = math.MaxInt64 + for _, x 
:= range values { + if x < s { + s = x + } + } + return s, nil + case []int32: + var s int32 = math.MaxInt32 + for _, x := range values { + if x < s { + s = x + } + } + return s, nil + default: + err = errors.New("function 'min' only on list of values (float64, float32, int, int32, int64)") + } + + return 0.0, err +} + +// Get the average or mean value +func avgfunc(args ...interface{}) (interface{}, error) { + switch values := args[0].(type) { + case []float64: + var s float64 = 0 + for _, x := range values { + s += x + } + return s / float64(len(values)), nil + case []float32: + var s float32 = 0 + for _, x := range values { + s += x + } + return s / float32(len(values)), nil + case []int: + var s int = 0 + for _, x := range values { + s += x + } + return s / len(values), nil + case []int64: + var s int64 = 0 + for _, x := range values { + s += x + } + return s / int64(len(values)), nil + } + return 0.0, nil +} + +// Get the maximum value +func maxfunc(args ...interface{}) (interface{}, error) { + s := 0.0 + values, ok := args[0].([]float64) + if ok { + for _, x := range values { + if x > s { + s = x + } + } + } + return s, nil +} + +// Get the median value +func medianfunc(args ...interface{}) (interface{}, error) { + switch values := args[0].(type) { + case []float64: + sort.Float64s(values) + return values[len(values)/2], nil + // case []float32: + // sort.Float64s(values) + // return values[len(values)/2], nil + case []int: + sort.Ints(values) + return values[len(values)/2], nil + + // case []int64: + // sort.Ints(values) + // return values[len(values)/2], nil + // case []int32: + // sort.Ints(values) + // return values[len(values)/2], nil + } + return 0.0, errors.New("function 'median()' only on lists of type float64 and int") +} + +/* + * Get number of values in list. 
Returns always an int + */ + +func lenfunc(args ...interface{}) (interface{}, error) { + var err error = nil + var length int = 0 + switch values := args[0].(type) { + case []float64: + length = len(values) + case []float32: + length = len(values) + case []int: + length = len(values) + case []int64: + length = len(values) + case []int32: + length = len(values) + case float64: + err = errors.New("function 'len' can only be applied on arrays and strings") + case float32: + err = errors.New("function 'len' can only be applied on arrays and strings") + case int: + err = errors.New("function 'len' can only be applied on arrays and strings") + case int64: + err = errors.New("function 'len' can only be applied on arrays and strings") + case string: + length = len(values) + } + return length, err +} + +/* + * Check if a values is in a list + * In constrast to most of the other functions, this one is an infix operator for + * - substring matching: `"abc" in "abcdef"` -> true + * - substring matching with int casting: `3 in "abd3"` -> true + * - search for an int in an int list: `3 in getCpuList()` -> true (if you have more than 4 CPU hardware threads) + */ + +func infunc(a interface{}, b interface{}) (interface{}, error) { + switch match := a.(type) { + case string: + switch total := b.(type) { + case string: + return strings.Contains(total, match), nil + } + case int: + switch total := b.(type) { + case []int: + for _, x := range total { + if x == match { + return true, nil + } + } + case string: + smatch := fmt.Sprintf("%d", match) + return strings.Contains(total, smatch), nil + } + + } + return false, nil +} + +/* + * Regex matching of strings (metric name, tag keys, tag values, meta keys, meta values) + * Since we cannot use \ inside JSON strings without escaping, we use % instead for the + * format keys \d = %d, \w = %d, ... 
Not sure how to fix this + */ + +func matchfunc(args ...interface{}) (interface{}, error) { + switch match := args[0].(type) { + case string: + switch total := args[1].(type) { + case string: + smatch := strings.Replace(match, "%", "\\", -1) + regex, err := regexp.Compile(smatch) + if err != nil { + return false, err + } + s := regex.Find([]byte(total)) + return s != nil, nil + } + } + return false, nil +} + +/* + * System topology getter functions + */ + +// for a given cpuid, it returns the core id +func getCpuCoreFunc(args ...interface{}) (interface{}, error) { + switch cpuid := args[0].(type) { + case int: + return topo.GetCpuCore(cpuid), nil + } + return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid") +} + +// for a given cpuid, it returns the socket id +func getCpuSocketFunc(args ...interface{}) (interface{}, error) { + switch cpuid := args[0].(type) { + case int: + return topo.GetCpuSocket(cpuid), nil + } + return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid") +} + +// for a given cpuid, it returns the id of the NUMA node +func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) { + switch cpuid := args[0].(type) { + case int: + return topo.GetCpuNumaDomain(cpuid), nil + } + return -1, errors.New("function 'getCpuNuma' accepts only an 'int' cpuid") +} + +// for a given cpuid, it returns the id of the CPU die +func getCpuDieFunc(args ...interface{}) (interface{}, error) { + switch cpuid := args[0].(type) { + case int: + return topo.GetCpuDie(cpuid), nil + } + return -1, errors.New("function 'getCpuDie' accepts only an 'int' cpuid") +} + +// for a given core id, it returns the list of cpuids +func getCpuListOfCoreFunc(args ...interface{}) (interface{}, error) { + cpulist := make([]int, 0) + switch in := args[0].(type) { + case int: + for _, c := range topo.CpuData() { + if c.Core == in { + cpulist = append(cpulist, c.Cpuid) + } + } + } + return cpulist, nil +} + +// for a given socket id, it returns the list 
of cpuids
+func getCpuListOfSocketFunc(args ...interface{}) (interface{}, error) {
+	cpulist := make([]int, 0)
+	switch in := args[0].(type) {
+	case int:
+		for _, c := range topo.CpuData() {
+			if c.Socket == in {
+				cpulist = append(cpulist, c.Cpuid)
+			}
+		}
+	}
+	return cpulist, nil
+}
+
+// for a given id of a NUMA domain, it returns the list of cpuids
+func getCpuListOfNumaDomainFunc(args ...interface{}) (interface{}, error) {
+	cpulist := make([]int, 0)
+	switch in := args[0].(type) {
+	case int:
+		for _, c := range topo.CpuData() {
+			if c.Numadomain == in {
+				cpulist = append(cpulist, c.Cpuid)
+			}
+		}
+	}
+	return cpulist, nil
+}
+
+// for a given CPU die id, it returns the list of cpuids
+func getCpuListOfDieFunc(args ...interface{}) (interface{}, error) {
+	cpulist := make([]int, 0)
+	switch in := args[0].(type) {
+	case int:
+		for _, c := range topo.CpuData() {
+			if c.Die == in {
+				cpulist = append(cpulist, c.Cpuid)
+			}
+		}
+	}
+	return cpulist, nil
+}
+
+// wrapper function to get a list of all cpuids of the node
+func getCpuListOfNode(args ...interface{}) (interface{}, error) {
+	return topo.CpuList(), nil
+}
+
+// helper function to get the cpuid list for a CCMetric type tag set (type and type-id)
+// since there is no access to the metric data in the function, it should be called like
+// `getCpuListOfType()`
+func getCpuListOfType(args ...interface{}) (interface{}, error) {
+	cpulist := make([]int, 0)
+	switch typ := args[0].(type) {
+	case string:
+		switch typ {
+		case "node":
+			return topo.CpuList(), nil
+		case "socket":
+			return getCpuListOfSocketFunc(args[1])
+		case "numadomain":
+			return getCpuListOfNumaDomainFunc(args[1])
+		case "core":
+			return getCpuListOfCoreFunc(args[1])
+		case "cpu":
+			var cpu int
+
+			switch id := args[1].(type) {
+			case string:
+				_, err := fmt.Sscanf(id, "%d", &cpu)
+				if err == nil {
+					cpulist = append(cpulist, cpu)
+				}
+			case int:
+				cpulist = append(cpulist, id)
+			case int64:
+				cpulist = append(cpulist, int(id))
+			}
+
+		}
+
+	}
+	
} + return cpulist, errors.New("no valid args type and type-id") +} diff --git a/internal/metricRouter/metricCache.go b/internal/metricRouter/metricCache.go new file mode 100644 index 0000000..1cfd8c3 --- /dev/null +++ b/internal/metricRouter/metricCache.go @@ -0,0 +1,176 @@ +package metricRouter + +import ( + "sync" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" +) + +type metricCachePeriod struct { + startstamp time.Time + stopstamp time.Time + numMetrics int + sizeMetrics int + metrics []lp.CCMetric +} + +// Metric cache data structure +type metricCache struct { + numPeriods int + curPeriod int + intervals []*metricCachePeriod + wg *sync.WaitGroup + ticker mct.MultiChanTicker + tickchan chan time.Time + done chan bool + output chan lp.CCMetric + aggEngine MetricAggregator +} + +type MetricCache interface { + Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error + Start() + Add(metric lp.CCMetric) + GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) + AddAggregation(name, function, condition string, tags, meta map[string]string) error + DeleteAggregation(name string) error + Close() +} + +func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error { + var err error = nil + c.done = make(chan bool) + c.wg = wg + c.ticker = ticker + c.numPeriods = numPeriods + c.output = output + c.intervals = make([]*metricCachePeriod, 0) + for i := 0; i < c.numPeriods+1; i++ { + p := new(metricCachePeriod) + p.numMetrics = 0 + p.sizeMetrics = 0 + p.metrics = make([]lp.CCMetric, 0) + c.intervals = append(c.intervals, p) + } + + // Create a new aggregation engine. 
No separate goroutine at the moment + // The code is executed by the MetricCache goroutine + c.aggEngine, err = NewAggregator(c.output) + if err != nil { + cclog.ComponentError("MetricCache", "Cannot create aggregator") + return err + } + + return nil +} + +// Start starts the metric cache +func (c *metricCache) Start() { + + c.tickchan = make(chan time.Time) + c.ticker.AddChannel(c.tickchan) + // Router cache is done + done := func() { + cclog.ComponentDebug("MetricCache", "DONE") + close(c.done) + } + + // Rotate cache interval + rotate := func(timestamp time.Time) int { + oldPeriod := c.curPeriod + c.curPeriod = oldPeriod + 1 + if c.curPeriod >= c.numPeriods { + c.curPeriod = 0 + } + c.intervals[oldPeriod].numMetrics = 0 + c.intervals[oldPeriod].stopstamp = timestamp + c.intervals[c.curPeriod].startstamp = timestamp + c.intervals[c.curPeriod].stopstamp = timestamp + return oldPeriod + } + + c.wg.Add(1) + go func() { + defer c.wg.Done() + for { + select { + case <-c.done: + done() + return + case tick := <-c.tickchan: + old := rotate(tick) + // Get the last period and evaluate aggregation metrics + starttime, endtime, metrics := c.GetPeriod(old) + if len(metrics) > 0 { + c.aggEngine.Eval(starttime, endtime, metrics) + } else { + // This message is also printed in the first interval after startup + cclog.ComponentDebug("MetricCache", "EMPTY INTERVAL?") + } + } + } + }() + cclog.ComponentDebug("MetricCache", "START") +} + +// Add a metric to the cache. 
The interval is defined by the global timer (rotate() in Start()) +// The intervals list is used as round-robin buffer and the metric list grows dynamically and +// to avoid reallocations +func (c *metricCache) Add(metric lp.CCMetric) { + if c.curPeriod >= 0 && c.curPeriod < c.numPeriods { + p := c.intervals[c.curPeriod] + if p.numMetrics < p.sizeMetrics { + p.metrics[p.numMetrics] = metric + p.numMetrics = p.numMetrics + 1 + p.stopstamp = metric.Time() + } else { + p.metrics = append(p.metrics, metric) + p.numMetrics = p.numMetrics + 1 + p.sizeMetrics = p.sizeMetrics + 1 + p.stopstamp = metric.Time() + } + } +} + +func (c *metricCache) AddAggregation(name, function, condition string, tags, meta map[string]string) error { + return c.aggEngine.AddAggregation(name, function, condition, tags, meta) +} + +func (c *metricCache) DeleteAggregation(name string) error { + return c.aggEngine.DeleteAggregation(name) +} + +// Get all metrics of a interval. The index is the difference to the current interval, so index=0 +// is the current one, index=1 the last interval and so on. Returns and empty array if a wrong index +// is given (negative index, index larger than configured number of total intervals, ...) 
+func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) {
+	if index >= 0 && index < c.numPeriods {
+		pindex := c.curPeriod - index
+		if pindex < 0 {
+			pindex = c.numPeriods + pindex
+		}
+		if pindex >= 0 && pindex < c.numPeriods {
+			return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics
+		}
+	}
+	return time.Now(), time.Now(), make([]lp.CCMetric, 0)
+}
+
+// Close finishes / stops the metric cache
+func (c *metricCache) Close() {
+	cclog.ComponentDebug("MetricCache", "CLOSE")
+	c.done <- true
+}
+
+func NewCache(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) {
+	c := new(metricCache)
+	err := c.Init(output, ticker, wg, numPeriods)
+	if err != nil {
+		return nil, err
+	}
+	return c, err
+}
diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go
index a321aae..870af02 100644
--- a/internal/metricRouter/metricRouter.go
+++ b/internal/metricRouter/metricRouter.go
@@ -3,6 +3,7 @@ package metricRouter
 import (
 	"encoding/json"
 	"os"
+	"strings"
 	"sync"
 	"time"
 
@@ -22,21 +23,28 @@ type metricRouterTagConfig struct {
 
 // Metric router configuration
 type metricRouterConfig struct {
-	AddTags       []metricRouterTagConfig `json:"add_tags"`           // List of tags that are added when the condition is met
-	DelTags       []metricRouterTagConfig `json:"delete_tags"`        // List of tags that are removed when the condition is met
-	IntervalStamp bool                    `json:"interval_timestamp"` // Update timestamp periodically? 
+ AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met + DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met + IntervalAgg []metricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval + IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval? + NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation } +// Metric router data structure type metricRouter struct { - coll_input chan lp.CCMetric // Input channel from CollectorManager - recv_input chan lp.CCMetric // Input channel from ReceiveManager - outputs []chan lp.CCMetric // List of all output channels - done chan bool // channel to finish / stop metric router - wg *sync.WaitGroup - timestamp time.Time // timestamp - timerdone chan bool // channel to finish / stop timestamp updater - ticker mct.MultiChanTicker - config metricRouterConfig + hostname string // Hostname used in tags + coll_input chan lp.CCMetric // Input channel from CollectorManager + recv_input chan lp.CCMetric // Input channel from ReceiveManager + cache_input chan lp.CCMetric // Input channel from MetricCache + outputs []chan lp.CCMetric // List of all output channels + done chan bool // channel to finish / stop metric router + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + timestamp time.Time // timestamp periodically updated by ticker each interval + timerdone chan bool // channel to finish / stop timestamp updater + ticker mct.MultiChanTicker // periodically ticking once each interval + config metricRouterConfig // json encoded config for metric router + cache MetricCache // pointer to MetricCache + cachewg sync.WaitGroup // wait group for MetricCache } // MetricRouter access functions @@ -58,8 +66,20 @@ type MetricRouter interface { func 
(r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error { r.outputs = make([]chan lp.CCMetric, 0) r.done = make(chan bool) + r.cache_input = make(chan lp.CCMetric) r.wg = wg r.ticker = ticker + + // Set hostname + hostname, err := os.Hostname() + if err != nil { + cclog.Error(err.Error()) + return err + } + // Drop domain part of host name + r.hostname = strings.SplitN(hostname, `.`, 2)[0] + + // Read metric router config file configFile, err := os.Open(routerConfigFile) if err != nil { cclog.ComponentError("MetricRouter", err.Error()) @@ -72,6 +92,18 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout cclog.ComponentError("MetricRouter", err.Error()) return err } + numIntervals := r.config.NumCacheIntervals + if numIntervals <= 0 { + numIntervals = 1 + } + r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, numIntervals) + if err != nil { + cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error()) + return err + } + for _, agg := range r.config.IntervalAgg { + r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta) + } return nil } @@ -87,6 +119,7 @@ func (r *metricRouter) StartTimer() { for { select { case <-r.timerdone: + close(r.timerdone) cclog.ComponentDebug("MetricRouter", "TIMER DONE") return case t := <-m: @@ -97,11 +130,11 @@ func (r *metricRouter) StartTimer() { cclog.ComponentDebug("MetricRouter", "TIMER START") } -// EvalCondition evaluates condition Cond for metric data from point -func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) { - expression, err := govaluate.NewEvaluableExpression(Cond) +// EvalCondition evaluates condition cond for metric data from point +func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, error) { + expression, err := govaluate.NewEvaluableExpression(cond) if err != nil { - cclog.ComponentDebug("MetricRouter", Cond, " = ", 
err.Error()) + cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error()) return false, err } @@ -122,7 +155,7 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro // evaluate condition result, err := expression.Evaluate(params) if err != nil { - cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) + cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error()) return false, err } return bool(result.(bool)), err @@ -172,13 +205,21 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) { // Start starts the metric router func (r *metricRouter) Start() { + + // start timer if configured r.timestamp = time.Now() if r.config.IntervalStamp { r.StartTimer() } + + // Router manager is done done := func() { + close(r.done) cclog.ComponentDebug("MetricRouter", "DONE") } + + // Forward takes a received metric, adds or deletes tags + // and forwards it to the output channels forward := func(point lp.CCMetric) { cclog.ComponentDebug("MetricRouter", "FORWARD", point) r.DoAddTags(point) @@ -188,36 +229,50 @@ func (r *metricRouter) Start() { } } + // Start Metric Cache + r.cache.Start() + r.wg.Add(1) go func() { defer r.wg.Done() for { - // RouterLoop: select { case <-r.done: done() return + case p := <-r.coll_input: + // receive from metric collector + p.AddTag("hostname", r.hostname) if r.config.IntervalStamp { p.SetTime(r.timestamp) } forward(p) + r.cache.Add(p) + case p := <-r.recv_input: + // receive from receive manager if r.config.IntervalStamp { p.SetTime(r.timestamp) } forward(p) + + case p := <-r.cache_input: + // receive from metric collector + p.AddTag("hostname", r.hostname) + forward(p) } } }() cclog.ComponentDebug("MetricRouter", "STARTED") } -// AddInput adds a input channel to the metric router +// AddCollectorInput adds a channel between metric collector and metric router func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) { r.coll_input = input } +// AddReceiverInput adds a channel between metric 
receiver and metric router func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) { r.recv_input = input } @@ -231,10 +286,16 @@ func (r *metricRouter) AddOutput(output chan lp.CCMetric) { func (r *metricRouter) Close() { cclog.ComponentDebug("MetricRouter", "CLOSE") r.done <- true + // wait for close of channel r.done + <-r.done if r.config.IntervalStamp { cclog.ComponentDebug("MetricRouter", "TIMER CLOSE") r.timerdone <- true + // wait for close of channel r.timerdone + <-r.timerdone } + r.cache.Close() + r.cachewg.Wait() } // New creates a new initialized metric router diff --git a/internal/multiChanTicker/multiChanTicker.go b/internal/multiChanTicker/multiChanTicker.go index a9394ab..e0eca43 100644 --- a/internal/multiChanTicker/multiChanTicker.go +++ b/internal/multiChanTicker/multiChanTicker.go @@ -23,6 +23,7 @@ func (t *multiChanTicker) Init(duration time.Duration) { t.done = make(chan bool) go func() { done := func() { + close(t.done) cclog.ComponentDebug("MultiChanTicker", "DONE") } for { @@ -52,6 +53,8 @@ func (t *multiChanTicker) AddChannel(channel chan time.Time) { func (t *multiChanTicker) Close() { cclog.ComponentDebug("MultiChanTicker", "CLOSE") t.done <- true + // wait for close of channel t.done + <-t.done } func NewTicker(duration time.Duration) MultiChanTicker { diff --git a/metric-collector.go b/metric-collector.go index 3975b62..8121141 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -5,7 +5,6 @@ import ( "flag" "os" "os/signal" - "strings" "syscall" "github.com/ClusterCockpit/cc-metric-collector/collectors" @@ -45,7 +44,6 @@ func LoadCentralConfiguration(file string, config *CentralConfigFile) error { } type RuntimeConfig struct { - Hostname string Interval time.Duration Duration time.Duration CliArgs map[string]string @@ -213,13 +211,21 @@ func mainFunc() int { } rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration) * time.Second - rcfg.Hostname, err = os.Hostname() - if err != nil { - cclog.Error(err.Error()) + if 
len(rcfg.ConfigFile.RouterConfigFile) == 0 { + cclog.Error("Metric router configuration file must be set") return 1 } - // Drop domain part of host name - rcfg.Hostname = strings.SplitN(rcfg.Hostname, `.`, 2)[0] + + if len(rcfg.ConfigFile.SinkConfigFile) == 0 { + cclog.Error("Sink configuration file must be set") + return 1 + } + + if len(rcfg.ConfigFile.CollectorConfigFile) == 0 { + cclog.Error("Metric collector configuration file must be set") + return 1 + } + // err = CreatePidfile(rcfg.CliArgs["pidfile"]) // Set log file @@ -231,42 +237,36 @@ func mainFunc() int { rcfg.MultiChanTicker = mct.NewTicker(rcfg.Interval) // Create new metric router - if len(rcfg.ConfigFile.RouterConfigFile) > 0 { - rcfg.MetricRouter, err = mr.New(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile) - if err != nil { - cclog.Error(err.Error()) - return 1 - } + rcfg.MetricRouter, err = mr.New(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile) + if err != nil { + cclog.Error(err.Error()) + return 1 } // Create new sink - if len(rcfg.ConfigFile.SinkConfigFile) > 0 { - rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile) - if err != nil { - cclog.Error(err.Error()) - return 1 - } - - // Connect metric router to sink manager - RouterToSinksChannel := make(chan lp.CCMetric, 200) - rcfg.SinkManager.AddInput(RouterToSinksChannel) - rcfg.MetricRouter.AddOutput(RouterToSinksChannel) + rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile) + if err != nil { + cclog.Error(err.Error()) + return 1 } + // Connect metric router to sink manager + RouterToSinksChannel := make(chan lp.CCMetric, 200) + rcfg.SinkManager.AddInput(RouterToSinksChannel) + rcfg.MetricRouter.AddOutput(RouterToSinksChannel) + // Create new collector manager - if len(rcfg.ConfigFile.CollectorConfigFile) > 0 { - rcfg.CollectManager, err = collectors.New(rcfg.MultiChanTicker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile) - if err != 
nil { - cclog.Error(err.Error()) - return 1 - } - - // Connect collector manager to metric router - CollectToRouterChannel := make(chan lp.CCMetric, 200) - rcfg.CollectManager.AddOutput(CollectToRouterChannel) - rcfg.MetricRouter.AddCollectorInput(CollectToRouterChannel) + rcfg.CollectManager, err = collectors.New(rcfg.MultiChanTicker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile) + if err != nil { + cclog.Error(err.Error()) + return 1 } + // Connect collector manager to metric router + CollectToRouterChannel := make(chan lp.CCMetric, 200) + rcfg.CollectManager.AddOutput(CollectToRouterChannel) + rcfg.MetricRouter.AddCollectorInput(CollectToRouterChannel) + // Create new receive manager if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 { rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile) diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 7c46bd8..02421d3 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -9,6 +9,7 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) +// Map of all available sinks var AvailableSinks = map[string]Sink{ "influxdb": new(InfluxSink), "stdout": new(StdoutSink), @@ -17,14 +18,16 @@ var AvailableSinks = map[string]Sink{ "ganglia": new(GangliaSink), } +// Metric collector manager data structure type sinkManager struct { - input chan lp.CCMetric - outputs []Sink - done chan bool - wg *sync.WaitGroup - config []sinkConfig + input chan lp.CCMetric // input channel + outputs []Sink // List of sinks to use + done chan bool // channel to finish / stop metric sink manager + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + config []sinkConfig // json encoded config for sink manager } +// Sink manager access functions type SinkManager interface { Init(wg *sync.WaitGroup, sinkConfigFile string) error AddInput(input chan lp.CCMetric) @@ -39,6 +42,8 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile 
string) error { sm.done = make(chan bool) sm.wg = wg sm.config = make([]sinkConfig, 0) + + // Read sink config file if len(sinkConfigFile) > 0 { configFile, err := os.Open(sinkConfigFile) if err != nil { @@ -64,27 +69,37 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { } func (sm *sinkManager) Start() { - sm.wg.Add(1) batchcount := 20 + + sm.wg.Add(1) go func() { + defer sm.wg.Done() + + // Sink manager is done done := func() { for _, s := range sm.outputs { s.Flush() s.Close() } - sm.wg.Done() + + close(sm.done) cclog.ComponentDebug("SinkManager", "DONE") } + for { select { case <-sm.done: done() return + case p := <-sm.input: + // Send received metric to all outputs cclog.ComponentDebug("SinkManager", "WRITE", p) for _, s := range sm.outputs { s.Write(p) } + + // Flush all outputs if batchcount == 0 { cclog.ComponentDebug("SinkManager", "FLUSH") for _, s := range sm.outputs { @@ -96,9 +111,12 @@ func (sm *sinkManager) Start() { } } }() + + // Sink manager is started cclog.ComponentDebug("SinkManager", "STARTED") } +// AddInput adds the input channel to the sink manager func (sm *sinkManager) AddInput(input chan lp.CCMetric) { sm.input = input } @@ -129,11 +147,15 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { return nil } +// Close finishes / stops the sink manager func (sm *sinkManager) Close() { cclog.ComponentDebug("SinkManager", "CLOSE") sm.done <- true + // wait for close of channel sm.done + <-sm.done } +// New creates a new initialized sink manager func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) { sm := &sinkManager{} err := sm.Init(wg, sinkConfigFile)