From 44d8b0c9794667caca60c234fe7a585e42cc5d38 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Mon, 20 Dec 2021 12:40:51 +0100 Subject: [PATCH] Use channels, add a metric router, split up configuration and use extended version of Influx line protocol internally --- collectors.json | 15 + collectors/collectorManager.go | 138 +++++++ collectors/cpustatMetric.go | 19 +- collectors/customCmdMetric.go | 26 +- collectors/diskstatMetric.go | 13 +- collectors/infinibandMetric.go | 75 +--- collectors/ipmiMetric.go | 37 +- collectors/likwidMetric.go | 55 ++- collectors/loadavgMetric.go | 17 +- collectors/lustreMetric.go | 16 +- collectors/memstatMetric.go | 21 +- collectors/metricCollector.go | 24 +- collectors/netstatMetric.go | 19 +- collectors/nvidiaMetric.go | 102 ++--- collectors/tempMetric.go | 21 +- collectors/topprocsMetric.go | 13 +- config.json | 40 +- go.mod | 3 +- go.sum | 2 + internal/ccMetric/ccMetric.go | 376 +++++++++++++++++ internal/metricRouter/metricRouter.go | 118 ++++++ metric-collector.go | 563 ++++++++++++++++---------- receivers.json | 8 + receivers/metricReceiver.go | 22 +- receivers/natsReceiver.go | 63 +-- receivers/receiveManager.go | 156 +++++++ router.json | 17 + sinks.json | 6 + sinks/httpSink.go | 16 +- sinks/influxSink.go | 16 +- sinks/metricSink.go | 36 +- sinks/natsSink.go | 14 +- sinks/sinkManager.go | 151 +++++++ sinks/stdoutSink.go | 15 +- 34 files changed, 1695 insertions(+), 538 deletions(-) create mode 100644 collectors.json create mode 100644 collectors/collectorManager.go create mode 100644 internal/ccMetric/ccMetric.go create mode 100644 internal/metricRouter/metricRouter.go create mode 100644 receivers.json create mode 100644 receivers/receiveManager.go create mode 100644 router.json create mode 100644 sinks.json create mode 100644 sinks/sinkManager.go diff --git a/collectors.json b/collectors.json new file mode 100644 index 0000000..df2fce3 --- /dev/null +++ b/collectors.json @@ -0,0 +1,15 @@ +{ + "tempstat": { + "tag_override": { + "hwmon0" : { + "type" : "socket", + "type-id" : "0" + }, + "hwmon1" : { + "type" : "socket", + "type-id" : "1" + } + } + } + +} diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go new file mode 100644 index 0000000..1bd0d6f --- /dev/null +++ b/collectors/collectorManager.go @@ -0,0 +1,138 @@ +package collectors + +import ( + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + "sync" + "time" + "log" + "os" + "encoding/json" +) + + +var AvailableCollectors = map[string]MetricCollector{ + "likwid": &LikwidCollector{}, + "loadavg": &LoadavgCollector{}, + "memstat": &MemstatCollector{}, + "netstat": &NetstatCollector{}, + "ibstat": &InfinibandCollector{}, + "lustrestat": &LustreCollector{}, + "cpustat": &CpustatCollector{}, + "topprocs": &TopProcsCollector{}, + "nvidia": &NvidiaCollector{}, + "customcmd": &CustomCmdCollector{}, + "diskstat": &DiskstatCollector{}, + "tempstat": &TempCollector{}, + "ipmistat": &IpmiCollector{}, +} + + +type collectorManager struct { + collectors []MetricCollector + output chan lp.CCMetric + done chan bool + interval time.Duration + duration time.Duration + wg *sync.WaitGroup + config map[string]json.RawMessage +} + +type CollectorManager interface { + Init(interval time.Duration, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error + AddOutput(output chan lp.CCMetric) + Start() + Close() +} + + +func (cm *collectorManager) Init(interval time.Duration, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error { + cm.collectors = make([]MetricCollector, 0) + cm.output = nil + cm.done = make(chan bool) + cm.wg = wg + cm.interval = interval + cm.duration = duration + configFile, err := os.Open(collectConfigFile) + if err != nil { + log.Print(err.Error()) + return err + } + defer configFile.Close() + jsonParser := json.NewDecoder(configFile) + err = jsonParser.Decode(&cm.config) + if err != nil { + log.Print(err.Error()) + return err + } + for k, cfg := range cm.config { + log.Print(k, " ", cfg) + if _, found := AvailableCollectors[k]; !found { + log.Print("[CollectorManager] SKIP unknown collector ", k) + continue + } + c := AvailableCollectors[k] + + err = c.Init(cfg) + if err != nil { + log.Print("[CollectorManager] Collector ", k, "initialization failed: ", err.Error()) + continue + } + cm.collectors = append(cm.collectors, c) + } + return nil +} + +func (cm *collectorManager) Start() { + cm.wg.Add(1) + ticker := time.NewTicker(cm.interval) + go func() { + for { +CollectorManagerLoop: + select { + case <- cm.done: + for _, c := range cm.collectors { + c.Close() + } + cm.wg.Done() + log.Print("[CollectorManager] DONE\n") + break CollectorManagerLoop + case t := <-ticker.C: + for _, c := range cm.collectors { +CollectorManagerInputLoop: + select { + case <- cm.done: + for _, c := range cm.collectors { + c.Close() + } + cm.wg.Done() + log.Print("[CollectorManager] DONE\n") + break CollectorManagerInputLoop + default: + log.Print("[CollectorManager] ", c.Name(), " ", t) + c.Read(cm.duration, cm.output) + } + } + } + } + log.Print("[CollectorManager] EXIT\n") + }() + log.Print("[CollectorManager] STARTED\n") +} + +func (cm *collectorManager) AddOutput(output chan lp.CCMetric) { + cm.output = output +} + +func (cm *collectorManager) Close() { + cm.done <- true + log.Print("[CollectorManager] CLOSE") +} + +func New(interval time.Duration, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) { + cm := &collectorManager{} + err := cm.Init(interval, duration, wg, collectConfigFile) + if err != nil { + return nil, err + } + return cm, err +} diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index fe31c3c..c9dd746 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -3,7 +3,7 @@ package collectors import ( "encoding/json" "fmt" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "io/ioutil" "strconv" "strings" @@ -17,13 +17,14 @@ type CpustatCollectorConfig struct { } type CpustatCollector struct { - MetricCollector + metricCollector config CpustatCollectorConfig } -func (m *CpustatCollector) Init(config []byte) error { +func (m *CpustatCollector) Init(config json.RawMessage) error { m.name = "CpustatCollector" m.setup() + m.meta = map[string]string{"source" : m.name, "group" : "CPU"} if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { @@ -34,7 +35,7 @@ func (m *CpustatCollector) Init(config []byte) error { return nil } -func ParseStatLine(line string, cpu int, exclude []string, out *[]lp.MutableMetric) { +func (c *CpustatCollector) parseStatLine(line string, cpu int, exclude []string, output chan lp.CCMetric) { ls := strings.Fields(line) matches := []string{"", "cpu_user", "cpu_nice", "cpu_system", "cpu_idle", "cpu_iowait", "cpu_irq", "cpu_softirq", "cpu_steal", "cpu_guest", "cpu_guest_nice"} for _, ex := range exclude { @@ -51,16 +52,16 @@ func ParseStatLine(line string, cpu int, exclude []string, out *[]lp.MutableMetr if len(m) > 0 { x, err := strconv.ParseInt(ls[i], 0, 64) if err == nil { - y, err := lp.New(m, tags, map[string]interface{}{"value": int(x)}, time.Now()) + y, err := lp.New(m, tags, c.meta, map[string]interface{}{"value": int(x)}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } } } } -func (m *CpustatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { return } @@ -77,11 +78,11 @@ func (m *CpustatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) } ls := strings.Fields(line) if strings.Compare(ls[0], "cpu") == 0 { - ParseStatLine(line, -1, m.config.ExcludeMetrics, out) + m.parseStatLine(line, -1, m.config.ExcludeMetrics, output) } else if strings.HasPrefix(ls[0], "cpu") { cpustr := strings.TrimLeft(ls[0], "cpu") cpu, _ := strconv.Atoi(cpustr) - ParseStatLine(line, cpu, m.config.ExcludeMetrics, out) + m.parseStatLine(line, cpu, m.config.ExcludeMetrics, output) } } } diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index 547bb87..444d534 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -3,7 +3,8 @@ package collectors import ( "encoding/json" "errors" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + influx "github.com/influxdata/line-protocol" "io/ioutil" "log" "os/exec" @@ -20,17 +21,18 @@ type CustomCmdCollectorConfig struct { } type CustomCmdCollector struct { - MetricCollector - handler *lp.MetricHandler - parser *lp.Parser + metricCollector + handler *influx.MetricHandler + parser *influx.Parser config CustomCmdCollectorConfig commands []string files []string } -func (m *CustomCmdCollector) Init(config []byte) error { +func (m *CustomCmdCollector) Init(config json.RawMessage) error { var err error m.name = "CustomCmdCollector" + m.meta = map[string]string{"source" : m.name, "group" : "Custom"} if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { @@ -60,8 +62,8 @@ func (m *CustomCmdCollector) Init(config []byte) error { if len(m.files) == 0 && len(m.commands) == 0 { return errors.New("No metrics to collect") } - m.handler = lp.NewMetricHandler() - m.parser = lp.NewParser(m.handler) + m.handler = influx.NewMetricHandler() + m.parser = influx.NewParser(m.handler) m.parser.SetTimeFunc(DefaultTime) m.init = true return nil @@ -71,7 +73,7 @@ var DefaultTime = func() time.Time { return time.Unix(42, 0) } -func (m *CustomCmdCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { return } @@ -94,9 +96,9 @@ func (m *CustomCmdCollector) Read(interval time.Duration, out *[]lp.MutableMetri if skip { continue } - y, err := lp.New(c.Name(), Tags2Map(c), Fields2Map(c), c.Time()) + y, err := lp.New(c.Name(), Tags2Map(c), m.meta, Fields2Map(c), c.Time()) if err == nil { - *out = append(*out, y) + output <- y } } } @@ -116,9 +118,9 @@ func (m *CustomCmdCollector) Read(interval time.Duration, out *[]lp.MutableMetri if skip { continue } - y, err := lp.New(f.Name(), Tags2Map(f), Fields2Map(f), f.Time()) + y, err := lp.New(f.Name(), Tags2Map(f), m.meta, Fields2Map(f), f.Time()) if err == nil { - *out = append(*out, y) + output <- y } } } diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index e2d2f25..611cdb9 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -1,7 +1,7 @@ package collectors import ( - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "io/ioutil" // "log" "encoding/json" @@ -19,14 +19,15 @@ type DiskstatCollectorConfig struct { } type DiskstatCollector struct { - MetricCollector + metricCollector matches map[int]string config DiskstatCollectorConfig } -func (m *DiskstatCollector) Init(config []byte) error { +func (m *DiskstatCollector) Init(config json.RawMessage) error { var err error m.name = "DiskstatCollector" + m.meta = map[string]string{"source" : m.name, "group" : "Disk"} m.setup() if len(config) > 0 { err = json.Unmarshal(config, &m.config) @@ -71,7 +72,7 @@ func (m *DiskstatCollector) Init(config []byte) error { return err } -func (m *DiskstatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { var lines []string if !m.init { return @@ -99,9 +100,9 @@ func (m *DiskstatCollector) Read(interval time.Duration, out *[]lp.MutableMetric if idx < len(f) { x, err := strconv.ParseInt(f[idx], 0, 64) if err == nil { - y, err := lp.New(name, tags, map[string]interface{}{"value": int(x)}, time.Now()) + y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": int(x)}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } } diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index 93725d1..a076646 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -2,7 +2,7 @@ package collectors import ( "fmt" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "io/ioutil" "log" "os/exec" @@ -25,7 +25,7 @@ type InfinibandCollectorConfig struct { } type InfinibandCollector struct { - MetricCollector + metricCollector tags map[string]string lids map[string]map[string]string config InfinibandCollectorConfig @@ -49,11 +49,12 @@ func (m *InfinibandCollector) Help() { fmt.Println("- ib_xmit") } -func (m *InfinibandCollector) Init(config []byte) error { +func (m *InfinibandCollector) Init(config json.RawMessage) error { var err error m.name = "InfinibandCollector" m.use_perfquery = false m.setup() + m.meta = map[string]string{"source" : m.name, "group" : "Network"} m.tags = map[string]string{"type": "node"} if len(config) > 0 { err = json.Unmarshal(config, &m.config) @@ -110,7 +111,7 @@ func (m *InfinibandCollector) Init(config []byte) error { return err } -func DoPerfQuery(cmd string, dev string, lid string, port string, tags map[string]string, out *[]lp.MutableMetric) error { +func (m *InfinibandCollector) doPerfQuery(cmd string, dev string, lid string, port string, tags map[string]string, output chan lp.CCMetric) error { args := fmt.Sprintf("-r %s %s 0xf000", lid, port) command := exec.Command(cmd, args) @@ -127,9 +128,9 @@ func DoPerfQuery(cmd string, dev string, lid string, port string, tags map[strin lv := strings.Fields(line) v, err := strconv.ParseFloat(lv[1], 64) if err == nil { - y, err := lp.New("ib_recv", tags, map[string]interface{}{"value": float64(v)}, time.Now()) + y, err := lp.New("ib_recv", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } } @@ -137,9 +138,9 @@ func DoPerfQuery(cmd string, dev string, lid string, port string, tags map[strin lv := strings.Fields(line) v, err := strconv.ParseFloat(lv[1], 64) if err == nil { - y, err := lp.New("ib_xmit", tags, map[string]interface{}{"value": float64(v)}, time.Now()) + y, err := lp.New("ib_xmit", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } } @@ -147,16 +148,16 @@ func DoPerfQuery(cmd string, dev string, lid string, port string, tags map[strin return nil } -func DoSysfsRead(dev string, lid string, port string, tags map[string]string, out *[]lp.MutableMetric) error { +func (m *InfinibandCollector) doSysfsRead(dev string, lid string, port string, tags map[string]string, output chan lp.CCMetric) error { path := fmt.Sprintf("%s/%s/ports/%s/counters/", string(IBBASEPATH), dev, port) buffer, err := ioutil.ReadFile(fmt.Sprintf("%s/port_rcv_data", path)) if err == nil { data := strings.Replace(string(buffer), "\n", "", -1) v, err := strconv.ParseFloat(data, 64) if err == nil { - y, err := lp.New("ib_recv", tags, map[string]interface{}{"value": float64(v)}, time.Now()) + y, err := lp.New("ib_recv", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } } @@ -165,71 +166,29 @@ func DoSysfsRead(dev string, lid string, port string, tags map[string]string, ou data := strings.Replace(string(buffer), "\n", "", -1) v, err := strconv.ParseFloat(data, 64) if err == nil { - y, err := lp.New("ib_xmit", tags, map[string]interface{}{"value": float64(v)}, time.Now()) + y, err := lp.New("ib_xmit", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } } return nil } -func (m *InfinibandCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetric) { if m.init { for dev, ports := range m.lids { for port, lid := range ports { tags := map[string]string{"type": "node", "device": dev, "port": port} if m.use_perfquery { - DoPerfQuery(m.config.PerfQueryPath, dev, lid, port, tags, out) + m.doPerfQuery(m.config.PerfQueryPath, dev, lid, port, tags, output) } else { - DoSysfsRead(dev, lid, port, tags, out) + m.doSysfsRead(dev, lid, port, tags, output) } } } } - - // buffer, err := ioutil.ReadFile(string(LIDFILE)) - - // if err != nil { - // log.Print(err) - // return - // } - - // args := fmt.Sprintf("-r %s 1 0xf000", string(buffer)) - - // command := exec.Command(PERFQUERY, args) - // command.Wait() - // stdout, err := command.Output() - // if err != nil { - // log.Print(err) - // return - // } - - // ll := strings.Split(string(stdout), "\n") - - // for _, line := range ll { - // if strings.HasPrefix(line, "PortRcvData") || strings.HasPrefix(line, "RcvData") { - // lv := strings.Fields(line) - // v, err := strconv.ParseFloat(lv[1], 64) - // if err == nil { - // y, err := lp.New("ib_recv", m.tags, map[string]interface{}{"value": float64(v)}, time.Now()) - // if err == nil { - // *out = append(*out, y) - // } - // } - // } - // if strings.HasPrefix(line, "PortXmitData") || strings.HasPrefix(line, "XmtData") { - // lv := strings.Fields(line) - // v, err := strconv.ParseFloat(lv[1], 64) - // if err == nil { - // y, err := lp.New("ib_xmit", m.tags, map[string]interface{}{"value": float64(v)}, time.Now()) - // if err == nil { - // *out = append(*out, y) - // } - // } - // } - // } } func (m *InfinibandCollector) Close() { diff --git a/collectors/ipmiMetric.go b/collectors/ipmiMetric.go index d28a134..341a895 100644 --- a/collectors/ipmiMetric.go +++ b/collectors/ipmiMetric.go @@ -3,7 +3,7 @@ package collectors import ( "encoding/json" "errors" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "log" "os" "os/exec" @@ -22,15 +22,16 @@ type IpmiCollectorConfig struct { } type IpmiCollector struct { - MetricCollector + metricCollector tags map[string]string matches map[string]string config IpmiCollectorConfig } -func (m *IpmiCollector) Init(config []byte) error { +func (m *IpmiCollector) Init(config json.RawMessage) error { m.name = "IpmiCollector" m.setup() + m.meta = map[string]string{"source" : m.name, "group" : "IPMI"} if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { @@ -52,7 +53,7 @@ func (m *IpmiCollector) Init(config []byte) error { return nil } -func ReadIpmiTool(cmd string, out *[]lp.MutableMetric) { +func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) { command := exec.Command(cmd, "sensor") command.Wait() stdout, err := command.Output() @@ -73,24 +74,25 @@ func ReadIpmiTool(cmd string, out *[]lp.MutableMetric) { name := strings.ToLower(strings.Replace(strings.Trim(lv[0], " "), " ", "_", -1)) unit := strings.Trim(lv[2], " ") if unit == "Volts" { - unit = "V" + unit = "Volts" } else if unit == "degrees C" { - unit = "C" + unit = "degC" } else if unit == "degrees F" { - unit = "F" + unit = "degF" } else if unit == "Watts" { - unit = "W" + unit = "Watts" } - y, err := lp.New(name, map[string]string{"unit": unit, "type": "node"}, map[string]interface{}{"value": v}, time.Now()) + y, err := lp.New(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now()) if err == nil { - *out = append(*out, y) + y.AddMeta("unit", unit) + output <- y } } } } -func ReadIpmiSensors(cmd string, out *[]lp.MutableMetric) { +func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) { command := exec.Command(cmd, "--comma-separated-output", "--sdr-cache-recreate") command.Wait() @@ -108,25 +110,28 @@ func ReadIpmiSensors(cmd string, out *[]lp.MutableMetric) { v, err := strconv.ParseFloat(lv[3], 64) if err == nil { name := strings.ToLower(strings.Replace(lv[1], " ", "_", -1)) - y, err := lp.New(name, map[string]string{"unit": lv[4], "type": "node"}, map[string]interface{}{"value": v}, time.Now()) + y, err := lp.New(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now()) if err == nil { - *out = append(*out, y) + if len(lv) > 4 { + y.AddMeta("unit", lv[4]) + } + output <- y } } } } } -func (m *IpmiCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) { if len(m.config.IpmitoolPath) > 0 { _, err := os.Stat(m.config.IpmitoolPath) if err == nil { - ReadIpmiTool(m.config.IpmitoolPath, out) + m.readIpmiTool(m.config.IpmitoolPath, output) } } else if len(m.config.IpmisensorsPath) > 0 { _, err := os.Stat(m.config.IpmisensorsPath) if err == nil { - ReadIpmiSensors(m.config.IpmisensorsPath, out) + m.readIpmiSensors(m.config.IpmisensorsPath, output) } } } diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 2fd1129..cc3dd98 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -12,7 +12,7 @@ import ( "encoding/json" "errors" "fmt" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "gopkg.in/Knetic/govaluate.v2" "io/ioutil" "log" @@ -24,10 +24,23 @@ import ( "unsafe" ) +type MetricScope int + +const ( + METRIC_SCOPE_HWTHREAD = iota + METRIC_SCOPE_SOCKET + METRIC_SCOPE_NUMA + METRIC_SCOPE_NODE +) + +func (ms MetricScope) String() string { + return []string{"Head", "Shoulder", "Knee", "Toe"}[ms] +} + type LikwidCollectorMetricConfig struct { Name string `json:"name"` Calc string `json:"calc"` - Socket_scope bool `json:"socket_scope"` + Scope MetricScope `json:"socket_scope"` Publish bool `json:"publish"` } @@ -44,7 +57,7 @@ type LikwidCollectorConfig struct { } type LikwidCollector struct { - MetricCollector + metricCollector cpulist []C.int sock2tid map[int]int metrics map[C.int]map[string]int @@ -104,7 +117,7 @@ func getSocketCpus() map[C.int]int { return outmap } -func (m *LikwidCollector) Init(config []byte) error { +func (m *LikwidCollector) Init(config json.RawMessage) error { var ret C.int m.name = "LikwidCollector" if len(config) > 0 { @@ -114,11 +127,13 @@ func (m *LikwidCollector) Init(config []byte) error { } } m.setup() + m.meta = map[string]string{"source" : m.name, "group" : "PerfCounter"} cpulist := CpuList() m.cpulist = make([]C.int, len(cpulist)) slist := getSocketCpus() m.sock2tid = make(map[int]int) +// m.numa2tid = make(map[int]int) for i, c := range cpulist { m.cpulist[i] = C.int(c) if sid, found := slist[m.cpulist[i]]; found { @@ -168,7 +183,7 @@ func (m *LikwidCollector) Init(config []byte) error { return nil } -func (m *LikwidCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { return } @@ -245,24 +260,28 @@ func (m *LikwidCollector) Read(interval time.Duration, out *[]lp.MutableMetric) for _, metric := range evset.Metrics { _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) if metric.Publish && !skip { - if metric.Socket_scope { + if metric.Scope.String() == "socket" { for sid, tid := range m.sock2tid { y, err := lp.New(metric.Name, - map[string]string{"type": "socket", "type-id": fmt.Sprintf("%d", int(sid))}, + map[string]string{"type": "socket", + "type-id": fmt.Sprintf("%d", int(sid))}, + m.meta, map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } - } else { + } else if metric.Scope.String() == "hwthread" { for tid, cpu := range m.cpulist { y, err := lp.New(metric.Name, - map[string]string{"type": "cpu", "type-id": fmt.Sprintf("%d", int(cpu))}, + map[string]string{"type": "cpu", + "type-id": fmt.Sprintf("%d", int(cpu))}, + m.meta, map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } } @@ -272,24 +291,28 @@ func (m *LikwidCollector) Read(interval time.Duration, out *[]lp.MutableMetric) for _, metric := range m.config.Metrics { _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) if metric.Publish && !skip { - if metric.Socket_scope { + if metric.Scope.String() == "socket" { for sid, tid := range m.sock2tid { y, err := lp.New(metric.Name, - map[string]string{"type": "socket", "type-id": fmt.Sprintf("%d", int(sid))}, + map[string]string{"type": "socket", + "type-id": fmt.Sprintf("%d", int(sid))}, + m.meta, map[string]interface{}{"value": m.gmresults[tid][metric.Name]}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } } else { for tid, cpu := range m.cpulist { y, err := lp.New(metric.Name, - map[string]string{"type": "cpu", "type-id": fmt.Sprintf("%d", int(cpu))}, + map[string]string{"type": "cpu", + "type-id": fmt.Sprintf("%d", int(cpu))}, + m.meta, map[string]interface{}{"value": m.gmresults[tid][metric.Name]}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } } diff --git a/collectors/loadavgMetric.go b/collectors/loadavgMetric.go index dbccf22..418b15c 100644 --- a/collectors/loadavgMetric.go +++ b/collectors/loadavgMetric.go @@ -2,7 +2,7 @@ package collectors import ( "encoding/json" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "io/ioutil" "strconv" "strings" @@ -16,14 +16,14 @@ type LoadavgCollectorConfig struct { } type LoadavgCollector struct { - MetricCollector + metricCollector tags map[string]string load_matches []string proc_matches []string config LoadavgCollectorConfig } -func (m *LoadavgCollector) Init(config []byte) error { +func (m *LoadavgCollector) Init(config json.RawMessage) error { m.name = "LoadavgCollector" m.setup() if len(config) > 0 { @@ -32,6 +32,7 @@ func (m *LoadavgCollector) Init(config []byte) error { return err } } + m.meta = map[string]string{"source" : m.name, "group" : "LOAD"} m.tags = map[string]string{"type": "node"} m.load_matches = []string{"load_one", "load_five", "load_fifteen"} m.proc_matches = []string{"proc_run", "proc_total"} @@ -39,7 +40,7 @@ func (m *LoadavgCollector) Init(config []byte) error { return nil } -func (m *LoadavgCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric) { var skip bool if !m.init { return @@ -55,9 +56,9 @@ func (m *LoadavgCollector) Read(interval time.Duration, out *[]lp.MutableMetric) x, err := strconv.ParseFloat(ls[i], 64) if err == nil { _, skip = stringArrayContains(m.config.ExcludeMetrics, name) - y, err := lp.New(name, m.tags, map[string]interface{}{"value": float64(x)}, time.Now()) + y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": float64(x)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } } @@ -66,9 +67,9 @@ func (m *LoadavgCollector) Read(interval time.Duration, out *[]lp.MutableMetric) x, err := strconv.ParseFloat(lv[i], 64) if err == nil { _, skip = stringArrayContains(m.config.ExcludeMetrics, name) - y, err := lp.New(name, m.tags, map[string]interface{}{"value": float64(x)}, time.Now()) + y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": float64(x)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } } diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index e7bb7a6..bde77c2 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -3,7 +3,7 @@ package collectors import ( "encoding/json" "errors" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "io/ioutil" "log" "strconv" @@ -19,14 +19,14 @@ type LustreCollectorConfig struct { } type LustreCollector struct { - MetricCollector + metricCollector tags map[string]string matches map[string]map[string]int devices []string config LustreCollectorConfig } -func (m *LustreCollector) Init(config []byte) error { +func (m *LustreCollector) Init(config json.RawMessage) error { var err error m.name = "LustreCollector" if len(config) > 0 { @@ -37,6 +37,7 @@ func (m *LustreCollector) Init(config []byte) error { } m.setup() m.tags = map[string]string{"type": "node"} + m.meta = map[string]string{"source" : m.name, "group" : "Lustre"} m.matches = map[string]map[string]int{"read_bytes": {"read_bytes": 6, "read_requests": 1}, "write_bytes": {"write_bytes": 6, "write_requests": 1}, "open": {"open": 1}, @@ -63,7 +64,7 @@ func (m *LustreCollector) Init(config []byte) error { return nil } -func (m *LustreCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { return } @@ -87,9 +88,12 @@ func (m *LustreCollector) Read(interval time.Duration, out *[]lp.MutableMetric) } x, err := strconv.ParseInt(lf[idx], 0, 64) if err == nil { - y, err := lp.New(name, m.tags, map[string]interface{}{"value": x}, time.Now()) + y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, time.Now()) if err == nil { - *out = append(*out, y) + if strings.Contains(name, "byte") { + y.AddMeta("unit", "Byte") + } + output <- y } } } diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index 91987bb..1d07cd8 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -4,7 +4,7 @@ import ( "encoding/json" "errors" "fmt" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "io/ioutil" "log" "strconv" @@ -19,14 +19,14 @@ type MemstatCollectorConfig struct { } type MemstatCollector struct { - MetricCollector + metricCollector stats map[string]int64 tags map[string]string matches map[string]string config MemstatCollectorConfig } -func (m *MemstatCollector) Init(config []byte) error { +func (m *MemstatCollector) Init(config json.RawMessage) error { var err error m.name = "MemstatCollector" if len(config) > 0 { @@ -35,6 +35,7 @@ func (m *MemstatCollector) Init(config []byte) error { return err } } + m.meta = map[string]string{"source" : m.name, "group" : "Memory", "unit": "kByte"} m.stats = make(map[string]int64) m.matches = make(map[string]string) m.tags = map[string]string{"type": "node"} @@ -64,7 +65,7 @@ func (m *MemstatCollector) Init(config []byte) error { return err } -func (m *MemstatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { return } @@ -96,9 +97,9 @@ func (m *MemstatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) log.Print(err) continue } - y, err := lp.New(name, m.tags, map[string]interface{}{"value": int(float64(m.stats[match]) * 1.0e-3)}, time.Now()) + y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": int(float64(m.stats[match]) * 1.0e-3)}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } @@ -107,18 +108,18 @@ func (m *MemstatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) if _, cached := m.stats[`Cached`]; cached { memUsed := m.stats[`MemTotal`] - (m.stats[`MemFree`] + m.stats[`Buffers`] + m.stats[`Cached`]) _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used") - y, err := lp.New("mem_used", m.tags, map[string]interface{}{"value": int(float64(memUsed) * 1.0e-3)}, time.Now()) + y, err := lp.New("mem_used", m.tags, m.meta, map[string]interface{}{"value": int(float64(memUsed) * 1.0e-3)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } } } if _, found := m.stats[`MemShared`]; found { _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_shared") - y, err := lp.New("mem_shared", m.tags, map[string]interface{}{"value": int(float64(m.stats[`MemShared`]) * 1.0e-3)}, time.Now()) + y, err := lp.New("mem_shared", m.tags, m.meta, map[string]interface{}{"value": int(float64(m.stats[`MemShared`]) * 1.0e-3)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } } diff --git a/collectors/metricCollector.go b/collectors/metricCollector.go index 0228530..ebe5f0f 100644 --- a/collectors/metricCollector.go +++ b/collectors/metricCollector.go @@ -2,36 +2,40 @@ package collectors import ( "errors" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + influx "github.com/influxdata/line-protocol" "io/ioutil" "log" "strconv" "strings" "time" + "encoding/json" ) -type MetricGetter interface { +type MetricCollector interface { Name() string - Init(config []byte) error + Init(config json.RawMessage) error Initialized() bool - Read(time.Duration, *[]lp.MutableMetric) + Read(duration time.Duration, output chan lp.CCMetric) Close() } -type MetricCollector struct { +type metricCollector struct { + output chan lp.CCMetric name string init bool + meta map[string]string } -func (c *MetricCollector) Name() string { +func (c *metricCollector) Name() string { return c.name } -func (c *MetricCollector) setup() error { +func (c *metricCollector) setup() error { return nil } -func (c *MetricCollector) Initialized() bool { +func (c *metricCollector) Initialized() bool { return c.init == true } @@ -103,7 +107,7 @@ func CpuList() []int { return cpulist } -func Tags2Map(metric lp.Metric) map[string]string { +func Tags2Map(metric influx.Metric) map[string]string { tags := make(map[string]string) for _, t := range metric.TagList() { tags[t.Key] = t.Value @@ -111,7 +115,7 @@ func Tags2Map(metric lp.Metric) map[string]string { return tags } -func Fields2Map(metric lp.Metric) map[string]interface{} { +func Fields2Map(metric influx.Metric) map[string]interface{} { fields := make(map[string]interface{}) for _, f := range metric.FieldList() { fields[f.Key] = f.Value diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index 659b89f..d28f828 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -2,7 +2,7 @@ package collectors import ( "encoding/json" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "io/ioutil" "log" "strconv" @@ -17,14 +17,15 @@ type NetstatCollectorConfig struct { } type NetstatCollector struct { - MetricCollector + metricCollector config NetstatCollectorConfig matches map[int]string } -func (m *NetstatCollector) Init(config []byte) error { +func (m *NetstatCollector) Init(config json.RawMessage) error { m.name = "NetstatCollector" m.setup() + m.meta = map[string]string{"source" : m.name, "group" : "Memory"} m.matches = map[int]string{ 1: "bytes_in", 9: "bytes_out", @@ -45,7 +46,7 @@ func (m *NetstatCollector) Init(config []byte) error { return nil } -func (m *NetstatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { data, err := ioutil.ReadFile(string(NETSTATFILE)) if err != nil { log.Print(err.Error()) @@ -72,9 +73,15 @@ func (m *NetstatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) for i, name := range m.matches { v, err := strconv.ParseInt(f[i], 10, 0) if err == nil { - y, err := lp.New(name, tags, map[string]interface{}{"value": int(float64(v) * 1.0e-3)}, time.Now()) + y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": int(float64(v) * 1.0e-3)}, time.Now()) if err == nil { - *out = append(*out, y) + switch { + case strings.Contains(name, "byte"): + y.AddMeta("unit", "Byte") + case strings.Contains(name, "pkt"): + y.AddMeta("unit", "Packets") + } + output <- y } } } diff --git a/collectors/nvidiaMetric.go b/collectors/nvidiaMetric.go index 4597610..aad3129 100644 --- a/collectors/nvidiaMetric.go +++ b/collectors/nvidiaMetric.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" "github.com/NVIDIA/go-nvml/pkg/nvml" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "log" "time" ) @@ -16,7 +16,7 @@ type NvidiaCollectorConfig struct { } type NvidiaCollector struct { - MetricCollector + metricCollector num_gpus int config NvidiaCollectorConfig } @@ -28,10 +28,11 @@ func (m *NvidiaCollector) CatchPanic() { } } -func (m *NvidiaCollector) Init(config []byte) error { +func (m *NvidiaCollector) Init(config json.RawMessage) error { var err error m.name = "NvidiaCollector" m.setup() + m.meta = map[string]string{"source" : m.name, "group" : "Nvidia"} if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { @@ -54,7 +55,7 @@ func (m *NvidiaCollector) Init(config []byte) error { return nil } -func (m *NvidiaCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { return } @@ -73,14 +74,14 @@ func (m *NvidiaCollector) Read(interval time.Duration, out *[]lp.MutableMetric) util, ret := nvml.DeviceGetUtilizationRates(device) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "util") - y, err := lp.New("util", tags, map[string]interface{}{"value": float64(util.Gpu)}, time.Now()) + y, err := lp.New("util", tags, m.meta, map[string]interface{}{"value": float64(util.Gpu)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } _, skip = stringArrayContains(m.config.ExcludeMetrics, "mem_util") - y, err = lp.New("mem_util", tags, map[string]interface{}{"value": float64(util.Memory)}, time.Now()) + y, err = lp.New("mem_util", tags, m.meta, map[string]interface{}{"value": float64(util.Memory)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } @@ -88,174 +89,177 @@ func (m *NvidiaCollector) Read(interval time.Duration, out *[]lp.MutableMetric) if ret == nvml.SUCCESS { t := float64(meminfo.Total) / (1024 * 1024) _, skip = stringArrayContains(m.config.ExcludeMetrics, "mem_total") - y, err := lp.New("mem_total", tags, map[string]interface{}{"value": t}, time.Now()) + y, err := lp.New("mem_total", tags, m.meta, map[string]interface{}{"value": t}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + y.AddMeta("unit", "MByte") + output <- y } f := float64(meminfo.Used) / (1024 * 1024) _, skip = stringArrayContains(m.config.ExcludeMetrics, "fb_memory") - y, err = lp.New("fb_memory", tags, map[string]interface{}{"value": f}, time.Now()) + y, err = lp.New("fb_memory", tags, m.meta, map[string]interface{}{"value": f}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + y.AddMeta("unit", "MByte") + output <- y } } temp, ret := nvml.DeviceGetTemperature(device, nvml.TEMPERATURE_GPU) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "temp") - y, err := lp.New("temp", tags, map[string]interface{}{"value": float64(temp)}, time.Now()) + y, err := lp.New("temp", tags, m.meta, map[string]interface{}{"value": float64(temp)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + y.AddMeta("unit", "degC") + output <- y } } fan, ret := nvml.DeviceGetFanSpeed(device) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "fan") - y, err := lp.New("fan", tags, map[string]interface{}{"value": float64(fan)}, time.Now()) + y, err := lp.New("fan", tags, m.meta, map[string]interface{}{"value": float64(fan)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } _, ecc_pend, ret := nvml.DeviceGetEccMode(device) if ret == nvml.SUCCESS { - var y lp.MutableMetric + var y lp.CCMetric var err error switch ecc_pend { case nvml.FEATURE_DISABLED: - y, err = lp.New("ecc_mode", tags, map[string]interface{}{"value": string("OFF")}, time.Now()) + y, err = lp.New("ecc_mode", tags, m.meta, map[string]interface{}{"value": string("OFF")}, time.Now()) case nvml.FEATURE_ENABLED: - y, err = lp.New("ecc_mode", tags, map[string]interface{}{"value": string("ON")}, time.Now()) + y, err = lp.New("ecc_mode", tags, m.meta, map[string]interface{}{"value": string("ON")}, time.Now()) default: - y, err = lp.New("ecc_mode", tags, map[string]interface{}{"value": string("UNKNOWN")}, time.Now()) + y, err = lp.New("ecc_mode", tags, m.meta, map[string]interface{}{"value": string("UNKNOWN")}, time.Now()) } _, skip = stringArrayContains(m.config.ExcludeMetrics, "ecc_mode") if err == nil && !skip { - *out = append(*out, y) + output <- y } } else if ret == nvml.ERROR_NOT_SUPPORTED { _, skip = stringArrayContains(m.config.ExcludeMetrics, "ecc_mode") - y, err := lp.New("ecc_mode", tags, map[string]interface{}{"value": string("N/A")}, time.Now()) + y, err := lp.New("ecc_mode", tags, m.meta, map[string]interface{}{"value": string("N/A")}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } pstate, ret := nvml.DeviceGetPerformanceState(device) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "perf_state") - y, err := lp.New("perf_state", tags, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pstate))}, time.Now()) + y, err := lp.New("perf_state", tags, m.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pstate))}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } power, ret := nvml.DeviceGetPowerUsage(device) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "power_usage_report") - y, err := lp.New("power_usage_report", tags, map[string]interface{}{"value": float64(power) / 1000}, time.Now()) + y, err := lp.New("power_usage_report", tags, m.meta, map[string]interface{}{"value": float64(power) / 1000}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } gclk, ret := nvml.DeviceGetClockInfo(device, nvml.CLOCK_GRAPHICS) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "graphics_clock_report") - y, err := lp.New("graphics_clock_report", tags, map[string]interface{}{"value": float64(gclk)}, time.Now()) + y, err := lp.New("graphics_clock_report", tags, m.meta, map[string]interface{}{"value": float64(gclk)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } smclk, ret := nvml.DeviceGetClockInfo(device, nvml.CLOCK_SM) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "sm_clock_report") - y, err := lp.New("sm_clock_report", tags, map[string]interface{}{"value": float64(smclk)}, time.Now()) + y, err := lp.New("sm_clock_report", tags, m.meta, map[string]interface{}{"value": float64(smclk)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } memclk, ret := nvml.DeviceGetClockInfo(device, nvml.CLOCK_MEM) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "mem_clock_report") - y, err := lp.New("mem_clock_report", tags, map[string]interface{}{"value": float64(memclk)}, time.Now()) + y, err := lp.New("mem_clock_report", tags, m.meta, map[string]interface{}{"value": float64(memclk)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } max_gclk, ret := nvml.DeviceGetMaxClockInfo(device, nvml.CLOCK_GRAPHICS) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "max_graphics_clock") - y, err := lp.New("max_graphics_clock", tags, map[string]interface{}{"value": float64(max_gclk)}, time.Now()) + y, err := lp.New("max_graphics_clock", tags, m.meta, map[string]interface{}{"value": float64(max_gclk)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } max_smclk, ret := nvml.DeviceGetClockInfo(device, nvml.CLOCK_SM) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "max_sm_clock") - y, err := lp.New("max_sm_clock", tags, map[string]interface{}{"value": float64(max_smclk)}, time.Now()) + y, err := lp.New("max_sm_clock", tags, m.meta, map[string]interface{}{"value": float64(max_smclk)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } max_memclk, ret := nvml.DeviceGetClockInfo(device, nvml.CLOCK_MEM) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "max_mem_clock") - y, err := lp.New("max_mem_clock", tags, map[string]interface{}{"value": float64(max_memclk)}, time.Now()) + y, err := lp.New("max_mem_clock", tags, m.meta, map[string]interface{}{"value": float64(max_memclk)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } ecc_db, ret := nvml.DeviceGetTotalEccErrors(device, 1, 1) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "ecc_db_error") - y, err := lp.New("ecc_db_error", tags, map[string]interface{}{"value": float64(ecc_db)}, time.Now()) + y, err := lp.New("ecc_db_error", tags, m.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } ecc_sb, ret := nvml.DeviceGetTotalEccErrors(device, 0, 1) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "ecc_sb_error") - y, err := lp.New("ecc_sb_error", tags, map[string]interface{}{"value": float64(ecc_sb)}, time.Now()) + y, err := lp.New("ecc_sb_error", tags, m.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } pwr_limit, ret := nvml.DeviceGetPowerManagementLimit(device) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "power_man_limit") - y, err := lp.New("power_man_limit", tags, map[string]interface{}{"value": float64(pwr_limit)}, time.Now()) + y, err := lp.New("power_man_limit", tags, m.meta, map[string]interface{}{"value": float64(pwr_limit)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } enc_util, _, ret := nvml.DeviceGetEncoderUtilization(device) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "encoder_util") - y, err := lp.New("encoder_util", tags, map[string]interface{}{"value": float64(enc_util)}, time.Now()) + y, err := lp.New("encoder_util", tags, m.meta, map[string]interface{}{"value": float64(enc_util)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } dec_util, _, ret := nvml.DeviceGetDecoderUtilization(device) if ret == nvml.SUCCESS { _, skip = stringArrayContains(m.config.ExcludeMetrics, "decoder_util") - y, err := lp.New("decoder_util", tags, map[string]interface{}{"value": float64(dec_util)}, time.Now()) + y, err := lp.New("decoder_util", tags, m.meta, map[string]interface{}{"value": float64(dec_util)}, time.Now()) if err == nil && !skip { - *out = append(*out, y) + output <- y } } } diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go index 3665025..4aba391 100644 --- a/collectors/tempMetric.go +++ b/collectors/tempMetric.go @@ -3,13 +3,14 @@ package collectors import ( "encoding/json" "fmt" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "io/ioutil" "os" "path/filepath" "strconv" "strings" "time" + "log" ) const HWMON_PATH = `/sys/class/hwmon` @@ -20,20 +21,21 @@ type TempCollectorConfig struct { } type TempCollector struct { - MetricCollector + metricCollector config TempCollectorConfig } -func (m *TempCollector) Init(config []byte) error { +func (m *TempCollector) Init(config json.RawMessage) error { m.name = "TempCollector" m.setup() - m.init = true + m.meta = map[string]string{"source" : m.name, "group" : "IPMI", "unit": "degC"} if len(config) > 0 { err := json.Unmarshal(config, &m.config) if err != nil { return err } } + m.init = true return nil } @@ -73,7 +75,7 @@ func get_hwmon_sensors() (map[string]map[string]string, error) { return sensors, nil } -func (m *TempCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { sensors, err := get_hwmon_sensors() if err != nil { @@ -88,15 +90,20 @@ func (m *TempCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { break } } + mname := strings.Replace(name, " ", "_", -1) + if !strings.Contains(mname, "temp") { + mname = fmt.Sprintf("temp_%s", mname) + } buffer, err := ioutil.ReadFile(string(file)) if err != nil { continue } x, err := strconv.ParseInt(strings.Replace(string(buffer), "\n", "", -1), 0, 64) if err == nil { - y, err := lp.New(strings.ToLower(name), tags, map[string]interface{}{"value": float64(x) / 1000}, time.Now()) + y, err := lp.New(strings.ToLower(mname), tags, m.meta, map[string]interface{}{"value": int(float64(x) / 1000)}, time.Now()) if err == nil { - *out = append(*out, y) + log.Print("[", m.name, "] ",y) + output <- y } } } diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go index a1bf989..df25b6f 100644 --- a/collectors/topprocsMetric.go +++ b/collectors/topprocsMetric.go @@ -4,7 +4,7 @@ import ( "encoding/json" "errors" "fmt" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "log" "os/exec" "strings" @@ -19,15 +19,16 @@ type TopProcsCollectorConfig struct { } type TopProcsCollector struct { - MetricCollector + metricCollector tags map[string]string config TopProcsCollectorConfig } -func (m *TopProcsCollector) Init(config []byte) error { +func (m *TopProcsCollector) Init(config json.RawMessage) error { var err error m.name = "TopProcsCollector" m.tags = map[string]string{"type": "node"} + m.meta = map[string]string{"source" : m.name, "group" : "TopProcs"} if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { @@ -50,7 +51,7 @@ func (m *TopProcsCollector) Init(config []byte) error { return nil } -func (m *TopProcsCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { +func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { return } @@ -65,9 +66,9 @@ func (m *TopProcsCollector) Read(interval time.Duration, out *[]lp.MutableMetric lines := strings.Split(string(stdout), "\n") for i := 1; i < m.config.num_procs+1; i++ { name := fmt.Sprintf("topproc%d", i) - y, err := lp.New(name, m.tags, map[string]interface{}{"value": string(lines[i])}, time.Now()) + y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": string(lines[i])}, time.Now()) if err == nil { - *out = append(*out, y) + output <- y } } } diff --git a/config.json b/config.json index 4a7fd89..52f9df1 100644 --- a/config.json +++ b/config.json @@ -1,36 +1,8 @@ { - "sink": { - "user": "testuser", - "password": "testpass", - "host": "127.0.0.1", - "port": "9090", - "database": "testdb", - "organization": "testorg", - "type": "stdout" - }, - "interval": 3, - "duration": 1, - "collectors": [ - "tempstat" - ], - "default_tags": { - "cluster": "testcluster" - }, - "receiver": { - "type": "none" - }, - "collect_config": { - "tempstat": { - "tag_override": { - "hwmon0" : { - "type" : "socket", - "type-id" : "0" - }, - "hwmon1" : { - "type" : "socket", - "type-id" : "1" - } - } - } - } + "sinks": "sinks.json", + "collectors" : "collectors.json", + "receivers" : "receivers.json", + "router" : "router.json", + "interval": 10, + "duration": 1 } diff --git a/go.mod b/go.mod index 903ea80..be384b6 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,11 @@ module github.com/ClusterCockpit/cc-metric-collector go 1.16 require ( - github.com/NVIDIA/go-nvml v0.11.1-0 // indirect + github.com/NVIDIA/go-nvml v0.11.1-0 github.com/influxdata/influxdb-client-go/v2 v2.2.2 github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 github.com/nats-io/nats.go v1.10.0 github.com/nats-io/nkeys v0.1.4 // indirect github.com/prometheus/client_golang v1.10.0 // indirect + gopkg.in/Knetic/govaluate.v2 v2.3.0 ) diff --git a/go.sum b/go.sum index 4bd7c8a..a6f98d7 100644 --- a/go.sum +++ b/go.sum @@ -421,6 +421,8 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/Knetic/govaluate.v2 v2.3.0 h1:naJVc9CZlWA8rC8f5mvECJD7jreTrn7FvGXjBthkHJQ= +gopkg.in/Knetic/govaluate.v2 v2.3.0/go.mod h1:NW0gr10J8s7aNghEg6uhdxiEaBvc0+8VgJjVViHUKp4= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/ccMetric/ccMetric.go b/internal/ccMetric/ccMetric.go new file mode 100644 index 0000000..31c813c --- /dev/null +++ b/internal/ccMetric/ccMetric.go @@ -0,0 +1,376 @@ +package ccmetric + +import ( + lp "github.com/influxdata/line-protocol" // MIT license + "time" + "sort" + "fmt" +) + +// Most functions are derived from github.com/influxdata/line-protocol/metric.go +// The metric type is extended with an extra meta information list re-using the Tag +// type. + +type ccMetric struct { + name string + tags []*lp.Tag + fields []*lp.Field + tm time.Time + meta []*lp.Tag +} + +type CCMetric interface { + lp.MutableMetric + AddMeta(key, value string) + MetaList() []*lp.Tag +} + +func (m *ccMetric) Meta() map[string]string { + meta := make(map[string]string, len(m.meta)) + for _, m := range m.meta { + meta[m.Key] = m.Value + } + return meta +} + +func (m *ccMetric) MetaList() []*lp.Tag { + return m.meta +} + +func (m *ccMetric) String() string { + return fmt.Sprintf("%s %v %v %v %d", m.name, m.Tags(), m.Meta(), m.Fields(), m.tm.UnixNano()) +} + +func (m *ccMetric) Name() string { + return m.name +} + +func (m *ccMetric) Tags() map[string]string { + tags := make(map[string]string, len(m.tags)) + for _, tag := range m.tags { + tags[tag.Key] = tag.Value + } + return tags +} + +func (m *ccMetric) TagList() []*lp.Tag { + return m.tags +} + +func (m *ccMetric) Fields() map[string]interface{} { + fields := make(map[string]interface{}, len(m.fields)) + for _, field := range m.fields { + fields[field.Key] = field.Value + } + + return fields +} + +func (m *ccMetric) FieldList() []*lp.Field { + return m.fields +} + +func (m *ccMetric) Time() time.Time { + return m.tm +} + +func (m *ccMetric) SetTime(t time.Time) { + m.tm = t +} + +func (m *ccMetric) HasTag(key string) bool { + for _, tag := range m.tags { + if tag.Key == key { + return true + } + } + return false +} + +func (m *ccMetric) GetTag(key string) (string, bool) { + for _, tag := range m.tags { + if tag.Key == key { + return tag.Value, true + } + } + return "", false +} + +func (m *ccMetric) RemoveTag(key string) { + for i, tag := range m.tags { + if tag.Key == key { + copy(m.tags[i:], m.tags[i+1:]) + m.tags[len(m.tags)-1] = nil + m.tags = m.tags[:len(m.tags)-1] + return + } + } +} + +func (m *ccMetric) AddTag(key, value string) { + for i, tag := range m.tags { + if key > tag.Key { + continue + } + + if key == tag.Key { + tag.Value = value + return + } + + m.tags = append(m.tags, nil) + copy(m.tags[i+1:], m.tags[i:]) + m.tags[i] = &lp.Tag{Key: key, Value: value} + return + } + + m.tags = append(m.tags, &lp.Tag{Key: key, Value: value}) +} + +func (m *ccMetric) HasMeta(key string) bool { + for _, tag := range m.meta { + if tag.Key == key { + return true + } + } + return false +} + +func (m *ccMetric) GetMeta(key string) (string, bool) { + for _, tag := range m.meta { + if tag.Key == key { + return tag.Value, true + } + } + return "", false +} + +func (m *ccMetric) RemoveMeta(key string) { + for i, tag := range m.meta { + if tag.Key == key { + copy(m.meta[i:], m.meta[i+1:]) + m.meta[len(m.meta)-1] = nil + m.meta = m.meta[:len(m.meta)-1] + return + } + } +} + +func (m *ccMetric) AddMeta(key, value string) { + for i, tag := range m.meta { + if key > tag.Key { + continue + } + + if key == tag.Key { + tag.Value = value + return + } + + m.meta = append(m.meta, nil) + copy(m.meta[i+1:], m.meta[i:]) + m.meta[i] = &lp.Tag{Key: key, Value: value} + return + } + + m.meta = append(m.meta, &lp.Tag{Key: key, Value: value}) +} + +func (m *ccMetric) AddField(key string, value interface{}) { + for i, field := range m.fields { + if key == field.Key { + m.fields[i] = &lp.Field{Key: key, Value: convertField(value)} + return + } + } + m.fields = append(m.fields, &lp.Field{Key: key, Value: convertField(value)}) +} + + + + +func New( + name string, + tags map[string]string, + meta map[string]string, + fields map[string]interface{}, + tm time.Time, +) (CCMetric, error) { + m := &ccMetric{ + name: name, + tags: nil, + fields: nil, + tm: tm, + meta: nil, + } + + if len(tags) > 0 { + m.tags = make([]*lp.Tag, 0, len(tags)) + for k, v := range tags { + m.tags = append(m.tags, + &lp.Tag{Key: k, Value: v}) + } + sort.Slice(m.tags, func(i, j int) bool { return m.tags[i].Key < m.tags[j].Key }) + } + + if len(meta) > 0 { + m.meta = make([]*lp.Tag, 0, len(meta)) + for k, v := range meta { + m.meta = append(m.meta, + &lp.Tag{Key: k, Value: v}) + } + sort.Slice(m.meta, func(i, j int) bool { return m.meta[i].Key < m.meta[j].Key }) + } + + if len(fields) > 0 { + m.fields = make([]*lp.Field, 0, len(fields)) + for k, v := range fields { + v := convertField(v) + if v == nil { + continue + } + m.AddField(k, v) + } + } + + return m, nil +} + +func FromMetric(other CCMetric) CCMetric { + m := &ccMetric{ + name: other.Name(), + tags: make([]*lp.Tag, len(other.TagList())), + fields: make([]*lp.Field, len(other.FieldList())), + meta: make([]*lp.Tag, len(other.MetaList())), + tm: other.Time(), + } + + for i, tag := range other.TagList() { + m.tags[i] = &lp.Tag{Key: tag.Key, Value: tag.Value} + } + for i, s := range other.MetaList() { + m.meta[i] = &lp.Tag{Key: s.Key, Value: s.Value} + } + + for i, field := range other.FieldList() { + m.fields[i] = &lp.Field{Key: field.Key, Value: field.Value} + } + return m +} + +func FromInfluxMetric(other lp.Metric) CCMetric { + m := &ccMetric{ + name: other.Name(), + tags: make([]*lp.Tag, len(other.TagList())), + fields: make([]*lp.Field, len(other.FieldList())), + meta: make([]*lp.Tag, 0), + tm: other.Time(), + } + + for i, tag := range other.TagList() { + m.tags[i] = &lp.Tag{Key: tag.Key, Value: tag.Value} + } + + for i, field := range other.FieldList() { + m.fields[i] = &lp.Field{Key: field.Key, Value: field.Value} + } + return m +} + +func convertField(v interface{}) interface{} { + switch v := v.(type) { + case float64: + return v + case int64: + return v + case string: + return v + case bool: + return v + case int: + return int64(v) + case uint: + return uint64(v) + case uint64: + return uint64(v) + case []byte: + return string(v) + case int32: + return int64(v) + case int16: + return int64(v) + case int8: + return int64(v) + case uint32: + return uint64(v) + case uint16: + return uint64(v) + case uint8: + return uint64(v) + case float32: + return float64(v) + case *float64: + if v != nil { + return *v + } + case *int64: + if v != nil { + return *v + } + case *string: + if v != nil { + return *v + } + case *bool: + if v != nil { + return *v + } + case *int: + if v != nil { + return int64(*v) + } + case *uint: + if v != nil { + return uint64(*v) + } + case *uint64: + if v != nil { + return uint64(*v) + } + case *[]byte: + if v != nil { + return string(*v) + } + case *int32: + if v != nil { + return int64(*v) + } + case *int16: + if v != nil { + return int64(*v) + } + case *int8: + if v != nil { + return int64(*v) + } + case *uint32: + if v != nil { + return uint64(*v) + } + case *uint16: + if v != nil { + return uint64(*v) + } + case *uint8: + if v != nil { + return uint64(*v) + } + case *float32: + if v != nil { + return float64(*v) + } + default: + return nil + } + return nil +} diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go new file mode 100644 index 0000000..7c4d6e3 --- /dev/null +++ b/internal/metricRouter/metricRouter.go @@ -0,0 +1,118 @@ +package metricRouter + +import ( + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + "sync" + "log" + "encoding/json" + "os" +) + +type metricRounterTagConfig struct { + Key string `json:"key"` + Value string `json:"value"` + Condition string `json:"if"` +} + +type metricRouterConfig struct { + AddTags []metricRounterTagConfig `json:"add_tags"` + DelTags []metricRounterTagConfig `json:"delete_tags"` + IntervalStamp bool `json:"interval_timestamp"` +} + +type metricRouter struct { + inputs []chan lp.CCMetric + outputs []chan lp.CCMetric + done chan bool + wg *sync.WaitGroup + config metricRouterConfig +} + +type MetricRouter interface { + Init(routerDone chan bool, wg *sync.WaitGroup) error + AddInput(input chan lp.CCMetric) + AddOutput(output chan lp.CCMetric) + ReadConfig(filename string) error + Start() + Close() +} + + +func (r *metricRouter) Init(routerDone chan bool, wg *sync.WaitGroup) error { + r.inputs = make([]chan lp.CCMetric, 0) + r.outputs = make([]chan lp.CCMetric, 0) + r.done = routerDone + r.wg = wg + return nil +} + +func (r *metricRouter) ReadConfig(filename string) error { + configFile, err := os.Open(filename) + if err != nil { + log.Print(err.Error()) + return err + } + defer configFile.Close() + jsonParser := json.NewDecoder(configFile) + err = jsonParser.Decode(&r.config) + if err != nil { + log.Print(err.Error()) + return err + } + return nil +} + +func (r *metricRouter) Start() { + r.wg.Add(1) + go func() { + for { +RouterLoop: + select { + case <- r.done: + log.Print("[MetricRouter] DONE\n") + r.wg.Done() + break RouterLoop + default: + for _, c := range r.inputs { +RouterInputLoop: + select { + case <- r.done: + log.Print("[MetricRouter] DONE\n") + r.wg.Done() + break RouterInputLoop + case p := <- c: + log.Print("[MetricRouter] FORWARD ",p) + for _, o := range r.outputs { + o <- p + } + default: + } + } + } + } + log.Print("[MetricRouter] EXIT\n") + }() + log.Print("[MetricRouter] STARTED\n") +} + +func (r *metricRouter) AddInput(input chan lp.CCMetric) { + r.inputs = append(r.inputs, input) +} + +func (r *metricRouter) AddOutput(output chan lp.CCMetric) { + r.outputs = append(r.outputs, output) +} + +func (r *metricRouter) Close() { + r.done <- true + log.Print("[MetricRouter] CLOSE\n") +} + +func New(done chan bool, wg *sync.WaitGroup) (MetricRouter, error) { + r := &metricRouter{} + err := r.Init(done, wg) + if err != nil { + return nil, err + } + return r, err +} diff --git a/metric-collector.go b/metric-collector.go index f6c8f5c..680ba68 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -7,58 +7,57 @@ import ( "github.com/ClusterCockpit/cc-metric-collector/collectors" "github.com/ClusterCockpit/cc-metric-collector/receivers" "github.com/ClusterCockpit/cc-metric-collector/sinks" - lp "github.com/influxdata/line-protocol" "log" "os" "os/signal" - "strings" +// "strings" "sync" "time" + mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) // List of provided collectors. Which collector should be run can be // configured at 'collectors' list in 'config.json'. -var Collectors = map[string]collectors.MetricGetter{ - "likwid": &collectors.LikwidCollector{}, - "loadavg": &collectors.LoadavgCollector{}, - "memstat": &collectors.MemstatCollector{}, - "netstat": &collectors.NetstatCollector{}, - "ibstat": &collectors.InfinibandCollector{}, - "lustrestat": &collectors.LustreCollector{}, - "cpustat": &collectors.CpustatCollector{}, - "topprocs": &collectors.TopProcsCollector{}, - "nvidia": &collectors.NvidiaCollector{}, - "customcmd": &collectors.CustomCmdCollector{}, - "diskstat": &collectors.DiskstatCollector{}, - "tempstat": &collectors.TempCollector{}, - "ipmistat": &collectors.IpmiCollector{}, -} +//var Collectors = map[string]collectors.MetricCollector{ +// "likwid": &collectors.LikwidCollector{}, +// "loadavg": &collectors.LoadavgCollector{}, +// "memstat": &collectors.MemstatCollector{}, +// "netstat": &collectors.NetstatCollector{}, +// "ibstat": &collectors.InfinibandCollector{}, +// "lustrestat": &collectors.LustreCollector{}, +// "cpustat": &collectors.CpustatCollector{}, +// "topprocs": &collectors.TopProcsCollector{}, +// "nvidia": &collectors.NvidiaCollector{}, +// "customcmd": &collectors.CustomCmdCollector{}, +// "diskstat": &collectors.DiskstatCollector{}, +// "tempstat": &collectors.TempCollector{}, +// "ipmistat": &collectors.IpmiCollector{}, +//} -var Sinks = map[string]sinks.SinkFuncs{ - "influxdb": &sinks.InfluxSink{}, - "stdout": &sinks.StdoutSink{}, - "nats": &sinks.NatsSink{}, - "http": &sinks.HttpSink{}, -} +//var Sinks = map[string]sinks.Sink{ +// "influxdb": &sinks.InfluxSink{}, +// "stdout": &sinks.StdoutSink{}, +// "nats": &sinks.NatsSink{}, +// "http": &sinks.HttpSink{}, +//} -var Receivers = map[string]receivers.ReceiverFuncs{ - "nats": &receivers.NatsReceiver{}, -} +//var Receivers = map[string]receivers.ReceiverFuncs{ +// "nats": &receivers.NatsReceiver{}, +//} -// Structure of the configuration file -type GlobalConfig struct { - Sink sinks.SinkConfig `json:"sink"` - Interval int `json:"interval"` +type CentralConfigFile struct { + Interval int `json:"interval"` Duration int `json:"duration"` - Collectors []string `json:"collectors"` - Receiver receivers.ReceiverConfig `json:"receiver"` - DefTags map[string]string `json:"default_tags"` - CollectConfigs map[string]json.RawMessage `json:"collect_config"` + Pidfile string `json:"pidfile", omitempty` + CollectorConfigFile string `json:"collectors"` + RouterConfigFile string `json:"router"` + SinkConfigFile string `json:"sinks"` + ReceiverConfigFile string `json:"receivers", omitempty` } -// Load JSON configuration file -func LoadConfiguration(file string, config *GlobalConfig) error { - configFile, err := os.Open(file) +func LoadCentralConfiguration(file string, config *CentralConfigFile) error { + configFile, err := os.Open(file) defer configFile.Close() if err != nil { fmt.Println(err.Error()) @@ -69,6 +68,59 @@ func LoadConfiguration(file string, config *GlobalConfig) error { return err } +type RuntimeConfig struct { + Hostname string + Interval time.Duration + Duration time.Duration + CliArgs map[string]string + ConfigFile CentralConfigFile + + Router mr.MetricRouter + RouterDone chan bool + CollectManager collectors.CollectorManager + CollectManagerDone chan bool + SinkManager sinks.SinkManager + SinkManagerDone chan bool + ReceiveManager receivers.ReceiveManager + ReceiveManagerDone chan bool + + Channels []chan lp.CCMetric + Sync sync.WaitGroup +} + +func prepare_runcfg() RuntimeConfig { + r := RuntimeConfig{} + r.Router = nil + r.CollectManager = nil + r.SinkManager = nil + r.ReceiveManager = nil + return r +} + +//// Structure of the configuration file +//type GlobalConfig struct { +// Sink sinks.SinkConfig `json:"sink"` +// Interval int `json:"interval"` +// Duration int `json:"duration"` +// Collectors []string `json:"collectors"` +// Receiver receivers.ReceiverConfig `json:"receiver"` +// DefTags map[string]string `json:"default_tags"` +// CollectConfigs map[string]json.RawMessage `json:"collect_config"` +//} + +//// Load JSON configuration file +//func LoadConfiguration(file string, config *GlobalConfig) error { +// configFile, err := os.Open(file) +// defer configFile.Close() +// if err != nil { +// fmt.Println(err.Error()) +// return err +// } +// jsonParser := json.NewDecoder(configFile) +// err = jsonParser.Decode(config) +// return err +//} + func ReadCli() map[string]string { var m map[string]string cfg := flag.String("config", "./config.json", "Path to configuration file") @@ -88,226 +140,297 @@ func ReadCli() map[string]string { return m } -func SetLogging(logfile string) error { - var file *os.File - var err error - if logfile != "stderr" { - file, err = os.OpenFile(logfile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) - if err != nil { - log.Fatal(err) - return err - } - } else { - file = os.Stderr - } - log.SetOutput(file) - return nil -} +//func SetLogging(logfile string) error { +// var file *os.File +// var err error +// if logfile != "stderr" { +// file, err = os.OpenFile(logfile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) +// if err != nil { +// log.Fatal(err) +// return err +// } +// } else { +// file = os.Stderr +// } +// log.SetOutput(file) +// return nil +//} -func CreatePidfile(pidfile string) error { - file, err := os.OpenFile(pidfile, os.O_CREATE|os.O_RDWR, 0600) - if err != nil { - log.Print(err) - return err - } - file.Write([]byte(fmt.Sprintf("%d", os.Getpid()))) - file.Close() - return nil -} +//func CreatePidfile(pidfile string) error { +// file, err := os.OpenFile(pidfile, os.O_CREATE|os.O_RDWR, 0600) +// if err != nil { +// log.Print(err) +// return err +// } +// file.Write([]byte(fmt.Sprintf("%d", os.Getpid()))) +// file.Close() +// return nil +//} -func RemovePidfile(pidfile string) error { - info, err := os.Stat(pidfile) - if !os.IsNotExist(err) && !info.IsDir() { - os.Remove(pidfile) - } - return nil -} +//func RemovePidfile(pidfile string) error { +// info, err := os.Stat(pidfile) +// if !os.IsNotExist(err) && !info.IsDir() { +// os.Remove(pidfile) +// } +// return nil +//} // General shutdown function that gets executed in case of interrupt or graceful shutdown -func shutdown(wg *sync.WaitGroup, collectors []string, sink sinks.SinkFuncs, recv receivers.ReceiverFuncs, pidfile string) { +func shutdown(config *RuntimeConfig) { log.Print("Shutdown...") - for _, c := range collectors { - col := Collectors[c] - log.Print("Stop ", col.Name()) - col.Close() - } - time.Sleep(1 * time.Second) - if recv != nil { - recv.Close() - } - sink.Close() - RemovePidfile(pidfile) - wg.Done() + if config.CollectManager != nil { + log.Print("Shutdown CollectManager...") + config.CollectManager.Close() + } + if config.ReceiveManager != nil { + log.Print("Shutdown ReceiveManager...") + config.ReceiveManager.Close() + } + if config.Router != nil { + log.Print("Shutdown Router...") + config.Router.Close() + } + if config.SinkManager != nil { + log.Print("Shutdown SinkManager...") + config.SinkManager.Close() + } + +// pidfile := config.ConfigFile.Pidfile +// RemovePidfile(pidfile) +// pidfile = config.CliArgs["pidfile"] +// RemovePidfile(pidfile) + config.Sync.Done() } // Register an interrupt handler for Ctrl+C and similar. At signal, // all collectors are closed -func prepare_shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs, recv receivers.ReceiverFuncs, pidfile string) { +func prepare_shutdown(config *RuntimeConfig) { sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) - go func(wg *sync.WaitGroup) { + go func(config *RuntimeConfig) { <-sigs log.Print("Shutdown...") - shutdown(wg, config.Collectors, sink, recv, pidfile) - }(wg) + shutdown(config) + }(config) } func main() { - var config GlobalConfig - var wg sync.WaitGroup - var recv receivers.ReceiverFuncs = nil - var use_recv bool - use_recv = false - wg.Add(1) - host, err := os.Hostname() - if err != nil { - log.Print(err) - return - } - clicfg := ReadCli() - err = CreatePidfile(clicfg["pidfile"]) - err = SetLogging(clicfg["logfile"]) - if err != nil { - log.Print("Error setting up logging system to ", clicfg["logfile"], " on ", host) - return - } + var err error + use_recv := false + + rcfg := prepare_runcfg() + rcfg.CliArgs = ReadCli() // Load and check configuration - err = LoadConfiguration(clicfg["configfile"], &config) + err = LoadCentralConfiguration(rcfg.CliArgs["configfile"], &rcfg.ConfigFile) if err != nil { - log.Print("Error reading configuration file ", clicfg["configfile"]) + log.Print("Error reading configuration file ", rcfg.CliArgs["configfile"]) log.Print(err.Error()) return } - if config.Interval <= 0 || time.Duration(config.Interval)*time.Second <= 0 { + if rcfg.ConfigFile.Interval <= 0 || time.Duration(rcfg.ConfigFile.Interval)*time.Second <= 0 { log.Print("Configuration value 'interval' must be greater than zero") return } - if config.Duration <= 0 { + rcfg.Interval = time.Duration(rcfg.ConfigFile.Interval)*time.Second + if rcfg.ConfigFile.Duration <= 0 || time.Duration(rcfg.ConfigFile.Duration)*time.Second <= 0 { log.Print("Configuration value 'duration' must be greater than zero") return } - if len(config.Collectors) == 0 { - var keys []string - for k := range Collectors { - keys = append(keys, k) - } - log.Print("Configuration value 'collectors' does not contain any collector. Available: ", strings.Join(keys, ", ")) - return - } - for _, name := range config.Collectors { - if _, found := Collectors[name]; !found { - log.Print("Invalid collector '", name, "' in configuration") - return - } - } - if _, found := Sinks[config.Sink.Type]; !found { - log.Print("Invalid sink type '", config.Sink.Type, "' in configuration") - return - } - // Setup sink - sink := Sinks[config.Sink.Type] - err = sink.Init(config.Sink) + rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration)*time.Second + + + rcfg.Hostname, err = os.Hostname() if err != nil { - log.Print(err) + log.Print(err.Error()) return } - // Setup receiver - if len(config.Receiver.Type) > 0 && config.Receiver.Type != "none" { - if _, found := Receivers[config.Receiver.Type]; !found { - log.Print("Invalid receiver type '", config.Receiver.Type, "' in configuration") - return - } else { - recv = Receivers[config.Receiver.Type] - err = recv.Init(config.Receiver, sink) - if err == nil { - use_recv = true - } else { - log.Print(err) - } - } - } - - // Register interrupt handler - prepare_shutdown(&wg, &config, sink, recv, clicfg["pidfile"]) - - // Initialize all collectors - tmp := make([]string, 0) - for _, c := range config.Collectors { - col := Collectors[c] - conf, found := config.CollectConfigs[c] - if !found { - conf = json.RawMessage("") - } - err = col.Init([]byte(conf)) - if err != nil { - log.Print("SKIP ", col.Name(), " (", err.Error(), ")") - } else if !col.Initialized() { - log.Print("SKIP ", col.Name(), " (Not initialized)") - } else { - log.Print("Start ", col.Name()) - tmp = append(tmp, c) - } - } - config.Collectors = tmp - config.DefTags["hostname"] = host - - // Setup up ticker loop - if clicfg["once"] != "true" { - log.Print("Running loop every ", time.Duration(config.Interval)*time.Second) - } else { - log.Print("Running loop only once") - } - ticker := time.NewTicker(time.Duration(config.Interval) * time.Second) - done := make(chan bool) - - // Storage for all node metrics - tmpPoints := make([]lp.MutableMetric, 0) - - // Start receiver +// err = CreatePidfile(rcfg.CliArgs["pidfile"]) +// err = SetLogging(rcfg.CliArgs["logfile"]) +// if err != nil { +// log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname) +// return +// } + + if len(rcfg.ConfigFile.RouterConfigFile) > 0 { + rcfg.RouterDone = make(chan bool) + rcfg.Router, err = mr.New(rcfg.RouterDone, &rcfg.Sync) + if err != nil { + log.Print(err.Error()) + return + } + rcfg.Router.ReadConfig(rcfg.ConfigFile.RouterConfigFile) + } + if len(rcfg.ConfigFile.SinkConfigFile) > 0 { + rcfg.SinkManagerDone = make(chan bool) + rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile) + if err != nil { + log.Print(err.Error()) + return + } +// rcfg.SinkManager.ReadConfig(rcfg.ConfigFile.SinkConfigFile) + RouterToSinksChannel := make(chan lp.CCMetric) + rcfg.SinkManager.AddInput(RouterToSinksChannel) + rcfg.Router.AddOutput(RouterToSinksChannel) + } + if len(rcfg.ConfigFile.CollectorConfigFile) > 0 { +// rcfg.CollectManagerDone = make(chan bool) + rcfg.CollectManager, err = collectors.New(rcfg.Interval, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile) + if err != nil { + log.Print(err.Error()) + return + } +// rcfg.CollectManager.ReadConfig(rcfg.ConfigFile.CollectorConfigFile) +// if err != nil { +// log.Print(err.Error()) +// return +// } + CollectToRouterChannel := make(chan lp.CCMetric) + rcfg.CollectManager.AddOutput(CollectToRouterChannel) + rcfg.Router.AddInput(CollectToRouterChannel) + } + if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 { + rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile) + if err != nil { + log.Print(err.Error()) + return + } + ReceiveToRouterChannel := make(chan lp.CCMetric) + rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel) + rcfg.Router.AddInput(ReceiveToRouterChannel) + use_recv = true + } + prepare_shutdown(&rcfg) + rcfg.Sync.Add(1) + rcfg.Router.Start() + rcfg.SinkManager.Start() + rcfg.CollectManager.Start() if use_recv { - recv.Start() - } + rcfg.ReceiveManager.Start() + } +// if len(config.Collectors) == 0 { +// var keys []string +// for k := range Collectors { +// keys = append(keys, k) +// } +// log.Print("Configuration value 'collectors' does not contain any collector. Available: ", strings.Join(keys, ", ")) +// return +// } +// for _, name := range config.Collectors { +// if _, found := Collectors[name]; !found { +// log.Print("Invalid collector '", name, "' in configuration") +// return +// } +// } +// if _, found := Sinks[config.Sink.Type]; !found { +// log.Print("Invalid sink type '", config.Sink.Type, "' in configuration") +// return +// } +// // Setup sink +// sink := Sinks[config.Sink.Type] +// err = sink.Init(config.Sink) +// if err != nil { +// log.Print(err) +// return +// } +// sinkChannel := make(chan bool) +// mproxy.Init(sinkChannel, &wg) +// // Setup receiver +// if len(config.Receiver.Type) > 0 && config.Receiver.Type != "none" { +// if _, found := Receivers[config.Receiver.Type]; !found { +// log.Print("Invalid receiver type '", config.Receiver.Type, "' in configuration") +// return +// } else { +// recv = Receivers[config.Receiver.Type] +// err = recv.Init(config.Receiver, sink) +// if err == nil { +// use_recv = true +// } else { +// log.Print(err) +// } +// } +// } - go func() { - for { - select { - case <-done: - return - case t := <-ticker.C: +// // Register interrupt handler +// prepare_shutdown(&wg, &config, sink, recv, clicfg["pidfile"]) - // Read all collectors are sort the results in the right - // storage locations - for _, c := range config.Collectors { - col := Collectors[c] - col.Read(time.Duration(config.Duration), &tmpPoints) +// // Initialize all collectors +// tmp := make([]string, 0) +// for _, c := range config.Collectors { +// col := Collectors[c] +// conf, found := config.CollectConfigs[c] +// if !found { +// conf = json.RawMessage("") +// } +// err = col.Init([]byte(conf)) +// if err != nil { +// log.Print("SKIP ", col.Name(), " (", err.Error(), ")") +// } else if !col.Initialized() { +// log.Print("SKIP ", col.Name(), " (Not initialized)") +// } else { +// log.Print("Start ", col.Name()) +// tmp = append(tmp, c) +// } +// } +// config.Collectors = tmp +// config.DefTags["hostname"] = host - for { - if len(tmpPoints) == 0 { - break - } - p := tmpPoints[0] - for k, v := range config.DefTags { - p.AddTag(k, v) - p.SetTime(t) - } - sink.Write(p) - tmpPoints = tmpPoints[1:] - } - } +// // Setup up ticker loop +// if clicfg["once"] != "true" { +// log.Print("Running loop every ", time.Duration(config.Interval)*time.Second) +// } else { +// log.Print("Running loop only once") +// } +// ticker := time.NewTicker(time.Duration(config.Interval) * time.Second) +// done := make(chan bool) - if err := sink.Flush(); err != nil { - log.Printf("sink error: %s\n", err) - } - if clicfg["once"] == "true" { - shutdown(&wg, config.Collectors, sink, recv, clicfg["pidfile"]) - return - } - } - } - }() +// // Storage for all node metrics +// tmpPoints := make([]lp.MutableMetric, 0) + +// // Start receiver +// if use_recv { +// recv.Start() +// } + +// go func() { +// for { +// select { +// case <-done: +// return +// case t := <-ticker.C: + +// // Read all collectors are sort the results in the right +// // storage locations +// for _, c := range config.Collectors { +// col := Collectors[c] +// col.Read(time.Duration(config.Duration), &tmpPoints) + +// for { +// if len(tmpPoints) == 0 { +// break +// } +// p := tmpPoints[0] +// for k, v := range config.DefTags { +// p.AddTag(k, v) +// p.SetTime(t) +// } +// sink.Write(p) +// tmpPoints = tmpPoints[1:] +// } +// } + +// if err := sink.Flush(); err != nil { +// log.Printf("sink error: %s\n", err) +// } +// if clicfg["once"] == "true" { +// shutdown(&wg, config.Collectors, sink, recv, clicfg["pidfile"]) +// return +// } +// } +// } +// }() // Wait until receiving an interrupt - wg.Wait() + rcfg.Sync.Wait() } diff --git a/receivers.json b/receivers.json new file mode 100644 index 0000000..e368fc3 --- /dev/null +++ b/receivers.json @@ -0,0 +1,8 @@ +[ + { + "type": "nats", + "address": "nats://my-url", + "port" : "4222", + "database": "testcluster" + } +] diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go index acdc455..b074f97 100644 --- a/receivers/metricReceiver.go +++ b/receivers/metricReceiver.go @@ -2,30 +2,42 @@ package receivers import ( // "time" - s "github.com/ClusterCockpit/cc-metric-collector/sinks" influx "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + ) type ReceiverConfig struct { Addr string `json:"address"` Port string `json:"port"` Database string `json:"database"` + Organization string `json:"organization", omitempty` Type string `json:"type"` } -type Receiver struct { +type receiver struct { name string addr string port string database string organization string - sink s.SinkFuncs + sink chan lp.CCMetric } -type ReceiverFuncs interface { - Init(config ReceiverConfig, sink s.SinkFuncs) error +type Receiver interface { + Init(config ReceiverConfig) error Start() Close() + Name() string + SetSink(sink chan lp.CCMetric) +} + +func (r *receiver) Name() string { + return r.name +} + +func (r *receiver) SetSink(sink chan lp.CCMetric) { + r.sink = sink } func Tags2Map(metric influx.Metric) map[string]string { diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index 9d98f00..e3f037c 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -2,56 +2,68 @@ package receivers import ( "errors" - s "github.com/ClusterCockpit/cc-metric-collector/sinks" - lp "github.com/influxdata/line-protocol" + influx "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" nats "github.com/nats-io/nats.go" "log" "time" + "fmt" ) +type NatsReceiverConfig struct { + Addr string `json:"address"` + Port string `json:"port"` + Database string `json:"database"` +} + type NatsReceiver struct { - Receiver + receiver nc *nats.Conn - handler *lp.MetricHandler - parser *lp.Parser + handler *influx.MetricHandler + parser *influx.Parser + meta map[string]string + config ReceiverConfig } var DefaultTime = func() time.Time { return time.Unix(42, 0) } -func (r *NatsReceiver) Init(config ReceiverConfig, sink s.SinkFuncs) error { - if len(config.Addr) == 0 || - len(config.Port) == 0 || - len(config.Database) == 0 { +func (r *NatsReceiver) Init(config ReceiverConfig) error { + r.name = "NatsReceiver" + r.config = config + if len(r.config.Addr) == 0 || + len(r.config.Port) == 0 || + len(r.config.Database) == 0 { return errors.New("Not all configuration variables set required by NatsReceiver") } - r.addr = config.Addr + r.meta = map[string]string{"source" : r.name} + r.addr = r.config.Addr if len(r.addr) == 0 { r.addr = nats.DefaultURL } - r.port = config.Port + r.port = r.config.Port if len(r.port) == 0 { r.port = "4222" } - log.Print("Init NATS Receiver") - nc, err := nats.Connect(r.addr) + log.Print("[NatsReceiver] INIT") + uri := fmt.Sprintf("%s:%s", r.addr, r.port) + nc, err := nats.Connect(uri) if err == nil { - r.database = config.Database - r.sink = sink + r.database = r.config.Database r.nc = nc } else { - log.Print(err) r.nc = nil + return err } - r.handler = lp.NewMetricHandler() - r.parser = lp.NewParser(r.handler) + r.handler = influx.NewMetricHandler() + r.parser = influx.NewParser(r.handler) r.parser.SetTimeFunc(DefaultTime) return err } func (r *NatsReceiver) Start() { - log.Print("Start NATS Receiver") + log.Print("[NatsReceiver] START") r.nc.Subscribe(r.database, r._NatsReceive) } @@ -59,9 +71,13 @@ func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { metrics, err := r.parser.Parse(m.Data) if err == nil { for _, m := range metrics { - y, err := lp.New(m.Name(), Tags2Map(m), Fields2Map(m), m.Time()) - if err == nil { - r.sink.Write(y) + y := lp.FromInfluxMetric(m) + for k, v := range r.meta { + y.AddMeta(k, v) + } + //y, err := lp.New(m.Name(), Tags2Map(m), r.meta, Fields2Map(m), m.Time()) + if r.sink != nil { + r.sink <- y } } } @@ -69,7 +85,8 @@ func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { func (r *NatsReceiver) Close() { if r.nc != nil { - log.Print("Close NATS Receiver") + log.Print("[NatsReceiver] CLOSE") r.nc.Close() } } + diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go new file mode 100644 index 0000000..322862d --- /dev/null +++ b/receivers/receiveManager.go @@ -0,0 +1,156 @@ +package receivers + +import ( + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + "sync" + "log" + "os" + "encoding/json" +) + +var AvailableReceivers = map[string]Receiver{ + "nats": &NatsReceiver{}, +} + + +type receiveManager struct { + inputs []Receiver + output chan lp.CCMetric + done chan bool + wg *sync.WaitGroup + config []ReceiverConfig +} + +type ReceiveManager interface { + Init(wg *sync.WaitGroup, receiverConfigFile string) error + AddInput(rawConfig json.RawMessage) error + AddOutput(output chan lp.CCMetric) + Start() + Close() +} + + +func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) error { + rm.inputs = make([]Receiver, 0) + rm.output = nil + rm.done = make(chan bool) + rm.wg = wg + rm.config = make([]ReceiverConfig, 0) + configFile, err := os.Open(receiverConfigFile) + if err != nil { + log.Print(err.Error()) + return err + } + defer configFile.Close() + jsonParser := json.NewDecoder(configFile) + var rawConfigs []json.RawMessage + err = jsonParser.Decode(&rawConfigs) + if err != nil { + log.Print(err.Error()) + return err + } + for _, raw := range rawConfigs { + log.Print("[ReceiveManager] ", string(raw)) + rm.AddInput(raw) +// if _, found := AvailableReceivers[k.Type]; !found { +// log.Print("[ReceiveManager] SKIP Config specifies unknown receiver 'type': ", k.Type) +// continue +// } +// r := AvailableReceivers[k.Type] +// err = r.Init(k) +// if err != nil { +// log.Print("[ReceiveManager] SKIP Receiver ", k.Type, " cannot be initialized: ", err.Error()) +// continue +// } +// rm.inputs = append(rm.inputs, r) + } + return nil +} + + +func (rm *receiveManager) Start() { + rm.wg.Add(1) + + for _, r := range rm.inputs { + log.Print("[ReceiveManager] START ", r.Name()) + r.Start() + } + log.Print("[ReceiveManager] STARTED\n") +// go func() { +// for { +//ReceiveManagerLoop: +// select { +// case <- rm.done: +// log.Print("ReceiveManager done\n") +// rm.wg.Done() +// break ReceiveManagerLoop +// default: +// for _, c := range rm.inputs { +//ReceiveManagerInputLoop: +// select { +// case <- rm.done: +// log.Print("ReceiveManager done\n") +// rm.wg.Done() +// break ReceiveManagerInputLoop +// case p := <- c: +// log.Print("ReceiveManager: ", p) +// rm.output <- p +// default: +// } +// } +// } +// } +// }() +// for _, r := range rm.inputs { +// r.Close() +// } +} + +func (rm *receiveManager) AddInput(rawConfig json.RawMessage) error { + var config ReceiverConfig + err := json.Unmarshal(rawConfig, &config) + if err != nil { + log.Print("[ReceiveManager] SKIP ", config.Type, " JSON config error: ", err.Error()) + log.Print(err.Error()) + return err + } + if _, found := AvailableReceivers[config.Type]; !found { + log.Print("[ReceiveManager] SKIP ", config.Type, " unknown receiver: ", err.Error()) + return err + } + r := AvailableReceivers[config.Type] + err = r.Init(config) + if err != nil { + log.Print("[ReceiveManager] SKIP ", r.Name(), " initialization failed: ", err.Error()) + return err + } + rm.inputs = append(rm.inputs, r) + rm.config = append(rm.config, config) + return nil +} + +func (rm *receiveManager) AddOutput(output chan lp.CCMetric) { + rm.output = output + for _, r := range rm.inputs { + r.SetSink(rm.output) + } +} + +func (rm *receiveManager) Close() { + for _, r := range rm.inputs { + log.Print("[ReceiveManager] CLOSE ", r.Name()) + r.Close() + } + rm.wg.Done() + log.Print("[ReceiveManager] CLOSE\n") + log.Print("[ReceiveManager] EXIT\n") +} + +func New(wg *sync.WaitGroup, receiverConfigFile string) (ReceiveManager, error) { + r := &receiveManager{} + err := r.Init(wg, receiverConfigFile) + if err != nil { + return nil, err + } + return r, err +} diff --git a/router.json b/router.json new file mode 100644 index 0000000..9ada37a --- /dev/null +++ b/router.json @@ -0,0 +1,17 @@ +{ + "add_tags" : [ + { + "key" : "cluster", + "value" : "testcluster", + "if" : "*" + } + ], + "delete_tags" : [ + { + "key" : "unit", + "value" : "*", + "if" : "*" + } + ], + "interval_timestamp" : true +} diff --git a/sinks.json b/sinks.json new file mode 100644 index 0000000..d304018 --- /dev/null +++ b/sinks.json @@ -0,0 +1,6 @@ +[ + { + "type" : "stdout", + "meta_as_tags" : true + } +] diff --git a/sinks/httpSink.go b/sinks/httpSink.go index e443ceb..1dc364a 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -7,19 +7,21 @@ import ( "net/http" "time" - lp "github.com/influxdata/line-protocol" + influx "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) type HttpSink struct { - Sink + sink client *http.Client url, jwt string - encoder *lp.Encoder + encoder *influx.Encoder buffer *bytes.Buffer } -func (s *HttpSink) Init(config SinkConfig) error { - if len(config.Host) == 0 || len(config.Port) == 0 { +func (s *HttpSink) Init(config sinkConfig) error { + s.name = "HttpSink" + if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 { return errors.New("`host`, `port` and `database` config options required for TCP sink") } @@ -28,13 +30,13 @@ func (s *HttpSink) Init(config SinkConfig) error { s.port = config.Port s.jwt = config.Password s.buffer = &bytes.Buffer{} - s.encoder = lp.NewEncoder(s.buffer) + s.encoder = influx.NewEncoder(s.buffer) s.encoder.SetPrecision(time.Second) return nil } -func (s *HttpSink) Write(point lp.MutableMetric) error { +func (s *HttpSink) Write(point lp.CCMetric) error { _, err := s.encoder.Encode(point) return err } diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 40e681f..5a1b7a7 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -5,15 +5,14 @@ import ( "crypto/tls" "errors" "fmt" - + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" - lp "github.com/influxdata/line-protocol" "log" ) type InfluxSink struct { - Sink + sink client influxdb2.Client writeApi influxdb2Api.WriteAPIBlocking retPolicy string @@ -39,7 +38,8 @@ func (s *InfluxSink) connect() error { return nil } -func (s *InfluxSink) Init(config SinkConfig) error { +func (s *InfluxSink) Init(config sinkConfig) error { + s.name = "InfluxSink" if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 || @@ -54,15 +54,21 @@ func (s *InfluxSink) Init(config SinkConfig) error { s.user = config.User s.password = config.Password s.ssl = config.SSL + s.meta_as_tags = config.MetaAsTags return s.connect() } -func (s *InfluxSink) Write(point lp.MutableMetric) error { +func (s *InfluxSink) Write(point lp.CCMetric) error { tags := map[string]string{} fields := map[string]interface{}{} for _, t := range point.TagList() { tags[t.Key] = t.Value } + if s.meta_as_tags { + for _, m := range point.MetaList() { + tags[m.Key] = m.Value + } + } for _, f := range point.FieldList() { fields[f.Key] = f.Value } diff --git a/sinks/metricSink.go b/sinks/metricSink.go index 182495a..4813155 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -2,21 +2,22 @@ package sinks import ( // "time" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) -type SinkConfig struct { - Host string `json:"host"` - Port string `json:"port"` - Database string `json:"database"` - User string `json:"user"` - Password string `json:"password"` - Organization string `json:"organization"` - Type string `json:"type"` - SSL bool `json:"ssl"` +type sinkConfig struct { + Type string `json:"type"` + Host string `json:"host", omitempty` + Port string `json:"port", omitempty` + Database string `json:"database, omitempty"` + User string `json:"user, omitempty"` + Password string `json:"password", omitempty` + Organization string `json:"organization", omitempty` + SSL bool `json:"ssl", omitempty` + MetaAsTags bool `json:"meta_as_tags", omitempty` } -type Sink struct { +type sink struct { host string port string user string @@ -24,11 +25,18 @@ type Sink struct { database string organization string ssl bool + meta_as_tags bool + name string } -type SinkFuncs interface { - Init(config SinkConfig) error - Write(point lp.MutableMetric) error +type Sink interface { + Init(config sinkConfig) error + Write(point lp.CCMetric) error Flush() error Close() + Name() string +} + +func (s *sink) Name() string { + return s.name } diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 0df14f4..de12f4b 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -4,16 +4,17 @@ import ( "bytes" "errors" "fmt" - lp "github.com/influxdata/line-protocol" + influx "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" nats "github.com/nats-io/nats.go" "log" "time" ) type NatsSink struct { - Sink + sink client *nats.Conn - encoder *lp.Encoder + encoder *influx.Encoder buffer *bytes.Buffer } @@ -31,7 +32,8 @@ func (s *NatsSink) connect() error { return nil } -func (s *NatsSink) Init(config SinkConfig) error { +func (s *NatsSink) Init(config sinkConfig) error { + s.name = "NatsSink" if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 { @@ -46,14 +48,14 @@ func (s *NatsSink) Init(config SinkConfig) error { // Setup Influx line protocol s.buffer = &bytes.Buffer{} s.buffer.Grow(1025) - s.encoder = lp.NewEncoder(s.buffer) + s.encoder = influx.NewEncoder(s.buffer) s.encoder.SetPrecision(time.Second) s.encoder.SetMaxLineBytes(1024) // Setup infos for connection return s.connect() } -func (s *NatsSink) Write(point lp.MutableMetric) error { +func (s *NatsSink) Write(point lp.CCMetric) error { if s.client != nil { // var tags map[string]string // var fields map[string]interface{} diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go new file mode 100644 index 0000000..4a2e829 --- /dev/null +++ b/sinks/sinkManager.go @@ -0,0 +1,151 @@ +package sinks + +import ( + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + "sync" + "log" + "os" + "encoding/json" +) + + +type SinkEntity struct { + config json.RawMessage + output Sink +} + +var AvailableSinks = map[string]Sink{ + "influxdb": &InfluxSink{}, + "stdout": &StdoutSink{}, + "nats": &NatsSink{}, + "http": &HttpSink{}, +} + + +type sinkManager struct { + input chan lp.CCMetric + outputs []Sink + done chan bool + wg *sync.WaitGroup + config []sinkConfig +} + +type SinkManager interface { + Init(wg *sync.WaitGroup, sinkConfigFile string) error + AddInput(input chan lp.CCMetric) + AddOutput(config json.RawMessage) error + Start() + Close() +} + + +func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { + sm.input = nil + sm.outputs = make([]Sink, 0) + sm.done = make(chan bool) + sm.wg = wg + sm.config = make([]sinkConfig, 0) + if len(sinkConfigFile) > 0 { + configFile, err := os.Open(sinkConfigFile) + if err != nil { + log.Print("[SinkManager] ", err.Error()) + return err + } + defer configFile.Close() + jsonParser := json.NewDecoder(configFile) + var rawConfigs []json.RawMessage + err = jsonParser.Decode(&rawConfigs) + if err != nil { + log.Print("[SinkManager] ", err.Error()) + return err + } + for _, raw := range rawConfigs { + err = sm.AddOutput(raw) + if err != nil { + continue + } + } + } + return nil +} + + + +func (sm *sinkManager) Start() { + sm.wg.Add(1) + batchcount := 20 + go func() { + for { +SinkManagerLoop: + select { + case <- sm.done: + for _, s := range sm.outputs { + s.Close() + } + log.Print("[SinkManager] DONE\n") + sm.wg.Done() + break SinkManagerLoop + case p := <- sm.input: + log.Print("[SinkManager] WRITE ", p) + for _, s := range sm.outputs { + s.Write(p) + } + if (batchcount == 0) { + log.Print("[SinkManager] FLUSH") + for _, s := range sm.outputs { + s.Flush() + } + batchcount = 20 + } + batchcount-- + default: + } + } + log.Print("[SinkManager] EXIT\n") + }() + log.Print("[SinkManager] STARTED\n") +} + +func (sm *sinkManager) AddInput(input chan lp.CCMetric) { + sm.input = input +} + +func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { + var err error + var config sinkConfig + if len(rawConfig) > 3 { + err = json.Unmarshal(rawConfig, &config) + if err != nil { + log.Print("[SinkManager] SKIP ", config.Type, " JSON config error: ", err.Error()) + return err + } + } + if _, found := AvailableSinks[config.Type]; !found { + log.Print("[SinkManager] SKIP ", config.Type, " unknown sink: ", err.Error()) + return err + } + s := AvailableSinks[config.Type] + err = s.Init(config) + if err != nil { + log.Print("[SinkManager] SKIP ", s.Name(), " initialization failed: ", err.Error()) + return err + } + sm.outputs = append(sm.outputs, s) + sm.config = append(sm.config, config) + return nil +} + +func (sm *sinkManager) Close() { + sm.done <- true + log.Print("[SinkManager] CLOSE") + return +} + +func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) { + sm := &sinkManager{} + err := sm.Init(wg, sinkConfigFile) + if err != nil { + return nil, err + } + return sm, err +} diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index 34561e0..a66ba45 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -6,23 +6,30 @@ import ( "strings" // "time" - lp "github.com/influxdata/line-protocol" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) type StdoutSink struct { - Sink + sink } -func (s *StdoutSink) Init(config SinkConfig) error { +func (s *StdoutSink) Init(config sinkConfig) error { + s.name = "StdoutSink" + s.meta_as_tags = config.MetaAsTags return nil } -func (s *StdoutSink) Write(point lp.MutableMetric) error { +func (s *StdoutSink) Write(point lp.CCMetric) error { var tagsstr []string var fieldstr []string for _, t := range point.TagList() { tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", t.Key, t.Value)) } + if s.meta_as_tags { + for _, m := range point.MetaList() { + tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", m.Key, m.Value)) + } + } for _, f := range point.FieldList() { switch f.Value.(type) { case float64: