cpustatMetric.go: Use derived values instead of absolute values (#83)

* cpustatMetric.go: Use derived values instead of absolute values

  The values in /proc/stat are absolute counters related to the boot
  time of the system. To obtain a utilization of the CPU, the changes
  in the counters must be derived according to time. To take only the
  absolute values leads to the fact that changes in the utilization,
  straight with larger values, do not become visible.

* Add new collector for /proc/schedstat

  The `schedstat` collector reads data from /proc/schedstat and calculates
  a load value, separated by hwthread. This might be useful to detect bad
  cpu pinning on shared nodes etc.

Co-authored-by: Michael Schwarz <post@michael-schwarz.name>
This commit is contained in:
oscarminus 2022-09-07 14:09:29 +02:00 committed by Thomas Roehl
parent 503705d442
commit 8a3446a596
4 changed files with 197 additions and 10 deletions

View File

@ -37,6 +37,7 @@ var AvailableCollectors = map[string]MetricCollector{
"beegfs_meta": new(BeegfsMetaCollector), "beegfs_meta": new(BeegfsMetaCollector),
"beegfs_storage": new(BeegfsStorageCollector), "beegfs_storage": new(BeegfsStorageCollector),
"rocm_smi": new(RocmSmiCollector), "rocm_smi": new(RocmSmiCollector),
"schedstat": new(SchedstatCollector),
} }
// Metric collector manager data structure // Metric collector manager data structure

View File

@ -11,6 +11,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
sysconf "github.com/tklauser/go-sysconf"
) )
const CPUSTATFILE = `/proc/stat` const CPUSTATFILE = `/proc/stat`
@ -22,9 +23,11 @@ type CpustatCollectorConfig struct {
type CpustatCollector struct { type CpustatCollector struct {
metricCollector metricCollector
config CpustatCollectorConfig config CpustatCollectorConfig
lastTimestamp time.Time // Store time stamp of last tick to derive values
matches map[string]int matches map[string]int
cputags map[string]map[string]string cputags map[string]map[string]string
nodetags map[string]string nodetags map[string]string
olddata map[string]map[string]int64
} }
func (m *CpustatCollector) Init(config json.RawMessage) error { func (m *CpustatCollector) Init(config json.RawMessage) error {
@ -76,36 +79,48 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
// Pre-generate tags for all CPUs // Pre-generate tags for all CPUs
num_cpus := 0 num_cpus := 0
m.cputags = make(map[string]map[string]string) m.cputags = make(map[string]map[string]string)
m.olddata = make(map[string]map[string]int64)
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
linefields := strings.Fields(line) linefields := strings.Fields(line)
if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 { if strings.Compare(linefields[0], "cpu") == 0 {
m.olddata["cpu"] = make(map[string]int64)
for k, v := range m.matches {
m.olddata["cpu"][k], _ = strconv.ParseInt(linefields[v], 0, 64)
}
} else if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 {
cpustr := strings.TrimLeft(linefields[0], "cpu") cpustr := strings.TrimLeft(linefields[0], "cpu")
cpu, _ := strconv.Atoi(cpustr) cpu, _ := strconv.Atoi(cpustr)
m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)} m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)}
m.olddata[linefields[0]] = make(map[string]int64)
for k, v := range m.matches {
m.olddata[linefields[0]][k], _ = strconv.ParseInt(linefields[v], 0, 64)
}
num_cpus++ num_cpus++
} }
} }
m.lastTimestamp = time.Now()
m.init = true m.init = true
return nil return nil
} }
func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMetric) { func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMetric, now time.Time, tsdelta time.Duration) {
values := make(map[string]float64) values := make(map[string]float64)
total := 0.0 clktck, _ := sysconf.Sysconf(sysconf.SC_CLK_TCK)
for match, index := range m.matches { for match, index := range m.matches {
if len(match) > 0 { if len(match) > 0 {
x, err := strconv.ParseInt(linefields[index], 0, 64) x, err := strconv.ParseInt(linefields[index], 0, 64)
if err == nil { if err == nil {
values[match] = float64(x) vdiff := x - m.olddata[linefields[0]][match]
total += values[match] m.olddata[linefields[0]][match] = x // Store new value for next run
values[match] = float64(vdiff) / float64(tsdelta.Seconds()) / float64(clktck)
} }
} }
} }
t := time.Now()
for name, value := range values { for name, value := range values {
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t) y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 100}, now)
if err == nil { if err == nil {
output <- y output <- y
} }
@ -117,6 +132,9 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
return return
} }
num_cpus := 0 num_cpus := 0
now := time.Now()
tsdelta := now.Sub(m.lastTimestamp)
file, err := os.Open(string(CPUSTATFILE)) file, err := os.Open(string(CPUSTATFILE))
if err != nil { if err != nil {
cclog.ComponentError(m.name, err.Error()) cclog.ComponentError(m.name, err.Error())
@ -128,9 +146,9 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
line := scanner.Text() line := scanner.Text()
linefields := strings.Fields(line) linefields := strings.Fields(line)
if strings.Compare(linefields[0], "cpu") == 0 { if strings.Compare(linefields[0], "cpu") == 0 {
m.parseStatLine(linefields, m.nodetags, output) m.parseStatLine(linefields, m.nodetags, output, now, tsdelta)
} else if strings.HasPrefix(linefields[0], "cpu") { } else if strings.HasPrefix(linefields[0], "cpu") {
m.parseStatLine(linefields, m.cputags[linefields[0]], output) m.parseStatLine(linefields, m.cputags[linefields[0]], output, now, tsdelta)
num_cpus++ num_cpus++
} }
} }
@ -139,11 +157,13 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
m.nodetags, m.nodetags,
m.meta, m.meta,
map[string]interface{}{"value": int(num_cpus)}, map[string]interface{}{"value": int(num_cpus)},
time.Now(), now,
) )
if err == nil { if err == nil {
output <- num_cpus_metric output <- num_cpus_metric
} }
m.lastTimestamp = now
} }
func (m *CpustatCollector) Close() { func (m *CpustatCollector) Close() {

View File

@ -0,0 +1,155 @@
package collectors
import (
"encoding/json"
"fmt"
"bufio"
"time"
"os"
"strings"
"strconv"
"math"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
const SCHEDSTATFILE = `/proc/schedstat`
// These are the fields we read from the JSON configuration
type SchedstatCollectorConfig struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
// This contains all variables we need during execution and the variables
// defined by metricCollector (name, init, ...)
type SchedstatCollector struct {
metricCollector
config SchedstatCollectorConfig // the configuration structure
lastTimestamp time.Time // Store time stamp of last tick to derive values
meta map[string]string // default meta information
cputags map[string]map[string]string // default tags
olddata map[string]map[string]int64 // default tags
}
// Functions to implement MetricCollector interface
// Init(...), Read(...), Close()
// See: metricCollector.go
// Init initializes the sample collector
// Called once by the collector manager
// All tags, meta data tags and metrics that do not change over the runtime should be set here
func (m *SchedstatCollector) Init(config json.RawMessage) error {
var err error = nil
// Always set the name early in Init() to use it in cclog.Component* functions
m.name = "SchedstatCollector"
// This is for later use, also call it early
m.setup()
// Tell whether the collector should be run in parallel with others (reading files, ...)
// or it should be run serially, mostly for collectors acutally doing measurements
// because they should not measure the execution of the other collectors
m.parallel = true
// Define meta information sent with each metric
// (Can also be dynamic or this is the basic set with extension through AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SCHEDSTAT"}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
}
}
// Check input file
file, err := os.Open(string(SCHEDSTATFILE))
if err != nil {
cclog.ComponentError(m.name, err.Error())
}
defer file.Close()
// Pre-generate tags for all CPUs
num_cpus := 0
m.cputags = make(map[string]map[string]string)
m.olddata = make(map[string]map[string]int64)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
linefields := strings.Fields(line)
if strings.HasPrefix(linefields[0], "cpu") && strings.Compare(linefields[0], "cpu") != 0 {
cpustr := strings.TrimLeft(linefields[0], "cpu")
cpu, _ := strconv.Atoi(cpustr)
running, _ := strconv.ParseInt(linefields[7], 10, 64)
waiting, _ := strconv.ParseInt(linefields[8], 10, 64)
m.cputags[linefields[0]] = map[string]string{"type": "hwthread", "type-id": fmt.Sprintf("%d", cpu)}
m.olddata[linefields[0]] = map[string]int64{"running" : running, "waiting" : waiting}
num_cpus++
}
}
// Save current timestamp
m.lastTimestamp = time.Now()
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
return err
}
func (m *SchedstatCollector) ParseProcLine(linefields []string, tags map[string]string, output chan lp.CCMetric, now time.Time, tsdelta time.Duration) {
running, _ := strconv.ParseInt(linefields[7], 10, 64)
waiting, _ := strconv.ParseInt(linefields[8], 10, 64)
diff_running := running - m.olddata[linefields[0]]["running"]
diff_waiting := waiting - m.olddata[linefields[0]]["waiting"]
var l_running float64 = float64(diff_running) / tsdelta.Seconds() / (math.Pow(1000, 3))
var l_waiting float64 = float64(diff_waiting) / tsdelta.Seconds() / (math.Pow(1000, 3))
m.olddata[linefields[0]]["running"] = running
m.olddata[linefields[0]]["waiting"] = waiting
value := l_running + l_waiting
y, err := lp.New("cpu_load_core", tags, m.meta, map[string]interface{}{"value": value}, now)
if err == nil {
// Send it to output channel
output <- y
}
}
// Read collects all metrics belonging to the sample collector
// and sends them through the output channel to the collector manager
func (m *SchedstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if !m.init {
return
}
//timestamps
now := time.Now()
tsdelta := now.Sub(m.lastTimestamp)
file, err := os.Open(string(SCHEDSTATFILE))
if err != nil {
cclog.ComponentError(m.name, err.Error())
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
linefields := strings.Fields(line)
if strings.HasPrefix(linefields[0], "cpu") {
m.ParseProcLine(linefields, m.cputags[linefields[0]], output, now, tsdelta)
}
}
m.lastTimestamp = now
}
// Close metric collector: close network connection, close files, close libraries, ...
// Called once by the collector manager
func (m *SchedstatCollector) Close() {
// Unset flag
m.init = false
}

View File

@ -0,0 +1,11 @@
## `schedstat` collector
```json
"schedstat": {
}
```
The `schedstat` collector reads data from /proc/schedstat and calculates a load value, separated by hwthread. This might be useful to detect bad cpu pinning on shared nodes etc.
Metric:
* `cpu_load_core`