diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index f91db20..4b66605 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -37,7 +37,7 @@ var AvailableCollectors = map[string]MetricCollector{ // Metric collector manager data structure type collectorManager struct { collectors []MetricCollector // List of metric collectors to use - output chan lp.CCMetric // Output channels + output chan *lp.CCMetric // Output channels done chan bool // channel to finish / stop metric collector manager ticker mct.MultiChanTicker // periodically ticking once each interval duration time.Duration // duration (for metrics that measure over a given duration) @@ -48,7 +48,7 @@ type collectorManager struct { // Metric collector manager access functions type CollectorManager interface { Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error - AddOutput(output chan lp.CCMetric) + AddOutput(output chan *lp.CCMetric) Start() Close() } @@ -147,7 +147,7 @@ func (cm *collectorManager) Start() { } // AddOutput adds the output channel to the metric collector manager -func (cm *collectorManager) AddOutput(output chan lp.CCMetric) { +func (cm *collectorManager) AddOutput(output chan *lp.CCMetric) { cm.output = output } diff --git a/collectors/cpufreqCpuinfoMetric.go b/collectors/cpufreqCpuinfoMetric.go index 9c91a50..76505e9 100644 --- a/collectors/cpufreqCpuinfoMetric.go +++ b/collectors/cpufreqCpuinfoMetric.go @@ -10,8 +10,8 @@ import ( "strconv" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) // @@ -151,8 +151,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { return nil } - -func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if !m.init { return } @@ -183,7 +182,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC } y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now) if err == nil { - output <- y + output <- &y } } processorCounter++ diff --git a/collectors/cpufreqMetric.go b/collectors/cpufreqMetric.go index f3309ff..1717150 100644 --- a/collectors/cpufreqMetric.go +++ b/collectors/cpufreqMetric.go @@ -183,7 +183,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { return nil } -func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *CPUFreqCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if !m.init { return } @@ -211,7 +211,7 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric) y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now) if err == nil { - output <- y + output <- &y } } } diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index f517300..1afc517 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" "time" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) @@ -35,7 +36,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { return nil } -func (c *CpustatCollector) parseStatLine(line string, cpu int, exclude []string, output chan lp.CCMetric) { +func (c *CpustatCollector) parseStatLine(line string, cpu int, exclude []string, output chan *lp.CCMetric) { ls := strings.Fields(line) matches := []string{"", "cpu_user", "cpu_nice", "cpu_system", "cpu_idle", "cpu_iowait", "cpu_irq", "cpu_softirq", "cpu_steal", "cpu_guest", "cpu_guest_nice"} for _, ex := range exclude { @@ -54,14 +55,14 @@ func (c *CpustatCollector) parseStatLine(line string, cpu int, exclude []string, if err == nil { y, err := lp.New(m, tags, c.meta, map[string]interface{}{"value": int(x)}, time.Now()) if err == nil { - output <- y + output <- &y } } } } } -func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *CpustatCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if !m.init { return } diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index ffe8b73..3a6558a 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -74,7 +74,7 @@ var DefaultTime = func() time.Time { return time.Unix(42, 0) } -func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *CustomCmdCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if !m.init { return } @@ -99,7 +99,7 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri } y, err := lp.New(c.Name(), Tags2Map(c), m.meta, Fields2Map(c), c.Time()) if err == nil { - output <- y + output <- &y } } } @@ -121,7 +121,7 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri } y, err := lp.New(f.Name(), Tags2Map(f), m.meta, Fields2Map(f), f.Time()) if err == nil { - output <- y + output <- &y } } } diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index 50c41cd..409ebd3 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -2,7 +2,9 @@ package collectors import ( "io/ioutil" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + // "log" "encoding/json" "errors" @@ -72,7 +74,7 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error { return err } -func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *DiskstatCollector) Read(interval time.Duration, output chan *lp.CCMetric) { var lines []string if !m.init { return @@ -102,7 +104,7 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric if err == nil { y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": int(x)}, time.Now()) if err == nil { - output <- y + output <- &y } } } diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index 53db1c2..1951288 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -69,7 +69,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { return nil } -func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *GpfsCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if !m.init { return } @@ -157,7 +157,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp) if err == nil { - output <- y + output <- &y } // bytes written @@ -171,7 +171,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { y, err = lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp) if err == nil { - output <- y + output <- &y } // number of opens @@ -184,7 +184,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } y, err = lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp) if err == nil { - output <- y + output <- &y } // number of closes @@ -195,7 +195,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } y, err = lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp) if err == nil { - output <- y + output <- &y } // number of reads @@ -206,7 +206,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } y, err = lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp) if err == nil { - output <- y + output <- &y } // number of writes @@ -217,7 +217,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } y, err = lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp) if err == nil { - output <- y + output <- &y } // number of read directories @@ -228,7 +228,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } y, err = lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp) if err == nil { - output <- y + output <- &y } // Number of inode updates @@ -239,7 +239,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } y, err = lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp) if err == nil { - output <- y + output <- &y } } } diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index 6b4c882..63b6302 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -140,7 +140,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { } // Read reads Infiniband counter files below IB_BASEPATH -func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *InfinibandCollector) Read(interval time.Duration, output chan *lp.CCMetric) { // Check if already initialized if !m.init { @@ -156,7 +156,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr if data, ok := readOneLine(counterFile); ok { if v, err := strconv.ParseInt(data, 10, 64); err == nil { if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil { - output <- y + output <- &y } } } diff --git a/collectors/infinibandPerfQueryMetric.go b/collectors/infinibandPerfQueryMetric.go index d8f7bf4..060c0d4 100644 --- a/collectors/infinibandPerfQueryMetric.go +++ b/collectors/infinibandPerfQueryMetric.go @@ -108,7 +108,7 @@ func (m *InfinibandPerfQueryCollector) Init(config json.RawMessage) error { return nil } -func (m *InfinibandPerfQueryCollector) doPerfQuery(cmd string, dev string, lid string, port string, tags map[string]string, output chan lp.CCMetric) error { +func (m *InfinibandPerfQueryCollector) doPerfQuery(cmd string, dev string, lid string, port string, tags map[string]string, output chan *lp.CCMetric) error { args := fmt.Sprintf("-r %s %s 0xf000", lid, port) command := exec.Command(cmd, args) @@ -127,7 +127,7 @@ func (m *InfinibandPerfQueryCollector) doPerfQuery(cmd string, dev string, lid s if err == nil { y, err := lp.New("ib_recv", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - output <- y + output <- &y } } } @@ -137,7 +137,7 @@ func (m *InfinibandPerfQueryCollector) doPerfQuery(cmd string, dev string, lid s if err == nil { y, err := lp.New("ib_xmit", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - output <- y + output <- &y } } } @@ -147,7 +147,7 @@ func (m *InfinibandPerfQueryCollector) doPerfQuery(cmd string, dev string, lid s if err == nil { y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - output <- y + output <- &y } } } @@ -157,7 +157,7 @@ func (m *InfinibandPerfQueryCollector) doPerfQuery(cmd string, dev string, lid s if err == nil { y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - output <- y + output <- &y } } } @@ -167,7 +167,7 @@ func (m *InfinibandPerfQueryCollector) doPerfQuery(cmd string, dev string, lid s if err == nil { y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - output <- y + output <- &y } } } @@ -177,7 +177,7 @@ func (m *InfinibandPerfQueryCollector) doPerfQuery(cmd string, dev string, lid s if err == nil { y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - output <- y + output <- &y } } } @@ -185,7 +185,7 @@ func (m *InfinibandPerfQueryCollector) doPerfQuery(cmd string, dev string, lid s return nil } -func (m *InfinibandPerfQueryCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *InfinibandPerfQueryCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if m.init { for dev, ports := range m.lids { @@ -203,7 +203,7 @@ func (m *InfinibandPerfQueryCollector) Read(interval time.Duration, output chan if err == nil { y, err := lp.New("ib_recv", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - output <- y + output <- &y } } } @@ -214,7 +214,7 @@ func (m *InfinibandPerfQueryCollector) Read(interval time.Duration, output chan if err == nil { y, err := lp.New("ib_xmit", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - output <- y + output <- &y } } } @@ -225,7 +225,7 @@ func (m *InfinibandPerfQueryCollector) Read(interval time.Duration, output chan if err == nil { y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - output <- y + output <- &y } } } @@ -236,7 +236,7 @@ func (m *InfinibandPerfQueryCollector) Read(interval time.Duration, output chan if err == nil { y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { - output <- y + output <- &y } } } diff --git a/collectors/ipmiMetric.go b/collectors/ipmiMetric.go index f4c5167..3449af6 100644 --- a/collectors/ipmiMetric.go +++ b/collectors/ipmiMetric.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "time" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) @@ -53,7 +54,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error { return nil } -func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) { +func (m *IpmiCollector) readIpmiTool(cmd string, output chan *lp.CCMetric) { command := exec.Command(cmd, "sensor") command.Wait() stdout, err := command.Output() @@ -86,13 +87,13 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) { y, err := lp.New(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now()) if err == nil { y.AddMeta("unit", unit) - output <- y + output <- &y } } } } -func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) { +func (m *IpmiCollector) readIpmiSensors(cmd string, output chan *lp.CCMetric) { command := exec.Command(cmd, "--comma-separated-output", "--sdr-cache-recreate") command.Wait() @@ -115,14 +116,14 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) { if len(lv) > 4 { y.AddMeta("unit", lv[4]) } - output <- y + output <- &y } } } } } -func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *IpmiCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if len(m.config.IpmitoolPath) > 0 { _, err := os.Stat(m.config.IpmitoolPath) if err == nil { diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 430a09b..b0044fd 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -20,6 +20,7 @@ import ( "strings" "time" "unsafe" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "gopkg.in/Knetic/govaluate.v2" ) @@ -183,7 +184,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { return nil } -func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *LikwidCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if !m.init { return } @@ -269,7 +270,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, time.Now()) if err == nil { - output <- y + output <- &y } } } else if metric.Scope.String() == "hwthread" { @@ -281,7 +282,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, time.Now()) if err == nil { - output <- y + output <- &y } } } @@ -300,7 +301,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) map[string]interface{}{"value": m.gmresults[tid][metric.Name]}, time.Now()) if err == nil { - output <- y + output <- &y } } } else { @@ -312,7 +313,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) map[string]interface{}{"value": m.gmresults[tid][metric.Name]}, time.Now()) if err == nil { - output <- y + output <- &y } } } diff --git a/collectors/loadavgMetric.go b/collectors/loadavgMetric.go index 11c0e5e..b408bc8 100644 --- a/collectors/loadavgMetric.go +++ b/collectors/loadavgMetric.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" "time" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) @@ -40,7 +41,7 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error { return nil } -func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *LoadavgCollector) Read(interval time.Duration, output chan *lp.CCMetric) { var skip bool if !m.init { return @@ -58,7 +59,7 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, name) y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": float64(x)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } } @@ -69,7 +70,7 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, name) y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": float64(x)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } } diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index 3e248fa..fab47cc 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "time" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) @@ -64,7 +65,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error { return nil } -func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *LustreCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if !m.init { return } @@ -93,7 +94,7 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) if strings.Contains(name, "byte") { y.AddMeta("unit", "Byte") } - output <- y + output <- &y } } } diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index b6ef855..53e6949 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -66,7 +66,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { return err } -func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *MemstatCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if !m.init { return } @@ -100,7 +100,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) } y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": int(float64(m.stats[match]) * 1.0e-3)}, time.Now()) if err == nil { - output <- y + output <- &y } } @@ -111,7 +111,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used") y, err := lp.New("mem_used", m.tags, m.meta, map[string]interface{}{"value": int(float64(memUsed) * 1.0e-3)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } } @@ -120,7 +120,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_shared") y, err := lp.New("mem_shared", m.tags, m.meta, map[string]interface{}{"value": int(float64(m.stats[`MemShared`]) * 1.0e-3)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } } diff --git a/collectors/metricCollector.go b/collectors/metricCollector.go index 3484dca..21c4cac 100644 --- a/collectors/metricCollector.go +++ b/collectors/metricCollector.go @@ -17,7 +17,7 @@ type MetricCollector interface { Name() string Init(config json.RawMessage) error Initialized() bool - Read(duration time.Duration, output chan lp.CCMetric) + Read(duration time.Duration, output chan *lp.CCMetric) Close() } diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index 86437ea..391766f 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" "time" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) @@ -46,7 +47,7 @@ func (m *NetstatCollector) Init(config json.RawMessage) error { return nil } -func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *NetstatCollector) Read(interval time.Duration, output chan *lp.CCMetric) { data, err := ioutil.ReadFile(string(NETSTATFILE)) if err != nil { log.Print(err.Error()) @@ -81,7 +82,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) case strings.Contains(name, "pkt"): y.AddMeta("unit", "Packets") } - output <- y + output <- &y } } } diff --git a/collectors/nfsMetric.go b/collectors/nfsMetric.go index 16a6d23..f09154c 100644 --- a/collectors/nfsMetric.go +++ b/collectors/nfsMetric.go @@ -119,7 +119,7 @@ func (m *NfsCollector) Init(config json.RawMessage) error { return nil } -func (m *NfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *NfsCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if !m.init { return } @@ -136,7 +136,7 @@ func (m *NfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { y, err := lp.New(fmt.Sprintf("nfs_%s", name), m.tags, m.meta, map[string]interface{}{"value": value}, timestamp) if err == nil { y.AddMeta("version", version) - output <- y + output <- &y } } } diff --git a/collectors/nvidiaMetric.go b/collectors/nvidiaMetric.go index 1eff3be..e7a391d 100644 --- a/collectors/nvidiaMetric.go +++ b/collectors/nvidiaMetric.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "time" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "github.com/NVIDIA/go-nvml/pkg/nvml" ) @@ -55,7 +56,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { return nil } -func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *NvidiaCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if !m.init { return } @@ -76,12 +77,12 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_util") y, err := lp.New("nv_util", tags, m.meta, map[string]interface{}{"value": float64(util.Gpu)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_mem_util") y, err = lp.New("nv_mem_util", tags, m.meta, map[string]interface{}{"value": float64(util.Memory)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -92,14 +93,14 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) y, err := lp.New("nv_mem_total", tags, m.meta, map[string]interface{}{"value": t}, time.Now()) if err == nil && !skip { y.AddMeta("unit", "MByte") - output <- y + output <- &y } f := float64(meminfo.Used) / (1024 * 1024) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_fb_memory") y, err = lp.New("nv_fb_memory", tags, m.meta, map[string]interface{}{"value": f}, time.Now()) if err == nil && !skip { y.AddMeta("unit", "MByte") - output <- y + output <- &y } } @@ -109,7 +110,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) y, err := lp.New("nv_temp", tags, m.meta, map[string]interface{}{"value": float64(temp)}, time.Now()) if err == nil && !skip { y.AddMeta("unit", "degC") - output <- y + output <- &y } } @@ -118,7 +119,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_fan") y, err := lp.New("nv_fan", tags, m.meta, map[string]interface{}{"value": float64(fan)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -136,13 +137,13 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) } _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_ecc_mode") if err == nil && !skip { - output <- y + output <- &y } } else if ret == nvml.ERROR_NOT_SUPPORTED { _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_ecc_mode") y, err := lp.New("nv_ecc_mode", tags, m.meta, map[string]interface{}{"value": string("N/A")}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -151,7 +152,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_perf_state") y, err := lp.New("nv_perf_state", tags, m.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pstate))}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -160,7 +161,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_power_usage_report") y, err := lp.New("nv_power_usage_report", tags, m.meta, map[string]interface{}{"value": float64(power) / 1000}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -169,7 +170,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_graphics_clock_report") y, err := lp.New("nv_graphics_clock_report", tags, m.meta, map[string]interface{}{"value": float64(gclk)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -178,7 +179,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_sm_clock_report") y, err := lp.New("nv_sm_clock_report", tags, m.meta, map[string]interface{}{"value": float64(smclk)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -187,7 +188,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_mem_clock_report") y, err := lp.New("nv_mem_clock_report", tags, m.meta, map[string]interface{}{"value": float64(memclk)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -196,7 +197,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_max_graphics_clock") y, err := lp.New("nv_max_graphics_clock", tags, m.meta, map[string]interface{}{"value": float64(max_gclk)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -205,7 +206,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_max_sm_clock") y, err := lp.New("nv_max_sm_clock", tags, m.meta, map[string]interface{}{"value": float64(max_smclk)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -214,7 +215,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_max_mem_clock") y, err := lp.New("nv_max_mem_clock", tags, m.meta, map[string]interface{}{"value": float64(max_memclk)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -223,7 +224,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_ecc_db_error") y, err := lp.New("nv_ecc_db_error", tags, m.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -232,7 +233,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_ecc_sb_error") y, err := lp.New("nv_ecc_sb_error", tags, m.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -241,7 +242,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_power_man_limit") y, err := lp.New("nv_power_man_limit", tags, m.meta, map[string]interface{}{"value": float64(pwr_limit)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -250,7 +251,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_encoder_util") y, err := lp.New("nv_encoder_util", tags, m.meta, map[string]interface{}{"value": float64(enc_util)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } @@ -259,7 +260,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) _, skip = stringArrayContains(m.config.ExcludeMetrics, "nv_decoder_util") y, err := lp.New("nv_decoder_util", tags, m.meta, map[string]interface{}{"value": float64(dec_util)}, time.Now()) if err == nil && !skip { - output <- y + output <- &y } } } diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go index caa726e..b7806c2 100644 --- a/collectors/tempMetric.go +++ b/collectors/tempMetric.go @@ -4,12 +4,13 @@ import ( "encoding/json" "fmt" "io/ioutil" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" "os" "path/filepath" "strconv" "strings" "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) @@ -75,7 +76,7 @@ func get_hwmon_sensors() (map[string]map[string]string, error) { return sensors, nil } -func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *TempCollector) Read(interval time.Duration, output chan *lp.CCMetric) { sensors, err := get_hwmon_sensors() if err != nil { @@ -103,7 +104,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { y, err := lp.New(strings.ToLower(mname), tags, m.meta, map[string]interface{}{"value": int(float64(x) / 1000)}, time.Now()) if err == nil { cclog.ComponentDebug(m.name, y) - output <- y + output <- &y } } } diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go index d2691dc..4dfce6b 100644 --- a/collectors/topprocsMetric.go +++ b/collectors/topprocsMetric.go @@ -8,6 +8,7 @@ import ( "os/exec" "strings" "time" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) @@ -51,7 +52,7 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error { return nil } -func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMetric) { +func (m *TopProcsCollector) Read(interval time.Duration, output chan *lp.CCMetric) { if !m.init { return } @@ -68,7 +69,7 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMetric name := fmt.Sprintf("topproc%d", i) y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": string(lines[i])}, time.Now()) if err == nil { - output <- y + output <- &y } } } diff --git a/internal/metricRouter/metricAggregator.go b/internal/metricRouter/metricAggregator.go index 41c5276..5803b0a 100644 --- a/internal/metricRouter/metricAggregator.go +++ b/internal/metricRouter/metricAggregator.go @@ -29,14 +29,14 @@ type metricAggregator struct { functions []*metricAggregatorIntervalConfig constants map[string]interface{} language gval.Language - output chan lp.CCMetric + output chan *lp.CCMetric } type MetricAggregator interface { AddAggregation(name, function, condition string, tags, meta map[string]string) error DeleteAggregation(name string) error - Init(output chan lp.CCMetric) error - Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric) + Init(output chan *lp.CCMetric) error + Eval(starttime time.Time, endtime time.Time, metrics []*lp.CCMetric) } var metricCacheLanguage = gval.NewLanguage( @@ -62,7 +62,7 @@ var metricCacheLanguage = gval.NewLanguage( gval.Function("getCpuListOfType", getCpuListOfType), ) -func (c *metricAggregator) Init(output chan lp.CCMetric) error { +func (c *metricAggregator) Init(output chan *lp.CCMetric) error { c.output = output c.functions = make([]*metricAggregatorIntervalConfig, 0) c.constants = make(map[string]interface{}) @@ -100,7 +100,7 @@ func (c *metricAggregator) Init(output chan lp.CCMetric) error { return nil } -func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric) { +func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []*lp.CCMetric) { vars := make(map[string]interface{}) for k, v := range c.constants { vars[k] = v @@ -110,9 +110,9 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics for _, f := range c.functions { cclog.ComponentDebug("MetricCache", "COLLECT", f.Name, "COND", f.Condition) values := make([]float64, 0) - matches := make([]lp.CCMetric, 0) + matches := make([]*lp.CCMetric, 0) for _, m := range metrics { - vars["metric"] = m + vars["metric"] = *m //value, err := gval.Evaluate(f.Condition, vars, c.language) value, err := f.gvalCond.EvalBool(context.Background(), vars) if err != nil { @@ -120,7 +120,7 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics continue } if value { - v, valid := m.GetField("value") + v, valid := (*m).GetField("value") if valid { switch x := v.(type) { case float64: @@ -153,13 +153,14 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics break } - copy_tags := func(tags map[string]string, metrics []lp.CCMetric) map[string]string { + copy_tags := func(tags map[string]string, metrics []*lp.CCMetric) map[string]string { out := make(map[string]string) for key, value := range tags { switch value { case "": for _, m := range metrics { - v, err := m.GetTag(key) + point := *m + v, err := point.GetTag(key) if err { out[key] = v } @@ -170,13 +171,14 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics } return out } - copy_meta := func(meta map[string]string, metrics []lp.CCMetric) map[string]string { + copy_meta := func(meta map[string]string, metrics []*lp.CCMetric) map[string]string { out := make(map[string]string) for key, value := range meta { switch value { case "": for _, m := range metrics { - v, err := m.GetMeta(key) + point := *m + v, err := point.GetMeta(key) if err { out[key] = v } @@ -210,7 +212,7 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics } cclog.ComponentDebug("MetricCache", "SEND", m) select { - case c.output <- m: + case c.output <- &m: default: } @@ -281,7 +283,7 @@ func (c *metricAggregator) AddFunction(name string, function func(args ...interf c.language = gval.NewLanguage(c.language, gval.Function(name, function)) } -func NewAggregator(output chan lp.CCMetric) (MetricAggregator, error) { +func NewAggregator(output chan *lp.CCMetric) (MetricAggregator, error) { a := new(metricAggregator) err := a.Init(output) if err != nil { diff --git a/internal/metricRouter/metricCache.go b/internal/metricRouter/metricCache.go index 1cfd8c3..d78bbe1 100644 --- a/internal/metricRouter/metricCache.go +++ b/internal/metricRouter/metricCache.go @@ -15,7 +15,7 @@ type metricCachePeriod struct { stopstamp time.Time numMetrics int sizeMetrics int - metrics []lp.CCMetric + metrics []*lp.CCMetric } // Metric cache data structure @@ -27,21 +27,21 @@ type metricCache struct { ticker mct.MultiChanTicker tickchan chan time.Time done chan bool - output chan lp.CCMetric + output chan *lp.CCMetric aggEngine MetricAggregator } type MetricCache interface { - Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error + Init(output chan *lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error Start() - Add(metric lp.CCMetric) - GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) + Add(metric *lp.CCMetric) + GetPeriod(index int) (time.Time, time.Time, []*lp.CCMetric) AddAggregation(name, function, condition string, tags, meta map[string]string) error DeleteAggregation(name string) error Close() } -func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error { +func (c *metricCache) Init(output chan *lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error { var err error = nil c.done = make(chan bool) c.wg = wg @@ -53,7 +53,7 @@ func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, p := new(metricCachePeriod) p.numMetrics = 0 p.sizeMetrics = 0 - p.metrics = make([]lp.CCMetric, 0) + p.metrics = make([]*lp.CCMetric, 0) c.intervals = append(c.intervals, p) } @@ -120,18 +120,18 @@ func (c *metricCache) Start() { // Add a metric to the cache. The interval is defined by the global timer (rotate() in Start()) // The intervals list is used as round-robin buffer and the metric list grows dynamically and // to avoid reallocations -func (c *metricCache) Add(metric lp.CCMetric) { +func (c *metricCache) Add(metric *lp.CCMetric) { if c.curPeriod >= 0 && c.curPeriod < c.numPeriods { p := c.intervals[c.curPeriod] if p.numMetrics < p.sizeMetrics { p.metrics[p.numMetrics] = metric p.numMetrics = p.numMetrics + 1 - p.stopstamp = metric.Time() + p.stopstamp = (*metric).Time() } else { p.metrics = append(p.metrics, metric) p.numMetrics = p.numMetrics + 1 p.sizeMetrics = p.sizeMetrics + 1 - p.stopstamp = metric.Time() + p.stopstamp = (*metric).Time() } } } @@ -147,7 +147,7 @@ func (c *metricCache) DeleteAggregation(name string) error { // Get all metrics of a interval. The index is the difference to the current interval, so index=0 // is the current one, index=1 the last interval and so on. Returns and empty array if a wrong index // is given (negative index, index larger than configured number of total intervals, ...) -func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) { +func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []*lp.CCMetric) { if index >= 0 && index < c.numPeriods { pindex := c.curPeriod - index if pindex < 0 { @@ -157,7 +157,7 @@ func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics } } - return time.Now(), time.Now(), make([]lp.CCMetric, 0) + return time.Now(), time.Now(), make([]*lp.CCMetric, 0) } // Close finishes / stops the metric cache @@ -166,7 +166,7 @@ func (c *metricCache) Close() { c.done <- true } -func NewCache(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) { +func NewCache(output chan *lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) { c := new(metricCache) err := c.Init(output, ticker, wg, numPeriods) if err != nil { diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 870af02..4f484fa 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -33,10 +33,10 @@ type metricRouterConfig struct { // Metric router data structure type metricRouter struct { hostname string // Hostname used in tags - coll_input chan lp.CCMetric // Input channel from CollectorManager - recv_input chan lp.CCMetric // Input channel from ReceiveManager - cache_input chan lp.CCMetric // Input channel from MetricCache - outputs []chan lp.CCMetric // List of all output channels + coll_input chan *lp.CCMetric // Input channel from CollectorManager + recv_input chan *lp.CCMetric // Input channel from ReceiveManager + cache_input chan *lp.CCMetric // Input channel from MetricCache + outputs []chan *lp.CCMetric // List of all output channels done chan bool // channel to finish / stop metric router wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector timestamp time.Time // timestamp periodically updated by ticker each interval @@ -50,9 +50,9 @@ type metricRouter struct { // MetricRouter access functions type MetricRouter interface { Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error - AddCollectorInput(input chan lp.CCMetric) - AddReceiverInput(input chan lp.CCMetric) - AddOutput(output chan lp.CCMetric) + AddCollectorInput(input chan *lp.CCMetric) + AddReceiverInput(input chan *lp.CCMetric) + AddOutput(output chan *lp.CCMetric) Start() Close() } @@ -64,9 +64,9 @@ type MetricRouter interface { // * ticker (from variable ticker) // * configuration (read from config file in variable routerConfigFile) func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error { - r.outputs = make([]chan lp.CCMetric, 0) + r.outputs = make([]chan *lp.CCMetric, 0) r.done = make(chan bool) - r.cache_input = make(chan lp.CCMetric) + r.cache_input = make(chan *lp.CCMetric) r.wg = wg r.ticker = ticker @@ -131,7 +131,8 @@ func (r *metricRouter) StartTimer() { } // EvalCondition evaluates condition cond for metric data from point -func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, error) { +func (r *metricRouter) EvalCondition(cond string, pptr *lp.CCMetric) (bool, error) { + point := *pptr expression, err := govaluate.NewEvaluableExpression(cond) if err != nil { cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error()) @@ -162,7 +163,7 @@ func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, erro } // DoAddTags adds a tag when condition is fullfiled -func (r *metricRouter) DoAddTags(point lp.CCMetric) { +func (r *metricRouter) DoAddTags(point *lp.CCMetric) { for _, m := range r.config.AddTags { var conditionMatches bool @@ -177,13 +178,13 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) { } } if conditionMatches { - point.AddTag(m.Key, m.Value) + (*point).AddTag(m.Key, m.Value) } } } // DoDelTags removes a tag when condition is fullfiled -func (r *metricRouter) DoDelTags(point lp.CCMetric) { +func (r *metricRouter) DoDelTags(point *lp.CCMetric) { for _, m := range r.config.DelTags { var conditionMatches bool @@ -198,7 +199,7 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) { } } if conditionMatches { - point.RemoveTag(m.Key) + (*point).RemoveTag(m.Key) } } } @@ -220,8 +221,8 @@ func (r *metricRouter) Start() { // Forward takes a received metric, adds or deletes tags // and forwards it to the output channels - forward := func(point lp.CCMetric) { - cclog.ComponentDebug("MetricRouter", "FORWARD", point) + forward := func(point *lp.CCMetric) { + cclog.ComponentDebug("MetricRouter", "FORWARD", *point) r.DoAddTags(point) r.DoDelTags(point) for _, o := range r.outputs { @@ -243,9 +244,9 @@ func (r *metricRouter) Start() { case p := <-r.coll_input: // receive from metric collector - p.AddTag("hostname", r.hostname) + (*p).AddTag("hostname", r.hostname) if r.config.IntervalStamp { - p.SetTime(r.timestamp) + (*p).SetTime(r.timestamp) } forward(p) r.cache.Add(p) @@ -253,13 +254,13 @@ func (r *metricRouter) Start() { case p := <-r.recv_input: // receive from receive manager if r.config.IntervalStamp { - p.SetTime(r.timestamp) + (*p).SetTime(r.timestamp) } forward(p) case p := <-r.cache_input: - // receive from metric collector - p.AddTag("hostname", r.hostname) + // receive from metric cache and aggregator + (*p).AddTag("hostname", r.hostname) forward(p) } } @@ -268,17 +269,17 @@ func (r *metricRouter) Start() { } // AddCollectorInput adds a channel between metric collector and metric router -func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) { +func (r *metricRouter) AddCollectorInput(input chan *lp.CCMetric) { r.coll_input = input } // AddReceiverInput adds a channel between metric receiver and metric router -func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) { +func (r *metricRouter) AddReceiverInput(input chan *lp.CCMetric) { r.recv_input = input } // AddOutput adds a output channel to the metric router -func (r *metricRouter) AddOutput(output chan lp.CCMetric) { +func (r *metricRouter) AddOutput(output chan *lp.CCMetric) { r.outputs = append(r.outputs, output) } diff --git a/metric-collector.go b/metric-collector.go index 066fe3c..c535070 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -55,7 +55,7 @@ type RuntimeConfig struct { ReceiveManager receivers.ReceiveManager MultiChanTicker mct.MultiChanTicker - Channels []chan lp.CCMetric + Channels []chan *lp.CCMetric Sync sync.WaitGroup } @@ -251,7 +251,7 @@ func mainFunc() int { } // Connect metric router to sink manager - RouterToSinksChannel := make(chan lp.CCMetric, 200) + RouterToSinksChannel := make(chan *lp.CCMetric, 200) rcfg.SinkManager.AddInput(RouterToSinksChannel) rcfg.MetricRouter.AddOutput(RouterToSinksChannel) @@ -263,7 +263,7 @@ func mainFunc() int { } // Connect collector manager to metric router - CollectToRouterChannel := make(chan lp.CCMetric, 200) + CollectToRouterChannel := make(chan *lp.CCMetric, 200) rcfg.CollectManager.AddOutput(CollectToRouterChannel) rcfg.MetricRouter.AddCollectorInput(CollectToRouterChannel) @@ -276,7 +276,7 @@ func mainFunc() int { } // Connect receive manager to metric router - ReceiveToRouterChannel := make(chan lp.CCMetric, 200) + ReceiveToRouterChannel := make(chan *lp.CCMetric, 200) rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel) rcfg.MetricRouter.AddReceiverInput(ReceiveToRouterChannel) use_recv = true diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go index 2c74409..ba9bc38 100644 --- a/receivers/metricReceiver.go +++ b/receivers/metricReceiver.go @@ -20,7 +20,7 @@ type receiver struct { port string database string organization string - sink chan lp.CCMetric + sink chan *lp.CCMetric } type Receiver interface { @@ -28,14 +28,14 @@ type Receiver interface { Start() Close() Name() string - SetSink(sink chan lp.CCMetric) + SetSink(sink chan *lp.CCMetric) } func (r *receiver) Name() string { return r.name } -func (r *receiver) SetSink(sink chan lp.CCMetric) { +func (r *receiver) SetSink(sink chan *lp.CCMetric) { r.sink = sink } diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index 853edf1..c7c559e 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -3,11 +3,12 @@ package receivers import ( "errors" "fmt" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influx "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - "time" ) type NatsReceiverConfig struct { @@ -35,7 +36,7 @@ func (r *NatsReceiver) Init(config ReceiverConfig) error { if len(r.config.Addr) == 0 || len(r.config.Port) == 0 || len(r.config.Database) == 0 { - return errors.New("Not all configuration variables set required by NatsReceiver") + return errors.New("not all configuration variables set required by NatsReceiver") } r.meta = map[string]string{"source": r.name} r.addr = r.config.Addr @@ -76,7 +77,7 @@ func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { y.AddMeta(k, v) } if r.sink != nil { - r.sink <- y + r.sink <- &y } } } diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go index c570aa4..56a1e4c 100644 --- a/receivers/receiveManager.go +++ b/receivers/receiveManager.go @@ -15,7 +15,7 @@ var AvailableReceivers = map[string]Receiver{ type receiveManager struct { inputs []Receiver - output chan lp.CCMetric + output chan *lp.CCMetric done chan bool wg *sync.WaitGroup config []ReceiverConfig @@ -24,7 +24,7 @@ type receiveManager struct { type ReceiveManager interface { Init(wg *sync.WaitGroup, receiverConfigFile string) error AddInput(rawConfig json.RawMessage) error - AddOutput(output chan lp.CCMetric) + AddOutput(output chan *lp.CCMetric) Start() Close() } @@ -87,7 +87,7 @@ func (rm *receiveManager) AddInput(rawConfig json.RawMessage) error { return nil } -func (rm *receiveManager) AddOutput(output chan lp.CCMetric) { +func (rm *receiveManager) AddOutput(output chan *lp.CCMetric) { rm.output = output for _, r := range rm.inputs { r.SetSink(rm.output) diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index 87506a0..822f07f 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -26,10 +26,11 @@ func (s *GangliaSink) Init(config sinkConfig) error { return err } -func (s *GangliaSink) Write(point lp.CCMetric) error { +func (s *GangliaSink) Write(pptr *lp.CCMetric) error { var err error = nil var tagsstr []string var argstr []string + point := *pptr for _, t := range point.TagList() { switch t.Key { case "cluster": diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 25b0082..6bf85d4 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -36,8 +36,8 @@ func (s *HttpSink) Init(config sinkConfig) error { return nil } -func (s *HttpSink) Write(point lp.CCMetric) error { - _, err := s.encoder.Encode(point) +func (s *HttpSink) Write(point *lp.CCMetric) error { + _, err := s.encoder.Encode(*point) return err } diff --git a/sinks/influxSink.go b/sinks/influxSink.go index dca1572..594a6f8 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -5,10 +5,11 @@ import ( "crypto/tls" "errors" "fmt" + "log" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" - "log" ) type InfluxSink struct { @@ -58,22 +59,23 @@ func (s *InfluxSink) Init(config sinkConfig) error { return s.connect() } -func (s *InfluxSink) Write(point lp.CCMetric) error { +func (s *InfluxSink) Write(point *lp.CCMetric) error { tags := map[string]string{} fields := map[string]interface{}{} - for _, t := range point.TagList() { + p := *point + for _, t := range p.TagList() { tags[t.Key] = t.Value } if s.meta_as_tags { - for _, m := range point.MetaList() { + for _, m := range p.MetaList() { tags[m.Key] = m.Value } } - for _, f := range point.FieldList() { + for _, f := range p.FieldList() { fields[f.Key] = f.Value } - p := influxdb2.NewPoint(point.Name(), tags, fields, point.Time()) - err := s.writeApi.WritePoint(context.Background(), p) + x := influxdb2.NewPoint(p.Name(), tags, fields, p.Time()) + err := s.writeApi.WritePoint(context.Background(), x) return err } diff --git a/sinks/metricSink.go b/sinks/metricSink.go index 25f66bb..17065e5 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -31,7 +31,7 @@ type sink struct { type Sink interface { Init(config sinkConfig) error - Write(point lp.CCMetric) error + Write(point *lp.CCMetric) error Flush() error Close() Name() string diff --git a/sinks/natsSink.go b/sinks/natsSink.go index f9cd7eb..e9ba93b 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -4,11 +4,12 @@ import ( "bytes" "errors" "fmt" + "log" + "time" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influx "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" - "log" - "time" ) type NatsSink struct { @@ -55,9 +56,9 @@ func (s *NatsSink) Init(config sinkConfig) error { return s.connect() } -func (s *NatsSink) Write(point lp.CCMetric) error { +func (s *NatsSink) Write(point *lp.CCMetric) error { if s.client != nil { - _, err := s.encoder.Encode(point) + _, err := s.encoder.Encode(*point) if err != nil { log.Print(err) return err diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 02421d3..36f98d0 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -20,17 +20,17 @@ var AvailableSinks = map[string]Sink{ // Metric collector manager data structure type sinkManager struct { - input chan lp.CCMetric // input channel - outputs []Sink // List of sinks to use - done chan bool // channel to finish / stop metric sink manager - wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector - config []sinkConfig // json encoded config for sink manager + input chan *lp.CCMetric // input channel + outputs []Sink // List of sinks to use + done chan bool // channel to finish / stop metric sink manager + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + config []sinkConfig // json encoded config for sink manager } // Sink manager access functions type SinkManager interface { Init(wg *sync.WaitGroup, sinkConfigFile string) error - AddInput(input chan lp.CCMetric) + AddInput(input chan *lp.CCMetric) AddOutput(config json.RawMessage) error Start() Close() @@ -94,7 +94,7 @@ func (sm *sinkManager) Start() { case p := <-sm.input: // Send received metric to all outputs - cclog.ComponentDebug("SinkManager", "WRITE", p) + cclog.ComponentDebug("SinkManager", "WRITE", *p) for _, s := range sm.outputs { s.Write(p) } @@ -117,7 +117,7 @@ func (sm *sinkManager) Start() { } // AddInput adds the input channel to the sink manager -func (sm *sinkManager) AddInput(input chan lp.CCMetric) { +func (sm *sinkManager) AddInput(input chan *lp.CCMetric) { sm.input = input } diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index 215239f..d5e6819 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -19,9 +19,10 @@ func (s *StdoutSink) Init(config sinkConfig) error { return nil } -func (s *StdoutSink) Write(point lp.CCMetric) error { +func (s *StdoutSink) Write(pptr *lp.CCMetric) error { var tagsstr []string var fieldstr []string + point := *pptr for _, t := range point.TagList() { tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", t.Key, t.Value)) }