diff --git a/collectors/beegfsmetaMetric.go b/collectors/beegfsmetaMetric.go index 57b1e39..49b1fbb 100644 --- a/collectors/beegfsmetaMetric.go +++ b/collectors/beegfsmetaMetric.go @@ -16,6 +16,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) const DEFAULT_BEEGFS_CMD = "beegfs-ctl" @@ -29,10 +30,11 @@ type BeegfsMetaCollectorConfig struct { type BeegfsMetaCollector struct { metricCollector - tags map[string]string - matches map[string]string - config BeegfsMetaCollectorConfig - skipFS map[string]struct{} + tags map[string]string + matches map[string]string + config BeegfsMetaCollectorConfig + skipFS map[string]struct{} + statsProcessedMetrics int64 } func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { @@ -105,6 +107,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error { if err != nil { return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) } + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -218,10 +221,12 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) if err == nil { output <- y + m.statsProcessedMetrics++ } } } } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *BeegfsMetaCollector) Close() { diff --git a/collectors/beegfsstorageMetric.go b/collectors/beegfsstorageMetric.go index cbc8314..9009bf1 100644 --- a/collectors/beegfsstorageMetric.go +++ b/collectors/beegfsstorageMetric.go @@ -16,6 +16,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) // Struct for the collector-specific JSON config @@ -27,10 +28,11 @@ type BeegfsStorageCollectorConfig struct { type BeegfsStorageCollector struct { metricCollector - tags map[string]string - matches map[string]string - config BeegfsStorageCollectorConfig - skipFS map[string]struct{} + tags map[string]string + matches map[string]string + config BeegfsStorageCollectorConfig + skipFS map[string]struct{} + statsProcessedMetrics int64 } func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { @@ -98,6 +100,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error { if err != nil { return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err) } + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -210,10 +213,12 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now()) if err == nil { output <- y + m.statsProcessedMetrics++ } } } } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *BeegfsStorageCollector) Close() { diff --git a/collectors/cpufreqCpuinfoMetric.go b/collectors/cpufreqCpuinfoMetric.go index 6c3de7a..da2551f 100644 --- a/collectors/cpufreqCpuinfoMetric.go +++ b/collectors/cpufreqCpuinfoMetric.go @@ -12,6 +12,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) // @@ -36,7 +37,8 @@ type CPUFreqCpuInfoCollectorTopology struct { type CPUFreqCpuInfoCollector struct { metricCollector - topology []*CPUFreqCpuInfoCollectorTopology + topology []*CPUFreqCpuInfoCollectorTopology + statsProcessedMetrics int64 } func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { @@ -155,7 +157,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { "package_id": t.physicalPackageID, } } - + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -196,6 +198,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC return } if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now); err == nil { + m.statsProcessedMetrics++ output <- y } } @@ -203,6 +206,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC } } } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *CPUFreqCpuInfoCollector) Close() { diff --git a/collectors/cpufreqMetric.go b/collectors/cpufreqMetric.go index 0bf6d4c..46b98f3 100644 --- a/collectors/cpufreqMetric.go +++ b/collectors/cpufreqMetric.go @@ -11,6 +11,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" "golang.org/x/sys/unix" ) @@ -39,8 +40,9 @@ type CPUFreqCollectorTopology struct { // type CPUFreqCollector struct { metricCollector - topology []CPUFreqCollectorTopology - config struct { + topology []CPUFreqCollectorTopology + statsProcessedMetrics int64 + config struct { ExcludeMetrics []string `json:"exclude_metrics,omitempty"` } } @@ -166,7 +168,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { "package_id": t.physicalPackageID, } } - + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -203,9 +205,11 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric) } if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now); err == nil { + m.statsProcessedMetrics++ output <- y } } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *CPUFreqCollector) Close() { diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index 556aad4..e710bb5 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -11,6 +11,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) const CPUSTATFILE = `/proc/stat` @@ -21,10 +22,11 @@ type CpustatCollectorConfig struct { type CpustatCollector struct { metricCollector - config CpustatCollectorConfig - matches map[string]int - cputags map[string]map[string]string - nodetags map[string]string + config CpustatCollectorConfig + matches map[string]int + cputags map[string]map[string]string + nodetags map[string]string + statsProcessedMetrics int64 } func (m *CpustatCollector) Init(config json.RawMessage) error { @@ -86,6 +88,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { num_cpus++ } } + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -106,6 +109,7 @@ func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]st for name, value := range values { y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t) if err == nil { + m.statsProcessedMetrics++ output <- y } } @@ -141,8 +145,10 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) time.Now(), ) if err == nil { + m.statsProcessedMetrics++ output <- num_cpus_metric } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *CpustatCollector) Close() { diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index ec2109b..640e2fc 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -10,6 +10,7 @@ import ( "time" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" influx "github.com/influxdata/line-protocol" ) @@ -23,11 +24,14 @@ type CustomCmdCollectorConfig struct { type CustomCmdCollector struct { metricCollector - handler *influx.MetricHandler - parser *influx.Parser - config CustomCmdCollectorConfig - commands []string - files []string + handler *influx.MetricHandler + parser *influx.Parser + config CustomCmdCollectorConfig + commands []string + files []string + statsProcessedMetrics int64 + statsProcessedCommands int64 + statsProcessedFiles int64 } func (m *CustomCmdCollector) Init(config json.RawMessage) error { @@ -66,6 +70,9 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error { m.handler = influx.NewMetricHandler() m.parser = influx.NewParser(m.handler) m.parser.SetTimeFunc(DefaultTime) + m.statsProcessedMetrics = 0 + m.statsProcessedFiles = 0 + m.statsProcessedCommands = 0 m.init = true return nil } @@ -100,9 +107,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri y := lp.FromInfluxMetric(c) if err == nil { + m.statsProcessedMetrics++ + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) output <- y } } + m.statsProcessedCommands++ + stats.ComponentStatInt(m.name, "processed_commands", m.statsProcessedCommands) } for _, file := range m.files { buffer, err := ioutil.ReadFile(file) @@ -122,9 +133,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri } y := lp.FromInfluxMetric(f) if err == nil { + m.statsProcessedMetrics++ + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) output <- y } } + m.statsProcessedFiles++ + stats.ComponentStatInt(m.name, "processed_files", m.statsProcessedFiles) } } diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index 16c70ba..560f4c4 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -11,6 +11,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) // "log" @@ -23,9 +24,8 @@ type DiskstatCollectorConfig struct { type DiskstatCollector struct { metricCollector - //matches map[string]int - config IOstatCollectorConfig - //devices map[string]IOstatCollectorEntry + config DiskstatCollectorConfig + statsProcessedMetrics int64 } func (m *DiskstatCollector) Init(config json.RawMessage) error { @@ -44,6 +44,7 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error { return err } defer file.Close() + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -89,12 +90,16 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now()) if err == nil { y.AddMeta("unit", "GBytes") + m.statsProcessedMetrics++ + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) output <- y } free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000) y, err = lp.New("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now()) if err == nil { y.AddMeta("unit", "GBytes") + m.statsProcessedMetrics++ + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) output <- y } perc := (100 * (total - free)) / total @@ -105,6 +110,8 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now()) if err == nil { y.AddMeta("unit", "percent") + m.statsProcessedMetrics++ + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) output <- y } } diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index 26fc723..4fad42f 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -15,6 +15,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) const DEFAULT_GPFS_CMD = "mmpmon" @@ -32,9 +33,10 @@ type GpfsCollector struct { ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"` SendBandwidths bool `json:"send_bandwidths"` } - skipFS map[string]struct{} - lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths - lastState map[string]GpfsCollectorLastState + skipFS map[string]struct{} + lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths + lastState map[string]GpfsCollectorLastState + statsProcessedMetrics int64 } func (m *GpfsCollector) Init(config json.RawMessage) error { @@ -86,7 +88,7 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { return fmt.Errorf("failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err) } m.config.Mmpmon = p - + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -211,12 +213,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil { output <- y + m.statsProcessedMetrics++ } if m.config.SendBandwidths { if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 { bwRead := float64(bytesRead-lastBytesRead) / timeDiff if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil { output <- y + m.statsProcessedMetrics++ } } } @@ -231,12 +235,14 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil { output <- y + m.statsProcessedMetrics++ } if m.config.SendBandwidths { if lastBytesWritten := m.lastState[filesystem].bytesRead; lastBytesWritten >= 0 { bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil { output <- y + m.statsProcessedMetrics++ } } } @@ -258,6 +264,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } if y, err := lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp); err == nil { output <- y + m.statsProcessedMetrics++ } // number of closes @@ -270,6 +277,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } if y, err := lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp); err == nil { output <- y + m.statsProcessedMetrics++ } // number of reads @@ -282,6 +290,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } if y, err := lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp); err == nil { output <- y + m.statsProcessedMetrics++ } // number of writes @@ -294,6 +303,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } if y, err := lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp); err == nil { output <- y + m.statsProcessedMetrics++ } // number of read directories @@ -306,6 +316,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { } if y, err := lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp); err == nil { output <- y + m.statsProcessedMetrics++ } // Number of inode updates @@ -317,9 +328,11 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { continue } if y, err := lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp); err == nil { + m.statsProcessedMetrics++ output <- y } } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *GpfsCollector) Close() { diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index 274e669..6d2734a 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -7,6 +7,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" "golang.org/x/sys/unix" "encoding/json" @@ -39,8 +40,9 @@ type InfinibandCollector struct { SendAbsoluteValues bool `json:"send_abs_values"` // Send absolut values as read from sys filesystem SendDerivedValues bool `json:"send_derived_values"` // Send derived values e.g. rates } - info []*InfinibandCollectorInfo - lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths + info []*InfinibandCollectorInfo + lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths + statsProcessedMetrics int64 } // Init initializes the Infiniband collector by walking through files below IB_BASEPATH @@ -149,7 +151,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { if len(m.info) == 0 { return fmt.Errorf("found no IB devices") } - + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -196,6 +198,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil { y.AddMeta("unit", counterDef.unit) output <- y + m.statsProcessedMetrics++ } } @@ -206,6 +209,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil { y.AddMeta("unit", counterDef.unit+"/sec") output <- y + m.statsProcessedMetrics++ } } // Save current state @@ -214,6 +218,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr } } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *InfinibandCollector) Close() { diff --git a/collectors/iostatMetric.go b/collectors/iostatMetric.go index ca7f33c..994f16d 100644 --- a/collectors/iostatMetric.go +++ b/collectors/iostatMetric.go @@ -6,6 +6,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" // "log" "encoding/json" @@ -29,9 +30,10 @@ type IOstatCollectorEntry struct { type IOstatCollector struct { metricCollector - matches map[string]int - config IOstatCollectorConfig - devices map[string]IOstatCollectorEntry + matches map[string]int + config IOstatCollectorConfig + devices map[string]IOstatCollectorEntry + statsProcessedMetrics int64 } func (m *IOstatCollector) Init(config json.RawMessage) error { @@ -102,6 +104,7 @@ func (m *IOstatCollector) Init(config json.RawMessage) error { lastValues: values, } } + m.statsProcessedMetrics = 0 m.init = true return err } @@ -141,6 +144,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric) y, err := lp.New(name, entry.tags, m.meta, map[string]interface{}{"value": int(diff)}, time.Now()) if err == nil { output <- y + m.statsProcessedMetrics++ } } entry.lastValues[name] = x @@ -148,6 +152,7 @@ func (m *IOstatCollector) Read(interval time.Duration, output chan lp.CCMetric) } m.devices[device] = entry } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *IOstatCollector) Close() { diff --git a/collectors/ipmiMetric.go b/collectors/ipmiMetric.go index 16b08ef..928a0a6 100644 --- a/collectors/ipmiMetric.go +++ b/collectors/ipmiMetric.go @@ -11,6 +11,7 @@ import ( "time" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) const IPMITOOL_PATH = `ipmitool` @@ -26,9 +27,10 @@ type IpmiCollector struct { metricCollector //tags map[string]string //matches map[string]string - config IpmiCollectorConfig - ipmitool string - ipmisensors string + config IpmiCollectorConfig + ipmitool string + ipmisensors string + statsProcessedMetrics int64 } func (m *IpmiCollector) Init(config json.RawMessage) error { @@ -56,6 +58,7 @@ func (m *IpmiCollector) Init(config json.RawMessage) error { if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 { return errors.New("no IPMI reader found") } + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -94,6 +97,7 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) { if err == nil { y.AddMeta("unit", unit) output <- y + m.statsProcessedMetrics++ } } } @@ -123,6 +127,7 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) { y.AddMeta("unit", lv[4]) } output <- y + m.statsProcessedMetrics++ } } } @@ -141,6 +146,7 @@ func (m *IpmiCollector) Read(interval time.Duration, output chan lp.CCMetric) { m.readIpmiSensors(m.config.IpmisensorsPath, output) } } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *IpmiCollector) Close() { diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index d808bad..f515d44 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -28,6 +28,7 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" "github.com/NVIDIA/go-nvml/pkg/dl" ) @@ -72,18 +73,21 @@ type LikwidCollectorConfig struct { type LikwidCollector struct { metricCollector - cpulist []C.int - cpu2tid map[int]int - sock2tid map[int]int - metrics map[C.int]map[string]int - groups []C.int - config LikwidCollectorConfig - gmresults map[int]map[string]float64 - basefreq float64 - running bool - initialized bool - likwidGroups map[C.int]LikwidEventsetConfig - lock sync.Mutex + cpulist []C.int + cpu2tid map[int]int + sock2tid map[int]int + metrics map[C.int]map[string]int + groups []C.int + config LikwidCollectorConfig + gmresults map[int]map[string]float64 + basefreq float64 + running bool + initialized bool + likwidGroups map[C.int]LikwidEventsetConfig + lock sync.Mutex + statsMeasurements int64 + statsProcessedMetrics int64 + statsPublishedMetrics int64 } type LikwidMetric struct { @@ -267,6 +271,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { cclog.ComponentError(m.name, err.Error()) return err } + m.statsMeasurements = 0 + m.statsProcessedMetrics = 0 + m.statsPublishedMetrics = 0 m.init = true return nil } @@ -274,6 +281,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { // take a measurement for 'interval' seconds of event set index 'group' func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) { var ret C.int + m.lock.Lock() if m.initialized { ret = C.perfmon_setupCounters(evset.gid) @@ -317,6 +325,8 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t } } m.lock.Unlock() + m.statsMeasurements++ + stats.ComponentStatInt(m.name, "measurements", m.statsMeasurements) return false, nil } @@ -357,6 +367,8 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv if m.config.InvalidToZero && math.IsInf(value, 0) { value = 0.0 } + m.statsProcessedMetrics++ + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) // Now we have the result, send it with the proper tags if !math.IsNaN(value) { if metric.Publish { @@ -369,6 +381,8 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv if len(metric.Unit) > 0 { y.AddMeta("unit", metric.Unit) } + m.statsPublishedMetrics++ + stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics) output <- y } } @@ -409,6 +423,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan if m.config.InvalidToZero && math.IsInf(value, 0) { value = 0.0 } + m.statsProcessedMetrics++ + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) // Now we have the result, send it with the proper tags if !math.IsNaN(value) { if metric.Publish { @@ -422,6 +438,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan if len(metric.Unit) > 0 { y.AddMeta("unit", metric.Unit) } + m.statsPublishedMetrics++ + stats.ComponentStatInt(m.name, "published_metrics", m.statsPublishedMetrics) output <- y } } diff --git a/collectors/loadavgMetric.go b/collectors/loadavgMetric.go index 3859721..b6edd07 100644 --- a/collectors/loadavgMetric.go +++ b/collectors/loadavgMetric.go @@ -10,6 +10,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) // @@ -32,6 +33,7 @@ type LoadavgCollector struct { config struct { ExcludeMetrics []string `json:"exclude_metrics,omitempty"` } + statsProcessedMetrics int64 } func (m *LoadavgCollector) Init(config json.RawMessage) error { @@ -63,6 +65,7 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error { for i, name := range m.proc_matches { _, m.proc_skips[i] = stringArrayContains(m.config.ExcludeMetrics, name) } + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -98,6 +101,7 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric) y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now) if err == nil { output <- y + m.statsProcessedMetrics++ } } @@ -117,9 +121,10 @@ func (m *LoadavgCollector) Read(interval time.Duration, output chan lp.CCMetric) y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, now) if err == nil { output <- y + m.statsProcessedMetrics++ } - } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *LoadavgCollector) Close() { diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index d5a96e4..6bde44b 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -12,6 +12,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) const LUSTRE_SYSFS = `/sys/fs/lustre` @@ -37,13 +38,14 @@ type LustreMetricDefinition struct { type LustreCollector struct { metricCollector - tags map[string]string - config LustreCollectorConfig - lctl string - sudoCmd string - lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths - definitions []LustreMetricDefinition // Combined list without excluded metrics - stats map[string]map[string]int64 // Data for last value per device and metric + tags map[string]string + config LustreCollectorConfig + lctl string + sudoCmd string + lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths + definitions []LustreMetricDefinition // Combined list without excluded metrics + stats map[string]map[string]int64 // Data for last value per device and metric + statsProcessedMetrics int64 } func (m *LustreCollector) getDeviceDataCommand(device string) []string { @@ -372,6 +374,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error { } } m.lastTimestamp = time.Now() + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -418,11 +421,13 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric) y.AddMeta("unit", def.unit) } output <- y + m.statsProcessedMetrics++ } devData[def.name] = use_x } } m.lastTimestamp = now + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *LustreCollector) Close() { diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index c6c7f34..865ec2e 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -14,6 +14,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) const MEMSTATFILE = "/proc/meminfo" @@ -32,12 +33,13 @@ type MemstatCollectorNode struct { type MemstatCollector struct { metricCollector - stats map[string]int64 - tags map[string]string - matches map[string]string - config MemstatCollectorConfig - nodefiles map[int]MemstatCollectorNode - sendMemUsed bool + stats map[string]int64 + tags map[string]string + matches map[string]string + config MemstatCollectorConfig + nodefiles map[int]MemstatCollectorNode + sendMemUsed bool + statsProcessedMetrics int64 } type MemstatStats struct { @@ -153,6 +155,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { } } } + m.statsProcessedMetrics = 0 m.init = true return err } @@ -178,6 +181,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) if len(unit) > 0 { y.AddMeta("unit", unit) } + m.statsProcessedMetrics++ output <- y } } @@ -207,6 +211,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) if len(unit) > 0 { y.AddMeta("unit", unit) } + m.statsProcessedMetrics++ output <- y } } @@ -223,6 +228,7 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) sendStats(stats, nodeConf.tags) } } + stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics) } func (m *MemstatCollector) Close() { diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index d171d4b..84c9222 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -11,6 +11,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) const NETSTATFILE = "/proc/net/dev" @@ -32,9 +33,10 @@ type NetstatCollectorMetric struct { type NetstatCollector struct { metricCollector - config NetstatCollectorConfig - matches map[string][]NetstatCollectorMetric - lastTimestamp time.Time + config NetstatCollectorConfig + matches map[string][]NetstatCollectorMetric + lastTimestamp time.Time + statsProcessedMetrics int64 } func (m *NetstatCollector) Init(config json.RawMessage) error { @@ -148,6 +150,7 @@ func (m *NetstatCollector) Init(config json.RawMessage) error { if len(m.matches) == 0 { return errors.New("no devices to collector metrics found") } + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -198,6 +201,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) if m.config.SendAbsoluteValues { if y, err := lp.New(metric.name, metric.tags, metric.meta, map[string]interface{}{"value": v}, now); err == nil { output <- y + m.statsProcessedMetrics++ } } if m.config.SendDerivedValues { @@ -205,6 +209,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) rate := float64(v-metric.lastValue) / timeDiff if y, err := lp.New(metric.name+"_bw", metric.tags, metric.meta_rates, map[string]interface{}{"value": rate}, now); err == nil { output <- y + m.statsProcessedMetrics++ } } metric.lastValue = v @@ -212,6 +217,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric) } } } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *NetstatCollector) Close() { diff --git a/collectors/nfsMetric.go b/collectors/nfsMetric.go index c511b0d..474c740 100644 --- a/collectors/nfsMetric.go +++ b/collectors/nfsMetric.go @@ -12,6 +12,7 @@ import ( "time" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) // First part contains the code for the general NfsCollector. @@ -32,7 +33,8 @@ type nfsCollector struct { Nfsstats string `json:"nfsstat"` ExcludeMetrics []string `json:"exclude_metrics,omitempty"` } - data map[string]NfsCollectorData + data map[string]NfsCollectorData + statsProcessedMetrics int64 } func (m *nfsCollector) initStats() error { @@ -113,6 +115,7 @@ func (m *nfsCollector) MainInit(config json.RawMessage) error { } m.data = make(map[string]NfsCollectorData) m.initStats() + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -143,8 +146,10 @@ func (m *nfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { if err == nil { y.AddMeta("version", m.version) output <- y + m.statsProcessedMetrics++ } } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *nfsCollector) Close() { diff --git a/collectors/numastatsMetric.go b/collectors/numastatsMetric.go index 52a2638..e6edd14 100644 --- a/collectors/numastatsMetric.go +++ b/collectors/numastatsMetric.go @@ -12,6 +12,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) // @@ -44,7 +45,8 @@ type NUMAStatsCollectorTopolgy struct { type NUMAStatsCollector struct { metricCollector - topology []NUMAStatsCollectorTopolgy + topology []NUMAStatsCollectorTopolgy + statsProcessedMetrics int64 } func (m *NUMAStatsCollector) Init(config json.RawMessage) error { @@ -80,7 +82,7 @@ func (m *NUMAStatsCollector) Init(config json.RawMessage) error { tagSet: map[string]string{"memoryDomain": node}, }) } - + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -127,11 +129,13 @@ func (m *NUMAStatsCollector) Read(interval time.Duration, output chan lp.CCMetri ) if err == nil { output <- y + m.statsProcessedMetrics++ } } file.Close() } + stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics) } func (m *NUMAStatsCollector) Close() { diff --git a/collectors/nvidiaMetric.go b/collectors/nvidiaMetric.go index 24f0855..4225efd 100644 --- a/collectors/nvidiaMetric.go +++ b/collectors/nvidiaMetric.go @@ -9,6 +9,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" "github.com/NVIDIA/go-nvml/pkg/nvml" ) @@ -26,9 +27,10 @@ type NvidiaCollectorDevice struct { type NvidiaCollector struct { metricCollector - num_gpus int - config NvidiaCollectorConfig - gpus []NvidiaCollectorDevice + num_gpus int + config NvidiaCollectorConfig + gpus []NvidiaCollectorDevice + statsProcessedMetrics int64 } func (m *NvidiaCollector) CatchPanic() { @@ -120,7 +122,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error { pciInfo.Device) } } - + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -151,6 +153,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "%") output <- y + m.statsProcessedMetrics++ } } if !device.excludeMetrics["nv_mem_util"] { @@ -158,6 +161,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "%") output <- y + m.statsProcessedMetrics++ } } } @@ -186,6 +190,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "MByte") output <- y + m.statsProcessedMetrics++ } } @@ -195,6 +200,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "MByte") output <- y + m.statsProcessedMetrics++ } } } @@ -212,6 +218,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "degC") output <- y + m.statsProcessedMetrics++ } } } @@ -232,6 +239,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "%") output <- y + m.statsProcessedMetrics++ } } } @@ -258,11 +266,13 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) } if err == nil { output <- y + m.statsProcessedMetrics++ } } else if ret == nvml.ERROR_NOT_SUPPORTED { y, err := lp.New("nv_ecc_mode", device.tags, m.meta, map[string]interface{}{"value": "N/A"}, time.Now()) if err == nil { output <- y + m.statsProcessedMetrics++ } } } @@ -280,6 +290,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) y, err := lp.New("nv_perf_state", device.tags, m.meta, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pState))}, time.Now()) if err == nil { output <- y + m.statsProcessedMetrics++ } } } @@ -296,6 +307,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "watts") output <- y + m.statsProcessedMetrics++ } } } @@ -313,6 +325,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "MHz") output <- y + m.statsProcessedMetrics++ } } } @@ -324,6 +337,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "MHz") output <- y + m.statsProcessedMetrics++ } } } @@ -335,6 +349,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "MHz") output <- y + m.statsProcessedMetrics++ } } } @@ -357,6 +372,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "MHz") output <- y + m.statsProcessedMetrics++ } } } @@ -368,6 +384,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "MHz") output <- y + m.statsProcessedMetrics++ } } } @@ -379,6 +396,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "MHz") output <- y + m.statsProcessedMetrics++ } } } @@ -398,6 +416,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) y, err := lp.New("nv_ecc_db_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_db)}, time.Now()) if err == nil { output <- y + m.statsProcessedMetrics++ } } } @@ -408,6 +427,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) y, err := lp.New("nv_ecc_sb_error", device.tags, m.meta, map[string]interface{}{"value": float64(ecc_sb)}, time.Now()) if err == nil { output <- y + m.statsProcessedMetrics++ } } } @@ -425,6 +445,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "watts") output <- y + m.statsProcessedMetrics++ } } } @@ -441,6 +462,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "%") output <- y + m.statsProcessedMetrics++ } } } @@ -457,11 +479,12 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { y.AddMeta("unit", "%") output <- y + m.statsProcessedMetrics++ } } } } - + stats.ComponentStatInt(m.name, "collected_metrics", m.statsProcessedMetrics) } func (m *NvidiaCollector) Close() { diff --git a/collectors/sampleMetric.go b/collectors/sampleMetric.go index 47078a6..837d823 100644 --- a/collectors/sampleMetric.go +++ b/collectors/sampleMetric.go @@ -6,6 +6,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) // These are the fields we read from the JSON configuration @@ -17,9 +18,10 @@ type SampleCollectorConfig struct { // defined by metricCollector (name, init, ...) type SampleCollector struct { metricCollector - config SampleTimerCollectorConfig // the configuration structure - meta map[string]string // default meta information - tags map[string]string // default tags + config SampleTimerCollectorConfig // the configuration structure + meta map[string]string // default meta information + tags map[string]string // default tags + statsCount int64 } // Functions to implement MetricCollector interface @@ -58,6 +60,9 @@ func (m *SampleCollector) Init(config json.RawMessage) error { // for all topological entities (sockets, NUMA domains, ...) // Return some useful error message in case of any failures + // Initialize counts for statistics + m.statsCount = 0 + // Set this flag only if everything is initialized properly, all required files exist, ... m.init = true return err @@ -80,8 +85,11 @@ func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMetric) if err == nil { // Send it to output channel output <- y + // increment count for each sent metric or any other operation + m.statsCount++ } - + // Set stats for the component + stats.ComponentStatInt(m.name, "count", m.statsCount) } // Close metric collector: close network connection, close files, close libraries, ... diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go index 7ba8eb1..0c833a8 100644 --- a/collectors/tempMetric.go +++ b/collectors/tempMetric.go @@ -11,6 +11,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) // See: https://www.kernel.org/doc/html/latest/hwmon/sysfs-interface.html @@ -40,7 +41,8 @@ type TempCollector struct { ReportMaxTemp bool `json:"report_max_temperature"` ReportCriticalTemp bool `json:"report_critical_temperature"` } - sensors []*TempCollectorSensor + sensors []*TempCollectorSensor + statsProcessedMetrics int64 } func (m *TempCollector) Init(config json.RawMessage) error { @@ -162,6 +164,7 @@ func (m *TempCollector) Init(config json.RawMessage) error { } // Finished initialization + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -194,6 +197,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { ) if err == nil { output <- y + m.statsProcessedMetrics++ } // max temperature @@ -207,6 +211,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { ) if err == nil { output <- y + m.statsProcessedMetrics++ } } @@ -221,10 +226,11 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { ) if err == nil { output <- y + m.statsProcessedMetrics++ } } } - + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *TempCollector) Close() { diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go index 408c3cc..f94db88 100644 --- a/collectors/topprocsMetric.go +++ b/collectors/topprocsMetric.go @@ -10,6 +10,7 @@ import ( "time" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) const MAX_NUM_PROCS = 10 @@ -21,8 +22,9 @@ type TopProcsCollectorConfig struct { type TopProcsCollector struct { metricCollector - tags map[string]string - config TopProcsCollectorConfig + tags map[string]string + config TopProcsCollectorConfig + statsProcessedMetrics int64 } func (m *TopProcsCollector) Init(config json.RawMessage) error { @@ -48,6 +50,7 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error { if err != nil { return errors.New("failed to execute command") } + m.statsProcessedMetrics = 0 m.init = true return nil } @@ -70,8 +73,10 @@ func (m *TopProcsCollector) Read(interval time.Duration, output chan lp.CCMetric y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": string(lines[i])}, time.Now()) if err == nil { output <- y + m.statsProcessedMetrics++ } } + stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics) } func (m *TopProcsCollector) Close() {