diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 49a9db8..63d0cb4 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -37,6 +37,7 @@ var AvailableCollectors = map[string]MetricCollector{ "beegfs_meta": new(BeegfsMetaCollector), "beegfs_storage": new(BeegfsStorageCollector), "rocm_smi": new(RocmSmiCollector), + "schedstat": new(SchedstatCollector), } // Metric collector manager data structure diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index c0dcf13..3c09b83 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -11,6 +11,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + sysconf "github.com/tklauser/go-sysconf" ) const CPUSTATFILE = `/proc/stat` @@ -22,9 +23,11 @@ type CpustatCollectorConfig struct { type CpustatCollector struct { metricCollector config CpustatCollectorConfig + lastTimestamp time.Time // Store time stamp of last tick to derive values matches map[string]int cputags map[string]map[string]string nodetags map[string]string + olddata map[string]map[string]int64 } func (m *CpustatCollector) Init(config json.RawMessage) error { @@ -76,36 +79,48 @@ func (m *CpustatCollector) Init(config json.RawMessage) error { // Pre-generate tags for all CPUs num_cpus := 0 m.cputags = make(map[string]map[string]string) + m.olddata = make(map[string]map[string]int64) scanner := bufio.NewScanner(file) for scanner.Scan() { line := scanner.Text() linefields := strings.Fields(line) - if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { + if strings.Compare(linefields[0], "cpu") == 0 { + m.olddata["cpu"] = make(map[string]int64) + for k, v := range m.matches { + m.olddata["cpu"][k], _ = strconv.ParseInt(linefields[v], 0, 64) + } + } else if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { cpustr := strings.TrimLeft(linefields[0], "cpu") cpu, _ := strconv.Atoi(cpustr) m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)} + m.olddata[linefields[0]] = make(map[string]int64) + for k, v := range m.matches { + m.olddata[linefields[0]][k], _ = strconv.ParseInt(linefields[v], 0, 64) + } num_cpus++ } } + m.lastTimestamp = time.Now() m.init = true return nil } -func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMetric) { +func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMetric, now time.Time, tsdelta time.Duration) { values := make(map[string]float64) - total := 0.0 + clktck, _ := sysconf.Sysconf(sysconf.SC_CLK_TCK) for match, index := range m.matches { if len(match) > 0 { x, err := strconv.ParseInt(linefields[index], 0, 64) if err == nil { - values[match] = float64(x) - total += values[match] + vdiff := x - m.olddata[linefields[0]][match] + m.olddata[linefields[0]][match] = x // Store new value for next run + values[match] = float64(vdiff) / float64(tsdelta.Seconds()) / float64(clktck) } } } - t := time.Now() + for name, value := range values { - y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t) + y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 100}, now) if err == nil { output <- y } @@ -117,6 +132,9 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) return } num_cpus := 0 + now := time.Now() + tsdelta := now.Sub(m.lastTimestamp) + file, err := os.Open(string(CPUSTATFILE)) if err != nil { cclog.ComponentError(m.name, err.Error()) @@ -128,9 +146,9 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) line := scanner.Text() linefields := strings.Fields(line) if strings.Compare(linefields[0], "cpu") == 0 { - m.parseStatLine(linefields, m.nodetags, output) + m.parseStatLine(linefields, m.nodetags, output, now, tsdelta) } else if strings.HasPrefix(linefields[0], "cpu") { - m.parseStatLine(linefields, m.cputags[linefields[0]], output) + m.parseStatLine(linefields, m.cputags[linefields[0]], output, now, tsdelta) num_cpus++ } } @@ -139,11 +157,13 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) m.nodetags, m.meta, map[string]interface{}{"value": int(num_cpus)}, - time.Now(), + now, ) if err == nil { output <- num_cpus_metric } + + m.lastTimestamp = now } func (m *CpustatCollector) Close() { diff --git a/collectors/schedstatMetric.go b/collectors/schedstatMetric.go new file mode 100644 index 0000000..e3041ae --- /dev/null +++ b/collectors/schedstatMetric.go @@ -0,0 +1,155 @@ +package collectors + +import ( + "encoding/json" + "fmt" + "bufio" + "time" + "os" + "strings" + "strconv" + "math" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" +) + +const SCHEDSTATFILE = `/proc/schedstat` + +// These are the fields we read from the JSON configuration +type SchedstatCollectorConfig struct { + ExcludeMetrics []string `json:"exclude_metrics,omitempty"` +} + +// This contains all variables we need during execution and the variables +// defined by metricCollector (name, init, ...) +type SchedstatCollector struct { + metricCollector + config SchedstatCollectorConfig // the configuration structure + lastTimestamp time.Time // Store time stamp of last tick to derive values + meta map[string]string // default meta information + cputags map[string]map[string]string // default tags + olddata map[string]map[string]int64 // default tags +} + +// Functions to implement MetricCollector interface +// Init(...), Read(...), Close() +// See: metricCollector.go + +// Init initializes the sample collector +// Called once by the collector manager +// All tags, meta data tags and metrics that do not change over the runtime should be set here +func (m *SchedstatCollector) Init(config json.RawMessage) error { + var err error = nil + // Always set the name early in Init() to use it in cclog.Component* functions + m.name = "SchedstatCollector" + // This is for later use, also call it early + m.setup() + // Tell whether the collector should be run in parallel with others (reading files, ...) + // or it should be run serially, mostly for collectors acutally doing measurements + // because they should not measure the execution of the other collectors + m.parallel = true + // Define meta information sent with each metric + // (Can also be dynamic or this is the basic set with extension through AddMeta()) + m.meta = map[string]string{"source": m.name, "group": "SCHEDSTAT"} + + // Read in the JSON configuration + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + cclog.ComponentError(m.name, "Error reading config:", err.Error()) + return err + } + } + + // Check input file + file, err := os.Open(string(SCHEDSTATFILE)) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + } + defer file.Close() + + // Pre-generate tags for all CPUs + num_cpus := 0 + m.cputags = make(map[string]map[string]string) + m.olddata = make(map[string]map[string]int64) + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + linefields := strings.Fields(line) + if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { + cpustr := strings.TrimLeft(linefields[0], "cpu") + cpu, _ := strconv.Atoi(cpustr) + running, _ := strconv.ParseInt(linefields[7], 10, 64) + waiting, _ := strconv.ParseInt(linefields[8], 10, 64) + m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)} + m.olddata[linefields[0]] = map[string]int64{"running" : running, "waiting" : waiting} + num_cpus++ + } + } + + + // Save current timestamp + m.lastTimestamp = time.Now() + + // Set this flag only if everything is initialized properly, all required files exist, ... + m.init = true + return err +} + +func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]string, output chan lp.CCMetric, now time.Time, tsdelta time.Duration) { + running, _ := strconv.ParseInt(linefields[7], 10, 64) + waiting, _ := strconv.ParseInt(linefields[8], 10, 64) + diff_running := running - m.olddata[linefields[0]]["running"] + diff_waiting := waiting - m.olddata[linefields[0]]["waiting"] + + var l_running float64 = float64(diff_running) / tsdelta.Seconds() / (math.Pow(1000, 3)) + var l_waiting float64 = float64(diff_waiting) / tsdelta.Seconds() / (math.Pow(1000, 3)) + + m.olddata[linefields[0]]["running"] = running + m.olddata[linefields[0]]["waiting"] = waiting + value := l_running + l_waiting + + y, err := lp.New("cpu_load_core", tags, m.meta, map[string]interface{}{"value": value}, now) + if err == nil { + // Send it to output channel + output <- y + } +} + +// Read collects all metrics belonging to the sample collector +// and sends them through the output channel to the collector manager +func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMetric) { + if !m.init { + return + } + + //timestamps + now := time.Now() + tsdelta := now.Sub(m.lastTimestamp) + + file, err := os.Open(string(SCHEDSTATFILE)) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + } + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + linefields := strings.Fields(line) + if strings.HasPrefix(linefields[0], "cpu") { + m.ParseProcLine(linefields, m.cputags[linefields[0]], output, now, tsdelta) + } + } + + m.lastTimestamp = now + +} + +// Close metric collector: close network connection, close files, close libraries, ... +// Called once by the collector manager +func (m *SchedstatCollector) Close() { + // Unset flag + m.init = false +} diff --git a/collectors/schedstatMetric.md b/collectors/schedstatMetric.md new file mode 100644 index 0000000..6369eca --- /dev/null +++ b/collectors/schedstatMetric.md @@ -0,0 +1,11 @@ + +## `schedstat` collector +```json + "schedstat": { + } +``` + +The `schedstat` collector reads data from /proc/schedstat and calculates a load value, separated by hwthread. This might be useful to detect bad cpu pinning on shared nodes etc. + +Metric: +* `cpu_load_core` \ No newline at end of file