From 88fabc2e83bdc55fad8430a99d02c1a35ba4cfe3 Mon Sep 17 00:00:00 2001 From: oscarminus Date: Wed, 7 Sep 2022 14:09:29 +0200 Subject: [PATCH] cpustatMetric.go: Use derived values instead of absolute values (#83) * cpustatMetric.go: Use derived values instead of absolute values The values in /proc/stat are absolute counters related to the boot time of the system. To obtain a utilization of the CPU, the changes in the counters must be derived according to time. To take only the absolute values leads to the fact that changes in the utilization, straight with larger values, do not become visible. * Add new collector for /proc/schedstat The `schedstat` collector reads data from /proc/schedstat and calculates a load value, separated by hwthread. This might be useful to detect bad cpu pinning on shared nodes etc. Co-authored-by: Michael Schwarz --- collectors/collectorManager.go | 1 + collectors/cpustatMetric.go | 40 ++++++--- collectors/schedstatMetric.go | 155 +++++++++++++++++++++++++++++++++ collectors/schedstatMetric.md | 11 +++ 4 files changed, 197 insertions(+), 10 deletions(-) create mode 100644 collectors/schedstatMetric.go create mode 100644 collectors/schedstatMetric.md diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 49a9db8..63d0cb4 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -37,6 +37,7 @@ var AvailableCollectors = map[string]MetricCollector{ "beegfs_meta": new(BeegfsMetaCollector), "beegfs_storage": new(BeegfsStorageCollector), "rocm_smi": new(RocmSmiCollector), + "schedstat": new(SchedstatCollector), } // Metric collector manager data structure diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index c0dcf13..3c09b83 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -11,6 +11,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + sysconf "github.com/tklauser/go-sysconf" ) const CPUSTATFILE = `/proc/stat` @@ -22,9 +23,11 @@ type CpustatCollectorConfig struct { type CpustatCollector struct { metricCollector config CpustatCollectorConfig + lastTimestamp time.Time // Store time stamp of last tick to derive values matches map[string]int cputags map[string]map[string]string nodetags map[string]string + olddata map[string]map[string]int64 } func (m *CpustatCollector) Init(config json.RawMessage) error { @@ -76,36 +79,48 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { // Pre-generate tags for all CPUs num_cpus := 0 m.cputags = make(map[string]map[string]string) + m.olddata = make(map[string]map[string]int64) scanner := bufio.NewScanner(file) for scanner.Scan() { line := scanner.Text() linefields := strings.Fields(line) - if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { + if strings.Compare(linefields[0], "cpu") == 0 { + m.olddata["cpu"] = make(map[string]int64) + for k, v := range m.matches { + m.olddata["cpu"][k], _ = strconv.ParseInt(linefields[v], 0, 64) + } + } else if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { cpustr := strings.TrimLeft(linefields[0], "cpu") cpu, _ := strconv.Atoi(cpustr) m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)} + m.olddata[linefields[0]] = make(map[string]int64) + for k, v := range m.matches { + m.olddata[linefields[0]][k], _ = strconv.ParseInt(linefields[v], 0, 64) + } num_cpus++ } } + m.lastTimestamp = time.Now() m.init = true return nil } -func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMetric) { +func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMetric, now time.Time, tsdelta time.Duration) { values := make(map[string]float64) - total := 0.0 + clktck, _ := sysconf.Sysconf(sysconf.SC_CLK_TCK) for match, index := range m.matches { if len(match) > 0 { x, err := strconv.ParseInt(linefields[index], 0, 64) if err == nil { - values[match] = float64(x) - total += values[match] + vdiff := x - m.olddata[linefields[0]][match] + m.olddata[linefields[0]][match] = x // Store new value for next run + values[match] = float64(vdiff) / float64(tsdelta.Seconds()) / float64(clktck) } } } - t := time.Now() + for name, value := range values { - y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t) + y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 100}, now) if err == nil { output <- y } @@ -117,6 +132,9 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) return } num_cpus := 0 + now := time.Now() + tsdelta := now.Sub(m.lastTimestamp) + file, err := os.Open(string(CPUSTATFILE)) if err != nil { cclog.ComponentError(m.name, err.Error()) @@ -128,9 +146,9 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) line := scanner.Text() linefields := strings.Fields(line) if strings.Compare(linefields[0], "cpu") == 0 { - m.parseStatLine(linefields, m.nodetags, output) + m.parseStatLine(linefields, m.nodetags, output, now, tsdelta) } else if strings.HasPrefix(linefields[0], "cpu") { - m.parseStatLine(linefields, m.cputags[linefields[0]], output) + m.parseStatLine(linefields, m.cputags[linefields[0]], output, now, tsdelta) num_cpus++ } } @@ -139,11 +157,13 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) m.nodetags, m.meta, map[string]interface{}{"value": int(num_cpus)}, - time.Now(), + now, ) if err == nil { output <- num_cpus_metric } + + m.lastTimestamp = now } func (m *CpustatCollector) Close() { diff --git a/collectors/schedstatMetric.go b/collectors/schedstatMetric.go new file mode 100644 index 0000000..e3041ae --- /dev/null +++ b/collectors/schedstatMetric.go @@ -0,0 +1,155 @@ +package collectors + +import ( + "encoding/json" + "fmt" + "bufio" + "time" + "os" + "strings" + "strconv" + "math" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" +) + +const SCHEDSTATFILE = `/proc/schedstat` + +// These are the fields we read from the JSON configuration +type SchedstatCollectorConfig struct { + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` +} + +// This contains all variables we need during execution and the variables +// defined by metricCollector (name, init, ...) +type SchedstatCollector struct { + metricCollector + config SchedstatCollectorConfig // the configuration structure + lastTimestamp time.Time // Store time stamp of last tick to derive values + meta map[string]string // default meta information + cputags map[string]map[string]string // default tags + olddata map[string]map[string]int64 // default tags +} + +// Functions to implement MetricCollector interface +// Init(...), Read(...), Close() +// See: metricCollector.go + +// Init initializes the sample collector +// Called once by the collector manager +// All tags, meta data tags and metrics that do not change over the runtime should be set here +func (m *SchedstatCollector) Init(config json.RawMessage) error { + var err error = nil + // Always set the name early in Init() to use it in cclog.Component* functions + m.name = "SchedstatCollector" + // This is for later use, also call it early + m.setup() + // Tell whether the collector should be run in parallel with others (reading files, ...) + // or it should be run serially, mostly for collectors acutally doing measurements + // because they should not measure the execution of the other collectors + m.parallel = true + // Define meta information sent with each metric + // (Can also be dynamic or this is the basic set with extension through AddMeta()) + m.meta = map[string]string{"source": m.name, "group": "SCHEDSTAT"} + + // Read in the JSON configuration + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + cclog.ComponentError(m.name, "Error reading config:", err.Error()) + return err + } + } + + // Check input file + file, err := os.Open(string(SCHEDSTATFILE)) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + } + defer file.Close() + + // Pre-generate tags for all CPUs + num_cpus := 0 + m.cputags = make(map[string]map[string]string) + m.olddata = make(map[string]map[string]int64) + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + linefields := strings.Fields(line) + if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { + cpustr := strings.TrimLeft(linefields[0], "cpu") + cpu, _ := strconv.Atoi(cpustr) + running, _ := strconv.ParseInt(linefields[7], 10, 64) + waiting, _ := strconv.ParseInt(linefields[8], 10, 64) + m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)} + m.olddata[linefields[0]] = map[string]int64{"running" : running, "waiting" : waiting} + num_cpus++ + } + } + + + // Save current timestamp + m.lastTimestamp = time.Now() + + // Set this flag only if everything is initialized properly, all required files exist, ... + m.init = true + return err +} + +func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]string, output chan lp.CCMetric, now time.Time, tsdelta time.Duration) { + running, _ := strconv.ParseInt(linefields[7], 10, 64) + waiting, _ := strconv.ParseInt(linefields[8], 10, 64) + diff_running := running - m.olddata[linefields[0]]["running"] + diff_waiting := waiting - m.olddata[linefields[0]]["waiting"] + + var l_running float64 = float64(diff_running) / tsdelta.Seconds() / (math.Pow(1000, 3)) + var l_waiting float64 = float64(diff_waiting) / tsdelta.Seconds() / (math.Pow(1000, 3)) + + m.olddata[linefields[0]]["running"] = running + m.olddata[linefields[0]]["waiting"] = waiting + value := l_running + l_waiting + + y, err := lp.New("cpu_load_core", tags, m.meta, map[string]interface{}{"value": value}, now) + if err == nil { + // Send it to output channel + output <- y + } +} + +// Read collects all metrics belonging to the sample collector +// and sends them through the output channel to the collector manager +func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { + if !m.init { + return + } + + //timestamps + now := time.Now() + tsdelta := now.Sub(m.lastTimestamp) + + file, err := os.Open(string(SCHEDSTATFILE)) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + } + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + linefields := strings.Fields(line) + if strings.HasPrefix(linefields[0], "cpu") { + m.ParseProcLine(linefields, m.cputags[linefields[0]], output, now, tsdelta) + } + } + + m.lastTimestamp = now + +} + +// Close metric collector: close network connection, close files, close libraries, ... +// Called once by the collector manager +func (m *SchedstatCollector) Close() { + // Unset flag + m.init = false +} diff --git a/collectors/schedstatMetric.md b/collectors/schedstatMetric.md new file mode 100644 index 0000000..6369eca --- /dev/null +++ b/collectors/schedstatMetric.md @@ -0,0 +1,11 @@ + +## `schedstat` collector +```json + "schedstat": { + } +``` + +The `schedstat` collector reads data from /proc/schedstat and calculates a load value, separated by hwthread. This might be useful to detect bad cpu pinning on shared nodes etc. + +Metric: +* `cpu_load_core` \ No newline at end of file