From a6feb16ec1ac1fa977a3057c0e7b41586fa56979 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 21 Dec 2021 14:04:31 +0100 Subject: [PATCH] Formatting --- Makefile | 1 + collectors/collectorManager.go | 183 ++++----- collectors/cpustatMetric.go | 2 +- collectors/customCmdMetric.go | 2 +- collectors/diskstatMetric.go | 2 +- collectors/infinibandMetric.go | 2 +- collectors/ipmiMetric.go | 10 +- collectors/likwidMetric.go | 30 +- collectors/loadavgMetric.go | 2 +- collectors/lustreMetric.go | 8 +- collectors/memstatMetric.go | 2 +- collectors/metricCollector.go | 10 +- collectors/netstatMetric.go | 14 +- collectors/nvidiaMetric.go | 10 +- collectors/tempMetric.go | 14 +- collectors/topprocsMetric.go | 2 +- go.sum | 1 - internal/ccMetric/ccMetric.go | 25 +- internal/metricRouter/metricRouter.go | 287 +++++++------ internal/multiChanTicker/multiChanTicker.go | 40 +- metric-collector.go | 429 ++++++++++---------- receivers/metricReceiver.go | 13 +- receivers/natsReceiver.go | 17 +- receivers/receiveManager.go | 223 +++++----- sinks/httpSink.go | 4 +- sinks/influxSink.go | 8 +- sinks/metricSink.go | 2 +- sinks/natsSink.go | 4 +- sinks/sinkManager.go | 211 +++++----- sinks/stdoutSink.go | 10 +- 30 files changed, 775 insertions(+), 793 deletions(-) diff --git a/Makefile b/Makefile index e82685e..1b78645 100644 --- a/Makefile +++ b/Makefile @@ -16,5 +16,6 @@ fmt: go fmt sinks/*.go go fmt receivers/*.go go fmt metric-collector.go + find . -name "*.go" -exec go fmt {} \; .PHONY: clean diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index cd4cc1f..00ffe68 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -1,16 +1,15 @@ package collectors import ( - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - "sync" - "time" - "log" - "os" - "encoding/json" - mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" + "encoding/json" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" + "log" + "os" + "sync" + "time" ) - var AvailableCollectors = map[string]MetricCollector{ "likwid": &LikwidCollector{}, "loadavg": &LoadavgCollector{}, @@ -27,15 +26,14 @@ var AvailableCollectors = map[string]MetricCollector{ "ipmistat": &IpmiCollector{}, } - type collectorManager struct { - collectors []MetricCollector - output chan lp.CCMetric - done chan bool - ticker mct.MultiChanTicker - duration time.Duration - wg *sync.WaitGroup - config map[string]json.RawMessage + collectors []MetricCollector + output chan lp.CCMetric + done chan bool + ticker mct.MultiChanTicker + duration time.Duration + wg *sync.WaitGroup + config map[string]json.RawMessage } type CollectorManager interface { @@ -45,96 +43,95 @@ type CollectorManager interface { Close() } - func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { - cm.collectors = make([]MetricCollector, 0) - cm.output = nil - cm.done = make(chan bool) - cm.wg = wg - cm.ticker = ticker - cm.duration = duration - configFile, err := os.Open(collectConfigFile) - if err != nil { - log.Print(err.Error()) - return err - } - defer configFile.Close() - jsonParser := json.NewDecoder(configFile) + cm.collectors = make([]MetricCollector, 0) + cm.output = nil + cm.done = make(chan bool) + cm.wg = wg + cm.ticker = ticker + cm.duration = duration + configFile, err := os.Open(collectConfigFile) + if err != nil { + log.Print(err.Error()) + return err + } + defer configFile.Close() + jsonParser := json.NewDecoder(configFile) err = jsonParser.Decode(&cm.config) if err != nil { - log.Print(err.Error()) - return err - } - for k, cfg := range cm.config { - log.Print(k, " ", cfg) - if _, found := AvailableCollectors[k]; !found { - log.Print("[CollectorManager] SKIP unknown collector ", k) - continue - } - c := AvailableCollectors[k] - - err = c.Init(cfg) - if err != nil { - log.Print("[CollectorManager] Collector ", k, "initialization failed: ", err.Error()) - continue - } - cm.collectors = append(cm.collectors, c) - } - return nil + log.Print(err.Error()) + return err + } + for k, cfg := range cm.config { + log.Print(k, " ", cfg) + if _, found := AvailableCollectors[k]; !found { + log.Print("[CollectorManager] SKIP unknown collector ", k) + continue + } + c := AvailableCollectors[k] + + err = c.Init(cfg) + if err != nil { + log.Print("[CollectorManager] Collector ", k, "initialization failed: ", err.Error()) + continue + } + cm.collectors = append(cm.collectors, c) + } + return nil } func (cm *collectorManager) Start() { - cm.wg.Add(1) - tick := make(chan time.Time) - cm.ticker.AddChannel(tick) - go func() { - for { -CollectorManagerLoop: - select { - case <- cm.done: - for _, c := range cm.collectors { - c.Close() - } - cm.wg.Done() - log.Print("[CollectorManager] DONE\n") - break CollectorManagerLoop - case t := <- tick: - for _, c := range cm.collectors { -CollectorManagerInputLoop: - select { - case <- cm.done: - for _, c := range cm.collectors { - c.Close() - } - cm.wg.Done() - log.Print("[CollectorManager] DONE\n") - break CollectorManagerInputLoop - default: - log.Print("[CollectorManager] ", c.Name(), " ", t) - c.Read(cm.duration, cm.output) - } - } - } - } - log.Print("[CollectorManager] EXIT\n") - }() - log.Print("[CollectorManager] STARTED\n") + cm.wg.Add(1) + tick := make(chan time.Time) + cm.ticker.AddChannel(tick) + go func() { + for { + CollectorManagerLoop: + select { + case <-cm.done: + for _, c := range cm.collectors { + c.Close() + } + cm.wg.Done() + log.Print("[CollectorManager] DONE\n") + break CollectorManagerLoop + case t := <-tick: + for _, c := range cm.collectors { + CollectorManagerInputLoop: + select { + case <-cm.done: + for _, c := range cm.collectors { + c.Close() + } + cm.wg.Done() + log.Print("[CollectorManager] DONE\n") + break CollectorManagerInputLoop + default: + log.Print("[CollectorManager] ", c.Name(), " ", t) + c.Read(cm.duration, cm.output) + } + } + } + } + log.Print("[CollectorManager] EXIT\n") + }() + log.Print("[CollectorManager] STARTED\n") } func (cm *collectorManager) AddOutput(output chan lp.CCMetric) { - cm.output = output + cm.output = output } func (cm *collectorManager) Close() { - cm.done <- true - log.Print("[CollectorManager] CLOSE") + cm.done <- true + log.Print("[CollectorManager] CLOSE") } func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) { - cm := &collectorManager{} - err := cm.Init(ticker, duration, wg, collectConfigFile) - if err != nil { - return nil, err - } - return cm, err + cm := &collectorManager{} + err := cm.Init(ticker, duration, wg, collectConfigFile) + if err != nil { + return nil, err + } + return cm, err } diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index c9dd746..cafb9e3 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -24,7 +24,7 @@ type CpustatCollector struct { func (m *CpustatCollector) Init(config json.RawMessage) error { m.name = "CpustatCollector" m.setup() - m.meta = map[string]string{"source" : m.name, "group" : "CPU"} + m.meta = map[string]string{"source": m.name, "group": "CPU"} if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index 444d534..1b05c93 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -32,7 +32,7 @@ type CustomCmdCollector struct { func (m *CustomCmdCollector) Init(config json.RawMessage) error { var err error m.name = "CustomCmdCollector" - m.meta = map[string]string{"source" : m.name, "group" : "Custom"} + m.meta = map[string]string{"source": m.name, "group": "Custom"} if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index 611cdb9..d932fea 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -27,7 +27,7 @@ type DiskstatCollector struct { func (m *DiskstatCollector) Init(config json.RawMessage) error { var err error m.name = "DiskstatCollector" - m.meta = map[string]string{"source" : m.name, "group" : "Disk"} + m.meta = map[string]string{"source": m.name, "group": "Disk"} m.setup() if len(config) > 0 { err = json.Unmarshal(config, &m.config) diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index a076646..3914d4b 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -54,7 +54,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { m.name = "InfinibandCollector" m.use_perfquery = false m.setup() - m.meta = map[string]string{"source" : m.name, "group" : "Network"} + m.meta = map[string]string{"source": m.name, "group": "Network"} m.tags = map[string]string{"type": "node"} if len(config) > 0 { err = json.Unmarshal(config, &m.config) diff --git a/collectors/ipmiMetric.go b/collectors/ipmiMetric.go index 341a895..0a79acb 100644 --- a/collectors/ipmiMetric.go +++ b/collectors/ipmiMetric.go @@ -31,7 +31,7 @@ type IpmiCollector struct { func (m *IpmiCollector) Init(config json.RawMessage) error { m.name = "IpmiCollector" m.setup() - m.meta = map[string]string{"source" : m.name, "group" : "IPMI"} + m.meta = map[string]string{"source": m.name, "group": "IPMI"} if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { @@ -85,7 +85,7 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) { y, err := lp.New(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now()) if err == nil { - y.AddMeta("unit", unit) + y.AddMeta("unit", unit) output <- y } } @@ -112,9 +112,9 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) { name := strings.ToLower(strings.Replace(lv[1], " ", "_", -1)) y, err := lp.New(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now()) if err == nil { - if len(lv) > 4 { - y.AddMeta("unit", lv[4]) - } + if len(lv) > 4 { + y.AddMeta("unit", lv[4]) + } output <- y } } diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index cc3dd98..c7d3236 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -27,21 +27,21 @@ import ( type MetricScope int const ( - METRIC_SCOPE_HWTHREAD = iota - METRIC_SCOPE_SOCKET - METRIC_SCOPE_NUMA - METRIC_SCOPE_NODE + METRIC_SCOPE_HWTHREAD = iota + METRIC_SCOPE_SOCKET + METRIC_SCOPE_NUMA + METRIC_SCOPE_NODE ) func (ms MetricScope) String() string { - return []string{"Head", "Shoulder", "Knee", "Toe"}[ms] + return []string{"Head", "Shoulder", "Knee", "Toe"}[ms] } type LikwidCollectorMetricConfig struct { - Name string `json:"name"` - Calc string `json:"calc"` - Scope MetricScope `json:"socket_scope"` - Publish bool `json:"publish"` + Name string `json:"name"` + Calc string `json:"calc"` + Scope MetricScope `json:"socket_scope"` + Publish bool `json:"publish"` } type LikwidCollectorEventsetConfig struct { @@ -127,13 +127,13 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { } } m.setup() - m.meta = map[string]string{"source" : m.name, "group" : "PerfCounter"} + m.meta = map[string]string{"source": m.name, "group": "PerfCounter"} cpulist := CpuList() m.cpulist = make([]C.int, len(cpulist)) slist := getSocketCpus() m.sock2tid = make(map[int]int) -// m.numa2tid = make(map[int]int) + // m.numa2tid = make(map[int]int) for i, c := range cpulist { m.cpulist[i] = C.int(c) if sid, found := slist[m.cpulist[i]]; found { @@ -264,7 +264,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) for sid, tid := range m.sock2tid { y, err := lp.New(metric.Name, map[string]string{"type": "socket", - "type-id": fmt.Sprintf("%d", int(sid))}, + "type-id": fmt.Sprintf("%d", int(sid))}, m.meta, map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, time.Now()) @@ -276,7 +276,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) for tid, cpu := range m.cpulist { y, err := lp.New(metric.Name, map[string]string{"type": "cpu", - "type-id": fmt.Sprintf("%d", int(cpu))}, + "type-id": fmt.Sprintf("%d", int(cpu))}, m.meta, map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, time.Now()) @@ -295,7 +295,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) for sid, tid := range m.sock2tid { y, err := lp.New(metric.Name, map[string]string{"type": "socket", - "type-id": fmt.Sprintf("%d", int(sid))}, + "type-id": fmt.Sprintf("%d", int(sid))}, m.meta, map[string]interface{}{"value": m.gmresults[tid][metric.Name]}, time.Now()) @@ -307,7 +307,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) for tid, cpu := range m.cpulist { y, err := lp.New(metric.Name, map[string]string{"type": "cpu", - "type-id": fmt.Sprintf("%d", int(cpu))}, + "type-id": fmt.Sprintf("%d", int(cpu))}, m.meta, map[string]interface{}{"value": m.gmresults[tid][metric.Name]}, time.Now()) diff --git a/collectors/loadavgMetric.go b/collectors/loadavgMetric.go index 418b15c..bb820b0 100644 --- a/collectors/loadavgMetric.go +++ b/collectors/loadavgMetric.go @@ -32,7 +32,7 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error { return err } } - m.meta = map[string]string{"source" : m.name, "group" : "LOAD"} + m.meta = map[string]string{"source": m.name, "group": "LOAD"} m.tags = map[string]string{"type": "node"} m.load_matches = []string{"load_one", "load_five", "load_fifteen"} m.proc_matches = []string{"proc_run", "proc_total"} diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index bde77c2..ccf5124 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -37,7 +37,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error { } m.setup() m.tags = map[string]string{"type": "node"} - m.meta = map[string]string{"source" : m.name, "group" : "Lustre"} + m.meta = map[string]string{"source": m.name, "group": "Lustre"} m.matches = map[string]map[string]int{"read_bytes": {"read_bytes": 6, "read_requests": 1}, "write_bytes": {"write_bytes": 6, "write_requests": 1}, "open": {"open": 1}, @@ -90,9 +90,9 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, time.Now()) if err == nil { - if strings.Contains(name, "byte") { - y.AddMeta("unit", "Byte") - } + if strings.Contains(name, "byte") { + y.AddMeta("unit", "Byte") + } output <- y } } diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index 1d07cd8..f62a67c 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -35,7 +35,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { return err } } - m.meta = map[string]string{"source" : m.name, "group" : "Memory", "unit": "kByte"} + m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "kByte"} m.stats = make(map[string]int64) m.matches = make(map[string]string) m.tags = map[string]string{"type": "node"} diff --git a/collectors/metricCollector.go b/collectors/metricCollector.go index ebe5f0f..6bc9047 100644 --- a/collectors/metricCollector.go +++ b/collectors/metricCollector.go @@ -1,6 +1,7 @@ package collectors import ( + "encoding/json" "errors" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influx "github.com/influxdata/line-protocol" @@ -9,7 +10,6 @@ import ( "strconv" "strings" "time" - "encoding/json" ) type MetricCollector interface { @@ -21,10 +21,10 @@ type MetricCollector interface { } type metricCollector struct { - output chan lp.CCMetric - name string - init bool - meta map[string]string + output chan lp.CCMetric + name string + init bool + meta map[string]string } func (c *metricCollector) Name() string { diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index d28f828..d8e063e 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -25,7 +25,7 @@ type NetstatCollector struct { func (m *NetstatCollector) Init(config json.RawMessage) error { m.name = "NetstatCollector" m.setup() - m.meta = map[string]string{"source" : m.name, "group" : "Memory"} + m.meta = map[string]string{"source": m.name, "group": "Memory"} m.matches = map[int]string{ 1: "bytes_in", 9: "bytes_out", @@ -75,12 +75,12 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": int(float64(v) * 1.0e-3)}, time.Now()) if err == nil { - switch { - case strings.Contains(name, "byte"): - y.AddMeta("unit", "Byte") - case strings.Contains(name, "pkt"): - y.AddMeta("unit", "Packets") - } + switch { + case strings.Contains(name, "byte"): + y.AddMeta("unit", "Byte") + case strings.Contains(name, "pkt"): + y.AddMeta("unit", "Packets") + } output <- y } } diff --git a/collectors/nvidiaMetric.go b/collectors/nvidiaMetric.go index aad3129..1531f14 100644 --- a/collectors/nvidiaMetric.go +++ b/collectors/nvidiaMetric.go @@ -4,8 +4,8 @@ import ( "encoding/json" "errors" "fmt" - "github.com/NVIDIA/go-nvml/pkg/nvml" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + "github.com/NVIDIA/go-nvml/pkg/nvml" "log" "time" ) @@ -32,7 +32,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { var err error m.name = "NvidiaCollector" m.setup() - m.meta = map[string]string{"source" : m.name, "group" : "Nvidia"} + m.meta = map[string]string{"source": m.name, "group": "Nvidia"} if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { @@ -91,14 +91,14 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "mem_total") y, err := lp.New("mem_total", tags, m.meta, map[string]interface{}{"value": t}, time.Now()) if err == nil && !skip { - y.AddMeta("unit", "MByte") + y.AddMeta("unit", "MByte") output <- y } f := float64(meminfo.Used) / (1024 * 1024) _, skip = stringArrayContains(m.config.ExcludeMetrics, "fb_memory") y, err = lp.New("fb_memory", tags, m.meta, map[string]interface{}{"value": f}, time.Now()) if err == nil && !skip { - y.AddMeta("unit", "MByte") + y.AddMeta("unit", "MByte") output <- y } } @@ -108,7 +108,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "temp") y, err := lp.New("temp", tags, m.meta, map[string]interface{}{"value": float64(temp)}, time.Now()) if err == nil && !skip { - y.AddMeta("unit", "degC") + y.AddMeta("unit", "degC") output <- y } } diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go index 4aba391..40aaad8 100644 --- a/collectors/tempMetric.go +++ b/collectors/tempMetric.go @@ -5,12 +5,12 @@ import ( "fmt" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "io/ioutil" + "log" "os" "path/filepath" "strconv" "strings" "time" - "log" ) const HWMON_PATH = `/sys/class/hwmon` @@ -28,7 +28,7 @@ type TempCollector struct { func (m *TempCollector) Init(config json.RawMessage) error { m.name = "TempCollector" m.setup() - m.meta = map[string]string{"source" : m.name, "group" : "IPMI", "unit": "degC"} + m.meta = map[string]string{"source": m.name, "group": "IPMI", "unit": "degC"} if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { @@ -90,10 +90,10 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { break } } - mname := strings.Replace(name, " ", "_", -1) - if !strings.Contains(mname, "temp") { - mname = fmt.Sprintf("temp_%s", mname) - } + mname := strings.Replace(name, " ", "_", -1) + if !strings.Contains(mname, "temp") { + mname = fmt.Sprintf("temp_%s", mname) + } buffer, err := ioutil.ReadFile(string(file)) if err != nil { continue @@ -102,7 +102,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { if err == nil { y, err := lp.New(strings.ToLower(mname), tags, m.meta, map[string]interface{}{"value": int(float64(x) / 1000)}, time.Now()) if err == nil { - log.Print("[", m.name, "] ",y) + log.Print("[", m.name, "] ", y) output <- y } } diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go index df25b6f..56b4576 100644 --- a/collectors/topprocsMetric.go +++ b/collectors/topprocsMetric.go @@ -28,7 +28,7 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error { var err error m.name = "TopProcsCollector" m.tags = map[string]string{"type": "node"} - m.meta = map[string]string{"source" : m.name, "group" : "TopProcs"} + m.meta = map[string]string{"source": m.name, "group": "TopProcs"} if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { diff --git a/go.sum b/go.sum index a6f98d7..68d72c1 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/NVIDIA/go-nvml v0.11.1-0 h1:XHSz3zZKC4NCP2ja1rI7++DXFhA+uDhdYa3MykCTGHY= github.com/NVIDIA/go-nvml v0.11.1-0/go.mod h1:hy7HYeQy335x6nEss0Ne3PYqleRa6Ct+VKD9RQ4nyFs= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= diff --git a/internal/ccMetric/ccMetric.go b/internal/ccMetric/ccMetric.go index c5864de..6b6bda9 100644 --- a/internal/ccMetric/ccMetric.go +++ b/internal/ccMetric/ccMetric.go @@ -1,10 +1,10 @@ package ccmetric import ( - lp "github.com/influxdata/line-protocol" // MIT license - "time" - "sort" - "fmt" + "fmt" + lp "github.com/influxdata/line-protocol" // MIT license + "sort" + "time" ) // Most functions are derived from github.com/influxdata/line-protocol/metric.go @@ -12,18 +12,18 @@ import ( // type. type ccMetric struct { - name string + name string tags []*lp.Tag fields []*lp.Field tm time.Time - meta []*lp.Tag + meta []*lp.Tag } type CCMetric interface { - lp.MutableMetric - AddMeta(key, value string) - MetaList() []*lp.Tag - RemoveTag(key string) + lp.MutableMetric + AddMeta(key, value string) + MetaList() []*lp.Tag + RemoveTag(key string) } func (m *ccMetric) Meta() map[string]string { @@ -187,9 +187,6 @@ func (m *ccMetric) AddField(key string, value interface{}) { m.fields = append(m.fields, &lp.Field{Key: key, Value: convertField(value)}) } - - - func New( name string, tags map[string]string, @@ -202,7 +199,7 @@ func New( tags: nil, fields: nil, tm: tm, - meta: nil, + meta: nil, } if len(tags) > 0 { diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 9e2d4f9..881c99e 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -1,36 +1,36 @@ package metricRouter import ( - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - "sync" - "log" - "encoding/json" - "os" - "time" - "gopkg.in/Knetic/govaluate.v2" - mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" + "encoding/json" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" + "gopkg.in/Knetic/govaluate.v2" + "log" + "os" + "sync" + "time" ) type metricRounterTagConfig struct { - Key string `json:"key"` - Value string `json:"value"` - Condition string `json:"if"` + Key string `json:"key"` + Value string `json:"value"` + Condition string `json:"if"` } type metricRouterConfig struct { - AddTags []metricRounterTagConfig `json:"add_tags"` - DelTags []metricRounterTagConfig `json:"delete_tags"` - IntervalStamp bool `json:"interval_timestamp"` + AddTags []metricRounterTagConfig `json:"add_tags"` + DelTags []metricRounterTagConfig `json:"delete_tags"` + IntervalStamp bool `json:"interval_timestamp"` } type metricRouter struct { - inputs []chan lp.CCMetric - outputs []chan lp.CCMetric - done chan bool - wg *sync.WaitGroup - timestamp time.Time - ticker mct.MultiChanTicker - config metricRouterConfig + inputs []chan lp.CCMetric + outputs []chan lp.CCMetric + done chan bool + wg *sync.WaitGroup + timestamp time.Time + ticker mct.MultiChanTicker + config metricRouterConfig } type MetricRouter interface { @@ -41,62 +41,61 @@ type MetricRouter interface { Close() } - func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error { - r.inputs = make([]chan lp.CCMetric, 0) - r.outputs = make([]chan lp.CCMetric, 0) - r.done = make(chan bool) - r.wg = wg - r.ticker = ticker - configFile, err := os.Open(routerConfigFile) - if err != nil { - log.Print(err.Error()) - return err - } - defer configFile.Close() - jsonParser := json.NewDecoder(configFile) + r.inputs = make([]chan lp.CCMetric, 0) + r.outputs = make([]chan lp.CCMetric, 0) + r.done = make(chan bool) + r.wg = wg + r.ticker = ticker + configFile, err := os.Open(routerConfigFile) + if err != nil { + log.Print(err.Error()) + return err + } + defer configFile.Close() + jsonParser := json.NewDecoder(configFile) err = jsonParser.Decode(&r.config) if err != nil { - log.Print(err.Error()) - return err - } - return nil + log.Print(err.Error()) + return err + } + return nil } func (r *metricRouter) StartTimer() { - m := make(chan time.Time) - r.ticker.AddChannel(m) - go func() { - for { - select { - case t := <- m: - r.timestamp = t - } - } - }() + m := make(chan time.Time) + r.ticker.AddChannel(m) + go func() { + for { + select { + case t := <-m: + r.timestamp = t + } + } + }() } -func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error){ - expression, err := govaluate.NewEvaluableExpression(Cond) - if err != nil { +func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) { + expression, err := govaluate.NewEvaluableExpression(Cond) + if err != nil { log.Print(Cond, " = ", err.Error()) return false, err } params := make(map[string]interface{}) params["name"] = point.Name() - for _,t := range point.TagList() { - params[t.Key] = t.Value + for _, t := range point.TagList() { + params[t.Key] = t.Value } - for _,m := range point.MetaList() { - params[m.Key] = m.Value + for _, m := range point.MetaList() { + params[m.Key] = m.Value } - for _,f := range point.FieldList() { - params[f.Key] = f.Value + for _, f := range point.FieldList() { + params[f.Key] = f.Value } params["timestamp"] = point.Time() - - result, err := expression.Evaluate(params) - if err != nil { + + result, err := expression.Evaluate(params) + if err != nil { log.Print(Cond, " = ", err.Error()) return false, err } @@ -104,106 +103,106 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro } func (r *metricRouter) DoAddTags(point lp.CCMetric) { - for _, m := range r.config.AddTags { - var res bool - var err error - - if m.Condition == "*" { - res = true - err = nil - } else { - res, err = r.EvalCondition(m.Condition, point) - if err != nil { - log.Print(err.Error()) - res = false - } - } - if res == true { - point.AddTag(m.Key, m.Value) - } - } + for _, m := range r.config.AddTags { + var res bool + var err error + + if m.Condition == "*" { + res = true + err = nil + } else { + res, err = r.EvalCondition(m.Condition, point) + if err != nil { + log.Print(err.Error()) + res = false + } + } + if res == true { + point.AddTag(m.Key, m.Value) + } + } } func (r *metricRouter) DoDelTags(point lp.CCMetric) { - for _, m := range r.config.DelTags { - var res bool - var err error - if m.Condition == "*" { - res = true - err = nil - } else { - res, err = r.EvalCondition(m.Condition, point) - if err != nil { - log.Print(err.Error()) - res = false - } - } - if res == true { - point.RemoveTag(m.Key) - } - } + for _, m := range r.config.DelTags { + var res bool + var err error + if m.Condition == "*" { + res = true + err = nil + } else { + res, err = r.EvalCondition(m.Condition, point) + if err != nil { + log.Print(err.Error()) + res = false + } + } + if res == true { + point.RemoveTag(m.Key) + } + } } func (r *metricRouter) Start() { - r.wg.Add(1) - r.timestamp = time.Now() - if r.config.IntervalStamp == true { - r.StartTimer() - } - go func() { - for { -RouterLoop: - select { - case <- r.done: - log.Print("[MetricRouter] DONE\n") - r.wg.Done() - break RouterLoop - default: - for _, c := range r.inputs { -RouterInputLoop: - select { - case <- r.done: - log.Print("[MetricRouter] DONE\n") - r.wg.Done() - break RouterInputLoop - case p := <- c: - log.Print("[MetricRouter] FORWARD ",p) - r.DoAddTags(p) - r.DoDelTags(p) - if r.config.IntervalStamp == true { - p.SetTime(r.timestamp) - } - for _, o := range r.outputs { - o <- p - } - default: - } - } - } - } - log.Print("[MetricRouter] EXIT\n") - }() - log.Print("[MetricRouter] STARTED\n") + r.wg.Add(1) + r.timestamp = time.Now() + if r.config.IntervalStamp == true { + r.StartTimer() + } + go func() { + for { + RouterLoop: + select { + case <-r.done: + log.Print("[MetricRouter] DONE\n") + r.wg.Done() + break RouterLoop + default: + for _, c := range r.inputs { + RouterInputLoop: + select { + case <-r.done: + log.Print("[MetricRouter] DONE\n") + r.wg.Done() + break RouterInputLoop + case p := <-c: + log.Print("[MetricRouter] FORWARD ", p) + r.DoAddTags(p) + r.DoDelTags(p) + if r.config.IntervalStamp == true { + p.SetTime(r.timestamp) + } + for _, o := range r.outputs { + o <- p + } + default: + } + } + } + } + log.Print("[MetricRouter] EXIT\n") + }() + log.Print("[MetricRouter] STARTED\n") } func (r *metricRouter) AddInput(input chan lp.CCMetric) { - r.inputs = append(r.inputs, input) + r.inputs = append(r.inputs, input) } func (r *metricRouter) AddOutput(output chan lp.CCMetric) { - r.outputs = append(r.outputs, output) + r.outputs = append(r.outputs, output) } func (r *metricRouter) Close() { - r.done <- true - log.Print("[MetricRouter] CLOSE\n") + r.done <- true + log.Print("[MetricRouter] CLOSE\n") } func New(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) (MetricRouter, error) { - r := &metricRouter{} - err := r.Init(ticker, wg, routerConfigFile) - if err != nil { - return nil, err - } - return r, err + r := &metricRouter{} + err := r.Init(ticker, wg, routerConfigFile) + if err != nil { + return nil, err + } + return r, err } diff --git a/internal/multiChanTicker/multiChanTicker.go b/internal/multiChanTicker/multiChanTicker.go index f063af4..f8139fa 100644 --- a/internal/multiChanTicker/multiChanTicker.go +++ b/internal/multiChanTicker/multiChanTicker.go @@ -1,39 +1,39 @@ package multiChanTicker import ( - "time" + "time" ) type multiChanTicker struct { - ticker *time.Ticker - channels []chan time.Time + ticker *time.Ticker + channels []chan time.Time } type MultiChanTicker interface { - Init(duration time.Duration) - AddChannel(chan time.Time) + Init(duration time.Duration) + AddChannel(chan time.Time) } func (t *multiChanTicker) Init(duration time.Duration) { - t.ticker = time.NewTicker(duration) - go func() { - for { - select { - case ts := <-t.ticker.C: - for _, c := range t.channels { - c <- ts - } - } - } - }() + t.ticker = time.NewTicker(duration) + go func() { + for { + select { + case ts := <-t.ticker.C: + for _, c := range t.channels { + c <- ts + } + } + } + }() } func (t *multiChanTicker) AddChannel(channel chan time.Time) { - t.channels = append(t.channels, channel) + t.channels = append(t.channels, channel) } func NewTicker(duration time.Duration) MultiChanTicker { - t := &multiChanTicker{} - t.Init(duration) - return t + t := &multiChanTicker{} + t.Init(duration) + return t } diff --git a/metric-collector.go b/metric-collector.go index e9b5a00..5e87808 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -10,12 +10,12 @@ import ( "log" "os" "os/signal" -// "strings" + // "strings" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" + mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" "sync" "time" - mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" ) // List of provided collectors. Which collector should be run can be @@ -48,17 +48,17 @@ import ( //} type CentralConfigFile struct { - Interval int `json:"interval"` - Duration int `json:"duration"` - Pidfile string `json:"pidfile", omitempty` - CollectorConfigFile string `json:"collectors"` - RouterConfigFile string `json:"router"` - SinkConfigFile string `json:"sinks"` - ReceiverConfigFile string `json:"receivers", omitempty` + Interval int `json:"interval"` + Duration int `json:"duration"` + Pidfile string `json:"pidfile", omitempty` + CollectorConfigFile string `json:"collectors"` + RouterConfigFile string `json:"router"` + SinkConfigFile string `json:"sinks"` + ReceiverConfigFile string `json:"receivers", omitempty` } func LoadCentralConfiguration(file string, config *CentralConfigFile) error { - configFile, err := os.Open(file) + configFile, err := os.Open(file) defer configFile.Close() if err != nil { fmt.Println(err.Error()) @@ -70,29 +70,29 @@ func LoadCentralConfiguration(file string, config *CentralConfigFile) error { } type RuntimeConfig struct { - Hostname string - Interval time.Duration - Duration time.Duration - CliArgs map[string]string - ConfigFile CentralConfigFile - - Router mr.MetricRouter - CollectManager collectors.CollectorManager - SinkManager sinks.SinkManager - ReceiveManager receivers.ReceiveManager - Ticker mct.MultiChanTicker + Hostname string + Interval time.Duration + Duration time.Duration + CliArgs map[string]string + ConfigFile CentralConfigFile - Channels []chan lp.CCMetric - Sync sync.WaitGroup + Router mr.MetricRouter + CollectManager collectors.CollectorManager + SinkManager sinks.SinkManager + ReceiveManager receivers.ReceiveManager + Ticker mct.MultiChanTicker + + Channels []chan lp.CCMetric + Sync sync.WaitGroup } func prepare_runcfg() RuntimeConfig { - r := RuntimeConfig{} - r.Router = nil - r.CollectManager = nil - r.SinkManager = nil - r.ReceiveManager = nil - return r + r := RuntimeConfig{} + r.Router = nil + r.CollectManager = nil + r.SinkManager = nil + r.ReceiveManager = nil + return r } //// Structure of the configuration file @@ -177,26 +177,26 @@ func ReadCli() map[string]string { func shutdown(config *RuntimeConfig) { log.Print("Shutdown...") if config.CollectManager != nil { - log.Print("Shutdown CollectManager...") - config.CollectManager.Close() - } - if config.ReceiveManager != nil { - log.Print("Shutdown ReceiveManager...") - config.ReceiveManager.Close() - } - if config.Router != nil { - log.Print("Shutdown Router...") - config.Router.Close() - } + log.Print("Shutdown CollectManager...") + config.CollectManager.Close() + } + if config.ReceiveManager != nil { + log.Print("Shutdown ReceiveManager...") + config.ReceiveManager.Close() + } + if config.Router != nil { + log.Print("Shutdown Router...") + config.Router.Close() + } if config.SinkManager != nil { - log.Print("Shutdown SinkManager...") - config.SinkManager.Close() - } - -// pidfile := config.ConfigFile.Pidfile -// RemovePidfile(pidfile) -// pidfile = config.CliArgs["pidfile"] -// RemovePidfile(pidfile) + log.Print("Shutdown SinkManager...") + config.SinkManager.Close() + } + + // pidfile := config.ConfigFile.Pidfile + // RemovePidfile(pidfile) + // pidfile = config.CliArgs["pidfile"] + // RemovePidfile(pidfile) config.Sync.Done() } @@ -214,7 +214,7 @@ func prepare_shutdown(config *RuntimeConfig) { } func main() { - var err error + var err error use_recv := false rcfg := prepare_runcfg() @@ -231,194 +231,193 @@ func main() { log.Print("Configuration value 'interval' must be greater than zero") return } - rcfg.Interval = time.Duration(rcfg.ConfigFile.Interval)*time.Second + rcfg.Interval = time.Duration(rcfg.ConfigFile.Interval) * time.Second if rcfg.ConfigFile.Duration <= 0 || time.Duration(rcfg.ConfigFile.Duration)*time.Second <= 0 { log.Print("Configuration value 'duration' must be greater than zero") return } - rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration)*time.Second - - + rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration) * time.Second + rcfg.Hostname, err = os.Hostname() if err != nil { log.Print(err.Error()) return } -// err = CreatePidfile(rcfg.CliArgs["pidfile"]) -// err = SetLogging(rcfg.CliArgs["logfile"]) -// if err != nil { -// log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname) -// return -// } + // err = CreatePidfile(rcfg.CliArgs["pidfile"]) + // err = SetLogging(rcfg.CliArgs["logfile"]) + // if err != nil { + // log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname) + // return + // } rcfg.Ticker = mct.NewTicker(rcfg.Interval) - if len(rcfg.ConfigFile.RouterConfigFile) > 0 { - rcfg.Router, err = mr.New(rcfg.Ticker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile) - if err != nil { - log.Print(err.Error()) - return - } - } - if len(rcfg.ConfigFile.SinkConfigFile) > 0 { - rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile) - if err != nil { - log.Print(err.Error()) - return - } - RouterToSinksChannel := make(chan lp.CCMetric) - rcfg.SinkManager.AddInput(RouterToSinksChannel) - rcfg.Router.AddOutput(RouterToSinksChannel) - } - if len(rcfg.ConfigFile.CollectorConfigFile) > 0 { - rcfg.CollectManager, err = collectors.New(rcfg.Ticker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile) - if err != nil { - log.Print(err.Error()) - return - } - CollectToRouterChannel := make(chan lp.CCMetric) - rcfg.CollectManager.AddOutput(CollectToRouterChannel) - rcfg.Router.AddInput(CollectToRouterChannel) - } - if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 { - rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile) - if err != nil { - log.Print(err.Error()) - return - } - ReceiveToRouterChannel := make(chan lp.CCMetric) - rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel) - rcfg.Router.AddInput(ReceiveToRouterChannel) - use_recv = true - } - prepare_shutdown(&rcfg) + if len(rcfg.ConfigFile.RouterConfigFile) > 0 { + rcfg.Router, err = mr.New(rcfg.Ticker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile) + if err != nil { + log.Print(err.Error()) + return + } + } + if len(rcfg.ConfigFile.SinkConfigFile) > 0 { + rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile) + if err != nil { + log.Print(err.Error()) + return + } + RouterToSinksChannel := make(chan lp.CCMetric) + rcfg.SinkManager.AddInput(RouterToSinksChannel) + rcfg.Router.AddOutput(RouterToSinksChannel) + } + if len(rcfg.ConfigFile.CollectorConfigFile) > 0 { + rcfg.CollectManager, err = collectors.New(rcfg.Ticker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile) + if err != nil { + log.Print(err.Error()) + return + } + CollectToRouterChannel := make(chan lp.CCMetric) + rcfg.CollectManager.AddOutput(CollectToRouterChannel) + rcfg.Router.AddInput(CollectToRouterChannel) + } + if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 { + rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile) + if err != nil { + log.Print(err.Error()) + return + } + ReceiveToRouterChannel := make(chan lp.CCMetric) + rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel) + rcfg.Router.AddInput(ReceiveToRouterChannel) + use_recv = true + } + prepare_shutdown(&rcfg) rcfg.Sync.Add(1) rcfg.Router.Start() rcfg.SinkManager.Start() rcfg.CollectManager.Start() - + if use_recv { - rcfg.ReceiveManager.Start() - } -// if len(config.Collectors) == 0 { -// var keys []string -// for k := range Collectors { -// keys = append(keys, k) -// } -// log.Print("Configuration value 'collectors' does not contain any collector. Available: ", strings.Join(keys, ", ")) -// return -// } -// for _, name := range config.Collectors { -// if _, found := Collectors[name]; !found { -// log.Print("Invalid collector '", name, "' in configuration") -// return -// } -// } -// if _, found := Sinks[config.Sink.Type]; !found { -// log.Print("Invalid sink type '", config.Sink.Type, "' in configuration") -// return -// } -// // Setup sink -// sink := Sinks[config.Sink.Type] -// err = sink.Init(config.Sink) -// if err != nil { -// log.Print(err) -// return -// } -// sinkChannel := make(chan bool) -// mproxy.Init(sinkChannel, &wg) -// // Setup receiver -// if len(config.Receiver.Type) > 0 && config.Receiver.Type != "none" { -// if _, found := Receivers[config.Receiver.Type]; !found { -// log.Print("Invalid receiver type '", config.Receiver.Type, "' in configuration") -// return -// } else { -// recv = Receivers[config.Receiver.Type] -// err = recv.Init(config.Receiver, sink) -// if err == nil { -// use_recv = true -// } else { -// log.Print(err) -// } -// } -// } + rcfg.ReceiveManager.Start() + } + // if len(config.Collectors) == 0 { + // var keys []string + // for k := range Collectors { + // keys = append(keys, k) + // } + // log.Print("Configuration value 'collectors' does not contain any collector. Available: ", strings.Join(keys, ", ")) + // return + // } + // for _, name := range config.Collectors { + // if _, found := Collectors[name]; !found { + // log.Print("Invalid collector '", name, "' in configuration") + // return + // } + // } + // if _, found := Sinks[config.Sink.Type]; !found { + // log.Print("Invalid sink type '", config.Sink.Type, "' in configuration") + // return + // } + // // Setup sink + // sink := Sinks[config.Sink.Type] + // err = sink.Init(config.Sink) + // if err != nil { + // log.Print(err) + // return + // } + // sinkChannel := make(chan bool) + // mproxy.Init(sinkChannel, &wg) + // // Setup receiver + // if len(config.Receiver.Type) > 0 && config.Receiver.Type != "none" { + // if _, found := Receivers[config.Receiver.Type]; !found { + // log.Print("Invalid receiver type '", config.Receiver.Type, "' in configuration") + // return + // } else { + // recv = Receivers[config.Receiver.Type] + // err = recv.Init(config.Receiver, sink) + // if err == nil { + // use_recv = true + // } else { + // log.Print(err) + // } + // } + // } -// // Register interrupt handler -// prepare_shutdown(&wg, &config, sink, recv, clicfg["pidfile"]) + // // Register interrupt handler + // prepare_shutdown(&wg, &config, sink, recv, clicfg["pidfile"]) -// // Initialize all collectors -// tmp := make([]string, 0) -// for _, c := range config.Collectors { -// col := Collectors[c] -// conf, found := config.CollectConfigs[c] -// if !found { -// conf = json.RawMessage("") -// } -// err = col.Init([]byte(conf)) -// if err != nil { -// log.Print("SKIP ", col.Name(), " (", err.Error(), ")") -// } else if !col.Initialized() { -// log.Print("SKIP ", col.Name(), " (Not initialized)") -// } else { -// log.Print("Start ", col.Name()) -// tmp = append(tmp, c) -// } -// } -// config.Collectors = tmp -// config.DefTags["hostname"] = host + // // Initialize all collectors + // tmp := make([]string, 0) + // for _, c := range config.Collectors { + // col := Collectors[c] + // conf, found := config.CollectConfigs[c] + // if !found { + // conf = json.RawMessage("") + // } + // err = col.Init([]byte(conf)) + // if err != nil { + // log.Print("SKIP ", col.Name(), " (", err.Error(), ")") + // } else if !col.Initialized() { + // log.Print("SKIP ", col.Name(), " (Not initialized)") + // } else { + // log.Print("Start ", col.Name()) + // tmp = append(tmp, c) + // } + // } + // config.Collectors = tmp + // config.DefTags["hostname"] = host -// // Setup up ticker loop -// if clicfg["once"] != "true" { -// log.Print("Running loop every ", time.Duration(config.Interval)*time.Second) -// } else { -// log.Print("Running loop only once") -// } -// ticker := time.NewTicker(time.Duration(config.Interval) * time.Second) -// done := make(chan bool) + // // Setup up ticker loop + // if clicfg["once"] != "true" { + // log.Print("Running loop every ", time.Duration(config.Interval)*time.Second) + // } else { + // log.Print("Running loop only once") + // } + // ticker := time.NewTicker(time.Duration(config.Interval) * time.Second) + // done := make(chan bool) -// // Storage for all node metrics -// tmpPoints := make([]lp.MutableMetric, 0) + // // Storage for all node metrics + // tmpPoints := make([]lp.MutableMetric, 0) -// // Start receiver -// if use_recv { -// recv.Start() -// } + // // Start receiver + // if use_recv { + // recv.Start() + // } -// go func() { -// for { -// select { -// case <-done: -// return -// case t := <-ticker.C: + // go func() { + // for { + // select { + // case <-done: + // return + // case t := <-ticker.C: -// // Read all collectors are sort the results in the right -// // storage locations -// for _, c := range config.Collectors { -// col := Collectors[c] -// col.Read(time.Duration(config.Duration), &tmpPoints) + // // Read all collectors are sort the results in the right + // // storage locations + // for _, c := range config.Collectors { + // col := Collectors[c] + // col.Read(time.Duration(config.Duration), &tmpPoints) -// for { -// if len(tmpPoints) == 0 { -// break -// } -// p := tmpPoints[0] -// for k, v := range config.DefTags { -// p.AddTag(k, v) -// p.SetTime(t) -// } -// sink.Write(p) -// tmpPoints = tmpPoints[1:] -// } -// } + // for { + // if len(tmpPoints) == 0 { + // break + // } + // p := tmpPoints[0] + // for k, v := range config.DefTags { + // p.AddTag(k, v) + // p.SetTime(t) + // } + // sink.Write(p) + // tmpPoints = tmpPoints[1:] + // } + // } -// if err := sink.Flush(); err != nil { -// log.Printf("sink error: %s\n", err) -// } -// if clicfg["once"] == "true" { -// shutdown(&wg, config.Collectors, sink, recv, clicfg["pidfile"]) -// return -// } -// } -// } -// }() + // if err := sink.Flush(); err != nil { + // log.Printf("sink error: %s\n", err) + // } + // if clicfg["once"] == "true" { + // shutdown(&wg, config.Collectors, sink, recv, clicfg["pidfile"]) + // return + // } + // } + // } + // }() // Wait until receiving an interrupt rcfg.Sync.Wait() diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go index b074f97..05b8b65 100644 --- a/receivers/metricReceiver.go +++ b/receivers/metricReceiver.go @@ -2,17 +2,16 @@ package receivers import ( // "time" - influx "github.com/influxdata/line-protocol" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - + influx "github.com/influxdata/line-protocol" ) type ReceiverConfig struct { - Addr string `json:"address"` - Port string `json:"port"` - Database string `json:"database"` + Addr string `json:"address"` + Port string `json:"port"` + Database string `json:"database"` Organization string `json:"organization", omitempty` - Type string `json:"type"` + Type string `json:"type"` } type receiver struct { @@ -37,7 +36,7 @@ func (r *receiver) Name() string { } func (r *receiver) SetSink(sink chan lp.CCMetric) { - r.sink = sink + r.sink = sink } func Tags2Map(metric influx.Metric) map[string]string { diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index e3f037c..5cbe90d 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -2,16 +2,16 @@ package receivers import ( "errors" - influx "github.com/influxdata/line-protocol" + "fmt" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + influx "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" "log" "time" - "fmt" ) type NatsReceiverConfig struct { - Addr string `json:"address"` + Addr string `json:"address"` Port string `json:"port"` Database string `json:"database"` } @@ -37,7 +37,7 @@ func (r *NatsReceiver) Init(config ReceiverConfig) error { len(r.config.Database) == 0 { return errors.New("Not all configuration variables set required by NatsReceiver") } - r.meta = map[string]string{"source" : r.name} + r.meta = map[string]string{"source": r.name} r.addr = r.config.Addr if len(r.addr) == 0 { r.addr = nats.DefaultURL @@ -71,10 +71,10 @@ func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { metrics, err := r.parser.Parse(m.Data) if err == nil { for _, m := range metrics { - y := lp.FromInfluxMetric(m) - for k, v := range r.meta { - y.AddMeta(k, v) - } + y := lp.FromInfluxMetric(m) + for k, v := range r.meta { + y.AddMeta(k, v) + } //y, err := lp.New(m.Name(), Tags2Map(m), r.meta, Fields2Map(m), m.Time()) if r.sink != nil { r.sink <- y @@ -89,4 +89,3 @@ func (r *NatsReceiver) Close() { r.nc.Close() } } - diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go index 322862d..62f70b3 100644 --- a/receivers/receiveManager.go +++ b/receivers/receiveManager.go @@ -1,24 +1,23 @@ package receivers import ( - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - "sync" - "log" - "os" - "encoding/json" + "encoding/json" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + "log" + "os" + "sync" ) var AvailableReceivers = map[string]Receiver{ "nats": &NatsReceiver{}, } - type receiveManager struct { - inputs []Receiver - output chan lp.CCMetric - done chan bool - wg *sync.WaitGroup - config []ReceiverConfig + inputs []Receiver + output chan lp.CCMetric + done chan bool + wg *sync.WaitGroup + config []ReceiverConfig } type ReceiveManager interface { @@ -29,128 +28,126 @@ type ReceiveManager interface { Close() } - func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) error { - rm.inputs = make([]Receiver, 0) - rm.output = nil - rm.done = make(chan bool) - rm.wg = wg - rm.config = make([]ReceiverConfig, 0) - configFile, err := os.Open(receiverConfigFile) - if err != nil { - log.Print(err.Error()) - return err - } - defer configFile.Close() - jsonParser := json.NewDecoder(configFile) - var rawConfigs []json.RawMessage + rm.inputs = make([]Receiver, 0) + rm.output = nil + rm.done = make(chan bool) + rm.wg = wg + rm.config = make([]ReceiverConfig, 0) + configFile, err := os.Open(receiverConfigFile) + if err != nil { + log.Print(err.Error()) + return err + } + defer configFile.Close() + jsonParser := json.NewDecoder(configFile) + var rawConfigs []json.RawMessage err = jsonParser.Decode(&rawConfigs) if err != nil { - log.Print(err.Error()) - return err - } - for _, raw := range rawConfigs { - log.Print("[ReceiveManager] ", string(raw)) - rm.AddInput(raw) -// if _, found := AvailableReceivers[k.Type]; !found { -// log.Print("[ReceiveManager] SKIP Config specifies unknown receiver 'type': ", k.Type) -// continue -// } -// r := AvailableReceivers[k.Type] -// err = r.Init(k) -// if err != nil { -// log.Print("[ReceiveManager] SKIP Receiver ", k.Type, " cannot be initialized: ", err.Error()) -// continue -// } -// rm.inputs = append(rm.inputs, r) - } - return nil + log.Print(err.Error()) + return err + } + for _, raw := range rawConfigs { + log.Print("[ReceiveManager] ", string(raw)) + rm.AddInput(raw) + // if _, found := AvailableReceivers[k.Type]; !found { + // log.Print("[ReceiveManager] SKIP Config specifies unknown receiver 'type': ", k.Type) + // continue + // } + // r := AvailableReceivers[k.Type] + // err = r.Init(k) + // if err != nil { + // log.Print("[ReceiveManager] SKIP Receiver ", k.Type, " cannot be initialized: ", err.Error()) + // continue + // } + // rm.inputs = append(rm.inputs, r) + } + return nil } - func (rm *receiveManager) Start() { - rm.wg.Add(1) + rm.wg.Add(1) - for _, r := range rm.inputs { - log.Print("[ReceiveManager] START ", r.Name()) - r.Start() - } - log.Print("[ReceiveManager] STARTED\n") -// go func() { -// for { -//ReceiveManagerLoop: -// select { -// case <- rm.done: -// log.Print("ReceiveManager done\n") -// rm.wg.Done() -// break ReceiveManagerLoop -// default: -// for _, c := range rm.inputs { -//ReceiveManagerInputLoop: -// select { -// case <- rm.done: -// log.Print("ReceiveManager done\n") -// rm.wg.Done() -// break ReceiveManagerInputLoop -// case p := <- c: -// log.Print("ReceiveManager: ", p) -// rm.output <- p -// default: -// } -// } -// } -// } -// }() -// for _, r := range rm.inputs { -// r.Close() -// } + for _, r := range rm.inputs { + log.Print("[ReceiveManager] START ", r.Name()) + r.Start() + } + log.Print("[ReceiveManager] STARTED\n") + // go func() { + // for { + //ReceiveManagerLoop: + // select { + // case <- rm.done: + // log.Print("ReceiveManager done\n") + // rm.wg.Done() + // break ReceiveManagerLoop + // default: + // for _, c := range rm.inputs { + //ReceiveManagerInputLoop: + // select { + // case <- rm.done: + // log.Print("ReceiveManager done\n") + // rm.wg.Done() + // break ReceiveManagerInputLoop + // case p := <- c: + // log.Print("ReceiveManager: ", p) + // rm.output <- p + // default: + // } + // } + // } + // } + // }() + // for _, r := range rm.inputs { + // r.Close() + // } } func (rm *receiveManager) AddInput(rawConfig json.RawMessage) error { - var config ReceiverConfig - err := json.Unmarshal(rawConfig, &config) + var config ReceiverConfig + err := json.Unmarshal(rawConfig, &config) if err != nil { - log.Print("[ReceiveManager] SKIP ", config.Type, " JSON config error: ", err.Error()) - log.Print(err.Error()) - return err + log.Print("[ReceiveManager] SKIP ", config.Type, " JSON config error: ", err.Error()) + log.Print(err.Error()) + return err } - if _, found := AvailableReceivers[config.Type]; !found { - log.Print("[ReceiveManager] SKIP ", config.Type, " unknown receiver: ", err.Error()) - return err - } - r := AvailableReceivers[config.Type] - err = r.Init(config) - if err != nil { - log.Print("[ReceiveManager] SKIP ", r.Name(), " initialization failed: ", err.Error()) - return err + if _, found := AvailableReceivers[config.Type]; !found { + log.Print("[ReceiveManager] SKIP ", config.Type, " unknown receiver: ", err.Error()) + return err } - rm.inputs = append(rm.inputs, r) - rm.config = append(rm.config, config) - return nil + r := AvailableReceivers[config.Type] + err = r.Init(config) + if err != nil { + log.Print("[ReceiveManager] SKIP ", r.Name(), " initialization failed: ", err.Error()) + return err + } + rm.inputs = append(rm.inputs, r) + rm.config = append(rm.config, config) + return nil } func (rm *receiveManager) AddOutput(output chan lp.CCMetric) { - rm.output = output - for _, r := range rm.inputs { - r.SetSink(rm.output) - } + rm.output = output + for _, r := range rm.inputs { + r.SetSink(rm.output) + } } func (rm *receiveManager) Close() { - for _, r := range rm.inputs { - log.Print("[ReceiveManager] CLOSE ", r.Name()) - r.Close() - } - rm.wg.Done() - log.Print("[ReceiveManager] CLOSE\n") - log.Print("[ReceiveManager] EXIT\n") + for _, r := range rm.inputs { + log.Print("[ReceiveManager] CLOSE ", r.Name()) + r.Close() + } + rm.wg.Done() + log.Print("[ReceiveManager] CLOSE\n") + log.Print("[ReceiveManager] EXIT\n") } func New(wg *sync.WaitGroup, receiverConfigFile string) (ReceiveManager, error) { - r := &receiveManager{} - err := r.Init(wg, receiverConfigFile) - if err != nil { - return nil, err - } - return r, err + r := &receiveManager{} + err := r.Init(wg, receiverConfigFile) + if err != nil { + return nil, err + } + return r, err } diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 1dc364a..25b0082 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -7,8 +7,8 @@ import ( "net/http" "time" - influx "github.com/influxdata/line-protocol" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + influx "github.com/influxdata/line-protocol" ) type HttpSink struct { @@ -20,7 +20,7 @@ type HttpSink struct { } func (s *HttpSink) Init(config sinkConfig) error { - s.name = "HttpSink" + s.name = "HttpSink" if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 { return errors.New("`host`, `port` and `database` config options required for TCP sink") } diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 5a1b7a7..dca1572 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -39,7 +39,7 @@ func (s *InfluxSink) connect() error { } func (s *InfluxSink) Init(config sinkConfig) error { - s.name = "InfluxSink" + s.name = "InfluxSink" if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 || @@ -65,9 +65,9 @@ func (s *InfluxSink) Write(point lp.CCMetric) error { tags[t.Key] = t.Value } if s.meta_as_tags { - for _, m := range point.MetaList() { - tags[m.Key] = m.Value - } + for _, m := range point.MetaList() { + tags[m.Key] = m.Value + } } for _, f := range point.FieldList() { fields[f.Key] = f.Value diff --git a/sinks/metricSink.go b/sinks/metricSink.go index 4813155..37ffa8f 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -6,7 +6,7 @@ import ( ) type sinkConfig struct { - Type string `json:"type"` + Type string `json:"type"` Host string `json:"host", omitempty` Port string `json:"port", omitempty` Database string `json:"database, omitempty"` diff --git a/sinks/natsSink.go b/sinks/natsSink.go index de12f4b..55c1558 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -4,8 +4,8 @@ import ( "bytes" "errors" "fmt" - influx "github.com/influxdata/line-protocol" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + influx "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" "log" "time" @@ -33,7 +33,7 @@ func (s *NatsSink) connect() error { } func (s *NatsSink) Init(config sinkConfig) error { - s.name = "NatsSink" + s.name = "NatsSink" if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 { diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 4a2e829..323966e 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -1,17 +1,16 @@ package sinks import ( - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - "sync" - "log" - "os" - "encoding/json" + "encoding/json" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + "log" + "os" + "sync" ) - type SinkEntity struct { - config json.RawMessage - output Sink + config json.RawMessage + output Sink } var AvailableSinks = map[string]Sink{ @@ -21,13 +20,12 @@ var AvailableSinks = map[string]Sink{ "http": &HttpSink{}, } - type sinkManager struct { - input chan lp.CCMetric - outputs []Sink - done chan bool - wg *sync.WaitGroup - config []sinkConfig + input chan lp.CCMetric + outputs []Sink + done chan bool + wg *sync.WaitGroup + config []sinkConfig } type SinkManager interface { @@ -38,114 +36,111 @@ type SinkManager interface { Close() } - func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { - sm.input = nil - sm.outputs = make([]Sink, 0) - sm.done = make(chan bool) - sm.wg = wg - sm.config = make([]sinkConfig, 0) - if len(sinkConfigFile) > 0 { - configFile, err := os.Open(sinkConfigFile) - if err != nil { - log.Print("[SinkManager] ", err.Error()) - return err - } - defer configFile.Close() - jsonParser := json.NewDecoder(configFile) - var rawConfigs []json.RawMessage - err = jsonParser.Decode(&rawConfigs) - if err != nil { - log.Print("[SinkManager] ", err.Error()) - return err - } - for _, raw := range rawConfigs { - err = sm.AddOutput(raw) - if err != nil { - continue - } - } - } - return nil + sm.input = nil + sm.outputs = make([]Sink, 0) + sm.done = make(chan bool) + sm.wg = wg + sm.config = make([]sinkConfig, 0) + if len(sinkConfigFile) > 0 { + configFile, err := os.Open(sinkConfigFile) + if err != nil { + log.Print("[SinkManager] ", err.Error()) + return err + } + defer configFile.Close() + jsonParser := json.NewDecoder(configFile) + var rawConfigs []json.RawMessage + err = jsonParser.Decode(&rawConfigs) + if err != nil { + log.Print("[SinkManager] ", err.Error()) + return err + } + for _, raw := range rawConfigs { + err = sm.AddOutput(raw) + if err != nil { + continue + } + } + } + return nil } - - func (sm *sinkManager) Start() { - sm.wg.Add(1) - batchcount := 20 - go func() { - for { -SinkManagerLoop: - select { - case <- sm.done: - for _, s := range sm.outputs { - s.Close() - } - log.Print("[SinkManager] DONE\n") - sm.wg.Done() - break SinkManagerLoop - case p := <- sm.input: - log.Print("[SinkManager] WRITE ", p) - for _, s := range sm.outputs { - s.Write(p) - } - if (batchcount == 0) { - log.Print("[SinkManager] FLUSH") - for _, s := range sm.outputs { - s.Flush() - } - batchcount = 20 - } - batchcount-- - default: - } - } - log.Print("[SinkManager] EXIT\n") - }() - log.Print("[SinkManager] STARTED\n") + sm.wg.Add(1) + batchcount := 20 + go func() { + for { + SinkManagerLoop: + select { + case <-sm.done: + for _, s := range sm.outputs { + s.Close() + } + log.Print("[SinkManager] DONE\n") + sm.wg.Done() + break SinkManagerLoop + case p := <-sm.input: + log.Print("[SinkManager] WRITE ", p) + for _, s := range sm.outputs { + s.Write(p) + } + if batchcount == 0 { + log.Print("[SinkManager] FLUSH") + for _, s := range sm.outputs { + s.Flush() + } + batchcount = 20 + } + batchcount-- + default: + } + } + log.Print("[SinkManager] EXIT\n") + }() + log.Print("[SinkManager] STARTED\n") } func (sm *sinkManager) AddInput(input chan lp.CCMetric) { - sm.input = input + sm.input = input } func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { - var err error - var config sinkConfig - if len(rawConfig) > 3 { - err = json.Unmarshal(rawConfig, &config) - if err != nil { - log.Print("[SinkManager] SKIP ", config.Type, " JSON config error: ", err.Error()) - return err - } - } - if _, found := AvailableSinks[config.Type]; !found { - log.Print("[SinkManager] SKIP ", config.Type, " unknown sink: ", err.Error()) - return err - } - s := AvailableSinks[config.Type] - err = s.Init(config) - if err != nil { - log.Print("[SinkManager] SKIP ", s.Name(), " initialization failed: ", err.Error()) - return err - } - sm.outputs = append(sm.outputs, s) - sm.config = append(sm.config, config) - return nil + var err error + var config sinkConfig + if len(rawConfig) > 3 { + err = json.Unmarshal(rawConfig, &config) + if err != nil { + log.Print("[SinkManager] SKIP ", config.Type, " JSON config error: ", err.Error()) + return err + } + } + if _, found := AvailableSinks[config.Type]; !found { + log.Print("[SinkManager] SKIP ", config.Type, " unknown sink: ", err.Error()) + return err + } + s := AvailableSinks[config.Type] + err = s.Init(config) + if err != nil { + log.Print("[SinkManager] SKIP ", s.Name(), " initialization failed: ", err.Error()) + return err + } + sm.outputs = append(sm.outputs, s) + sm.config = append(sm.config, config) + return nil } func (sm *sinkManager) Close() { - sm.done <- true - log.Print("[SinkManager] CLOSE") - return + sm.done <- true + log.Print("[SinkManager] CLOSE") + return } func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) { - sm := &sinkManager{} - err := sm.Init(wg, sinkConfigFile) - if err != nil { - return nil, err - } - return sm, err + sm := &sinkManager{} + err := sm.Init(wg, sinkConfigFile) + if err != nil { + return nil, err + } + return sm, err } diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index a66ba45..0955f50 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -14,8 +14,8 @@ type StdoutSink struct { } func (s *StdoutSink) Init(config sinkConfig) error { - s.name = "StdoutSink" - s.meta_as_tags = config.MetaAsTags + s.name = "StdoutSink" + s.meta_as_tags = config.MetaAsTags return nil } @@ -26,9 +26,9 @@ func (s *StdoutSink) Write(point lp.CCMetric) error { tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", t.Key, t.Value)) } if s.meta_as_tags { - for _, m := range point.MetaList() { - tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", m.Key, m.Value)) - } + for _, m := range point.MetaList() { + tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", m.Key, m.Value)) + } } for _, f := range point.FieldList() { switch f.Value.(type) {