Run LIKWID in separate thread and check metric type

This commit is contained in:
Thomas Roehl 2022-12-12 16:39:55 +01:00
parent 5918f96fd8
commit a6699bbe75

View File

@ -28,6 +28,7 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology" topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
"github.com/NVIDIA/go-nvml/pkg/dl" "github.com/NVIDIA/go-nvml/pkg/dl"
"golang.design/x/thread"
) )
const ( const (
@ -71,18 +72,19 @@ type LikwidCollectorConfig struct {
type LikwidCollector struct { type LikwidCollector struct {
metricCollector metricCollector
cpulist []C.int cpulist []C.int
cpu2tid map[int]int cpu2tid map[int]int
sock2tid map[int]int sock2tid map[int]int
metrics map[C.int]map[string]int metrics map[C.int]map[string]int
groups []C.int groups []C.int
config LikwidCollectorConfig config LikwidCollectorConfig
gmresults map[int]map[string]float64 gmresults map[int]map[string]float64
basefreq float64 basefreq float64
running bool running bool
initialized bool initialized bool
likwidGroups map[C.int]LikwidEventsetConfig likwidGroups map[C.int]LikwidEventsetConfig
lock sync.Mutex lock sync.Mutex
measureThread thread.Thread
} }
type LikwidMetric struct { type LikwidMetric struct {
@ -92,6 +94,18 @@ type LikwidMetric struct {
group_idx int group_idx int
} }
// checkMetricType reports whether t is one of the recognised metric type
// names: "node", "socket", "hwthread", "core" or "memoryDomain".
// The comparison is exact and case-sensitive; every other string, including
// the empty string, yields false.
func checkMetricType(t string) bool {
	// A switch over the constant names avoids allocating and filling a map
	// on every call just to do a five-way membership test.
	switch t {
	case "node", "socket", "hwthread", "core", "memoryDomain":
		return true
	default:
		return false
	}
}
func eventsToEventStr(events map[string]string) string { func eventsToEventStr(events map[string]string) string {
elist := make([]string, 0) elist := make([]string, 0)
for k, v := range events { for k, v := range events {
@ -239,7 +253,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
} }
for _, metric := range evset.Metrics { for _, metric := range evset.Metrics {
// Try to evaluate the metric // Try to evaluate the metric
if testLikwidMetricFormula(metric.Calc, params) { if testLikwidMetricFormula(metric.Calc, params) && checkMetricType(metric.Type) {
// Add the computable metric to the parameter list for the global metrics // Add the computable metric to the parameter list for the global metrics
globalParams = append(globalParams, metric.Name) globalParams = append(globalParams, metric.Name)
totalMetrics++ totalMetrics++
@ -257,6 +271,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
if !testLikwidMetricFormula(metric.Calc, globalParams) { if !testLikwidMetricFormula(metric.Calc, globalParams) {
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed") cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed")
metric.Calc = "" metric.Calc = ""
} else if !checkMetricType(metric.Type) {
cclog.ComponentError(m.name, "Metric", metric.Name, "has invalid type")
metric.Calc = ""
} else { } else {
totalMetrics++ totalMetrics++
} }
@ -268,6 +285,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
cclog.ComponentError(m.name, err.Error()) cclog.ComponentError(m.name, err.Error())
return err return err
} }
m.measureThread = thread.New()
m.init = true m.init = true
return nil return nil
} }
@ -289,6 +307,7 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
m.lock.Unlock() m.lock.Unlock()
return skip, err return skip, err
} }
m.running = true
ret = C.perfmon_startCounters() ret = C.perfmon_startCounters()
if ret != 0 { if ret != 0 {
var err error = nil var err error = nil
@ -301,7 +320,7 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
m.lock.Unlock() m.lock.Unlock()
return skip, err return skip, err
} }
m.running = true ret = C.perfmon_readCounters()
time.Sleep(interval) time.Sleep(interval)
m.running = false m.running = false
ret = C.perfmon_stopCounters() ret = C.perfmon_stopCounters()
@ -316,6 +335,25 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
m.lock.Unlock() m.lock.Unlock()
return skip, err return skip, err
} }
m.running = false
runtime := float64(C.perfmon_getLastTimeOfGroup(evset.gid))
// Go over events and get the results
for eidx, counter := range evset.eorder {
gctr := C.GoString(counter)
for _, tid := range m.cpu2tid {
res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
fres := float64(res)
if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) {
cclog.ComponentDebug(m.name, "Sanitize", gctr, "to zero")
fres = 0.0
}
evset.results[tid][gctr] = fres
}
}
for _, tid := range m.cpu2tid {
evset.results[tid]["time"] = runtime
}
} }
m.lock.Unlock() m.lock.Unlock()
return false, nil return false, nil
@ -325,19 +363,8 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interval time.Duration, output chan lp.CCMetric) error { func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interval time.Duration, output chan lp.CCMetric) error {
invClock := float64(1.0 / m.basefreq) invClock := float64(1.0 / m.basefreq)
// Go over events and get the results for _, tid := range m.cpu2tid {
for eidx, counter := range evset.eorder { evset.results[tid]["inverseClock"] = invClock
gctr := C.GoString(counter)
for _, tid := range m.cpu2tid {
res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
fres := float64(res)
if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) {
fres = 0.0
}
evset.results[tid][gctr] = fres
evset.results[tid]["time"] = interval.Seconds()
evset.results[tid]["inverseClock"] = invClock
}
} }
// Go over the event set metrics, derive the value out of the event:counter values and send it // Go over the event set metrics, derive the value out of the event:counter values and send it
@ -445,6 +472,9 @@ func (m *LikwidCollector) LateInit() error {
os.Setenv("PATH", m.config.DaemonPath+":"+p) os.Setenv("PATH", m.config.DaemonPath+":"+p)
} }
C.HPMmode(1) C.HPMmode(1)
for _, c := range m.cpulist {
C.HPMaddThread(c)
}
} }
cclog.ComponentDebug(m.name, "initialize LIKWID topology") cclog.ComponentDebug(m.name, "initialize LIKWID topology")
ret = C.topology_init() ret = C.topology_init()
@ -540,38 +570,44 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric)
return return
} }
if !m.initialized { m.measureThread.Call(func() {
m.lock.Lock() if !m.initialized {
err = m.LateInit() m.lock.Lock()
if err != nil { err = m.LateInit()
if err != nil {
m.lock.Unlock()
cclog.ComponentError(m.name, "lateinit failed")
return
}
m.initialized = true
m.lock.Unlock() m.lock.Unlock()
return skip = true
} }
m.initialized = true
m.lock.Unlock()
}
if m.initialized && !skip { if m.initialized && !skip {
for _, evset := range m.likwidGroups { time := interval
if !skip { for _, evset := range m.likwidGroups {
// measure event set 'i' for 'interval' seconds if !skip {
skip, err = m.takeMeasurement(evset, interval) // measure event set 'i' for 'interval' seconds
if err != nil { skip, err = m.takeMeasurement(evset, interval)
cclog.ComponentError(m.name, err.Error()) if err != nil {
return cclog.ComponentError(m.name, err.Error())
return
}
}
if !skip {
// read measurements and derive event set metrics
m.calcEventsetMetrics(evset, time, output)
} }
} }
if !skip { if !skip {
// read measurements and derive event set metrics // use the event set metrics to derive the global metrics
m.calcEventsetMetrics(evset, interval, output) m.calcGlobalMetrics(time, output)
} }
} }
if !skip { })
// use the event set metrics to derive the global metrics
m.calcGlobalMetrics(interval, output)
}
}
} }
func (m *LikwidCollector) Close() { func (m *LikwidCollector) Close() {