mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-07-19 03:11:41 +02:00
Fix for Likwid collector (#95)
* Run LIKWID in separate thread and check metric type * Change LIKWID collector documentation to use 'type' instead of 'scope' * Re-initialize LIKWID after one read is missing due to lock toggle
This commit is contained in:
@@ -28,6 +28,7 @@ import (
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
|
||||
"github.com/NVIDIA/go-nvml/pkg/dl"
|
||||
"golang.design/x/thread"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -71,18 +72,20 @@ type LikwidCollectorConfig struct {
|
||||
|
||||
type LikwidCollector struct {
|
||||
metricCollector
|
||||
cpulist []C.int
|
||||
cpu2tid map[int]int
|
||||
sock2tid map[int]int
|
||||
metrics map[C.int]map[string]int
|
||||
groups []C.int
|
||||
config LikwidCollectorConfig
|
||||
gmresults map[int]map[string]float64
|
||||
basefreq float64
|
||||
running bool
|
||||
initialized bool
|
||||
likwidGroups map[C.int]LikwidEventsetConfig
|
||||
lock sync.Mutex
|
||||
cpulist []C.int
|
||||
cpu2tid map[int]int
|
||||
sock2tid map[int]int
|
||||
metrics map[C.int]map[string]int
|
||||
groups []C.int
|
||||
config LikwidCollectorConfig
|
||||
gmresults map[int]map[string]float64
|
||||
basefreq float64
|
||||
running bool
|
||||
initialized bool
|
||||
needs_reinit bool
|
||||
likwidGroups map[C.int]LikwidEventsetConfig
|
||||
lock sync.Mutex
|
||||
measureThread thread.Thread
|
||||
}
|
||||
|
||||
type LikwidMetric struct {
|
||||
@@ -92,6 +95,18 @@ type LikwidMetric struct {
|
||||
group_idx int
|
||||
}
|
||||
|
||||
func checkMetricType(t string) bool {
|
||||
valid := map[string]bool{
|
||||
"node": true,
|
||||
"socket": true,
|
||||
"hwthread": true,
|
||||
"core": true,
|
||||
"memoryDomain": true,
|
||||
}
|
||||
_, ok := valid[t]
|
||||
return ok
|
||||
}
|
||||
|
||||
func eventsToEventStr(events map[string]string) string {
|
||||
elist := make([]string, 0)
|
||||
for k, v := range events {
|
||||
@@ -179,6 +194,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
m.name = "LikwidCollector"
|
||||
m.parallel = false
|
||||
m.initialized = false
|
||||
m.needs_reinit = true
|
||||
m.running = false
|
||||
m.config.AccessMode = LIKWID_DEF_ACCESSMODE
|
||||
m.config.LibraryPath = LIKWID_LIB_NAME
|
||||
@@ -239,7 +255,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
for _, metric := range evset.Metrics {
|
||||
// Try to evaluate the metric
|
||||
if testLikwidMetricFormula(metric.Calc, params) {
|
||||
if testLikwidMetricFormula(metric.Calc, params) && checkMetricType(metric.Type) {
|
||||
// Add the computable metric to the parameter list for the global metrics
|
||||
globalParams = append(globalParams, metric.Name)
|
||||
totalMetrics++
|
||||
@@ -257,6 +273,9 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
if !testLikwidMetricFormula(metric.Calc, globalParams) {
|
||||
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed")
|
||||
metric.Calc = ""
|
||||
} else if !checkMetricType(metric.Type) {
|
||||
cclog.ComponentError(m.name, "Metric", metric.Name, "has invalid type")
|
||||
metric.Calc = ""
|
||||
} else {
|
||||
totalMetrics++
|
||||
}
|
||||
@@ -268,6 +287,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
}
|
||||
m.measureThread = thread.New()
|
||||
m.init = true
|
||||
return nil
|
||||
}
|
||||
@@ -281,6 +301,7 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
|
||||
if ret != 0 {
|
||||
var err error = nil
|
||||
var skip bool = false
|
||||
cclog.ComponentDebug(m.name, "Setup returns", ret)
|
||||
if ret == -37 {
|
||||
skip = true
|
||||
} else {
|
||||
@@ -289,6 +310,7 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
|
||||
m.lock.Unlock()
|
||||
return skip, err
|
||||
}
|
||||
m.running = true
|
||||
ret = C.perfmon_startCounters()
|
||||
if ret != 0 {
|
||||
var err error = nil
|
||||
@@ -301,7 +323,7 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
|
||||
m.lock.Unlock()
|
||||
return skip, err
|
||||
}
|
||||
m.running = true
|
||||
ret = C.perfmon_readCounters()
|
||||
time.Sleep(interval)
|
||||
m.running = false
|
||||
ret = C.perfmon_stopCounters()
|
||||
@@ -316,6 +338,24 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
|
||||
m.lock.Unlock()
|
||||
return skip, err
|
||||
}
|
||||
m.running = false
|
||||
runtime := float64(C.perfmon_getLastTimeOfGroup(evset.gid))
|
||||
// Go over events and get the results
|
||||
for eidx, counter := range evset.eorder {
|
||||
gctr := C.GoString(counter)
|
||||
for _, tid := range m.cpu2tid {
|
||||
res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
|
||||
fres := float64(res)
|
||||
if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) {
|
||||
cclog.ComponentDebug(m.name, "Sanitize", gctr, "to zero")
|
||||
fres = 0.0
|
||||
}
|
||||
evset.results[tid][gctr] = fres
|
||||
}
|
||||
}
|
||||
for _, tid := range m.cpu2tid {
|
||||
evset.results[tid]["time"] = runtime
|
||||
}
|
||||
}
|
||||
m.lock.Unlock()
|
||||
return false, nil
|
||||
@@ -325,19 +365,8 @@ func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval t
|
||||
func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interval time.Duration, output chan lp.CCMetric) error {
|
||||
invClock := float64(1.0 / m.basefreq)
|
||||
|
||||
// Go over events and get the results
|
||||
for eidx, counter := range evset.eorder {
|
||||
gctr := C.GoString(counter)
|
||||
for _, tid := range m.cpu2tid {
|
||||
res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
|
||||
fres := float64(res)
|
||||
if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) {
|
||||
fres = 0.0
|
||||
}
|
||||
evset.results[tid][gctr] = fres
|
||||
evset.results[tid]["time"] = interval.Seconds()
|
||||
evset.results[tid]["inverseClock"] = invClock
|
||||
}
|
||||
for _, tid := range m.cpu2tid {
|
||||
evset.results[tid]["inverseClock"] = invClock
|
||||
}
|
||||
|
||||
// Go over the event set metrics, derive the value out of the event:counter values and send it
|
||||
@@ -431,6 +460,28 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *LikwidCollector) ReInit() error {
|
||||
C.perfmon_finalize()
|
||||
ret := C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
|
||||
if ret != 0 {
|
||||
return nil
|
||||
}
|
||||
for i, evset := range m.config.Eventsets {
|
||||
var gid C.int
|
||||
if len(evset.Events) > 0 {
|
||||
//skip := false
|
||||
likwidGroup := genLikwidEventSet(evset)
|
||||
gid = C.perfmon_addEventSet(likwidGroup.estr)
|
||||
if gid >= 0 {
|
||||
likwidGroup.gid = gid
|
||||
likwidGroup.internal = i
|
||||
m.likwidGroups[gid] = likwidGroup
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *LikwidCollector) LateInit() error {
|
||||
var ret C.int
|
||||
if m.initialized {
|
||||
@@ -445,6 +496,9 @@ func (m *LikwidCollector) LateInit() error {
|
||||
os.Setenv("PATH", m.config.DaemonPath+":"+p)
|
||||
}
|
||||
C.HPMmode(1)
|
||||
for _, c := range m.cpulist {
|
||||
C.HPMaddThread(c)
|
||||
}
|
||||
}
|
||||
cclog.ComponentDebug(m.name, "initialize LIKWID topology")
|
||||
ret = C.topology_init()
|
||||
@@ -468,48 +522,53 @@ func (m *LikwidCollector) LateInit() error {
|
||||
m.basefreq = getBaseFreq()
|
||||
cclog.ComponentDebug(m.name, "BaseFreq", m.basefreq)
|
||||
|
||||
cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
|
||||
ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
|
||||
if ret != 0 {
|
||||
var err error = nil
|
||||
C.topology_finalize()
|
||||
if ret != -22 {
|
||||
err = errors.New("failed to initialize LIKWID perfmon")
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
} else {
|
||||
err = errors.New("access to LIKWID perfmon locked")
|
||||
}
|
||||
return err
|
||||
if m.needs_reinit {
|
||||
m.ReInit()
|
||||
m.needs_reinit = false
|
||||
}
|
||||
|
||||
// While adding the events, we test the metrics whether they can be computed at all
|
||||
for i, evset := range m.config.Eventsets {
|
||||
var gid C.int
|
||||
if len(evset.Events) > 0 {
|
||||
skip := false
|
||||
likwidGroup := genLikwidEventSet(evset)
|
||||
for _, g := range m.likwidGroups {
|
||||
if likwidGroup.go_estr == g.go_estr {
|
||||
skip = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if skip {
|
||||
continue
|
||||
}
|
||||
// Now we add the list of events to likwid
|
||||
gid = C.perfmon_addEventSet(likwidGroup.estr)
|
||||
if gid >= 0 {
|
||||
likwidGroup.gid = gid
|
||||
likwidGroup.internal = i
|
||||
m.likwidGroups[gid] = likwidGroup
|
||||
}
|
||||
} else {
|
||||
cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
|
||||
continue
|
||||
}
|
||||
// cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
|
||||
// ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
|
||||
// if ret != 0 {
|
||||
// var err error = nil
|
||||
// C.topology_finalize()
|
||||
// if ret != -22 {
|
||||
// err = errors.New("failed to initialize LIKWID perfmon")
|
||||
// cclog.ComponentError(m.name, err.Error())
|
||||
// } else {
|
||||
// err = errors.New("access to LIKWID perfmon locked")
|
||||
// }
|
||||
// return err
|
||||
// }
|
||||
|
||||
}
|
||||
// // While adding the events, we test the metrics whether they can be computed at all
|
||||
// for i, evset := range m.config.Eventsets {
|
||||
// var gid C.int
|
||||
// if len(evset.Events) > 0 {
|
||||
// //skip := false
|
||||
// likwidGroup := genLikwidEventSet(evset)
|
||||
// // for _, g := range m.likwidGroups {
|
||||
// // if likwidGroup.go_estr == g.go_estr {
|
||||
// // skip = true
|
||||
// // break
|
||||
// // }
|
||||
// // }
|
||||
// // if skip {
|
||||
// // continue
|
||||
// // }
|
||||
// // Now we add the list of events to likwid
|
||||
// gid = C.perfmon_addEventSet(likwidGroup.estr)
|
||||
// if gid >= 0 {
|
||||
// likwidGroup.gid = gid
|
||||
// likwidGroup.internal = i
|
||||
// m.likwidGroups[gid] = likwidGroup
|
||||
// }
|
||||
// } else {
|
||||
// cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
|
||||
// continue
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
// If no event set could be added, shut down LikwidCollector
|
||||
if len(m.likwidGroups) == 0 {
|
||||
@@ -540,38 +599,48 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric)
|
||||
return
|
||||
}
|
||||
|
||||
if !m.initialized {
|
||||
m.lock.Lock()
|
||||
err = m.LateInit()
|
||||
if err != nil {
|
||||
m.measureThread.Call(func() {
|
||||
if !m.initialized {
|
||||
m.lock.Lock()
|
||||
err = m.LateInit()
|
||||
if err != nil {
|
||||
m.lock.Unlock()
|
||||
cclog.ComponentError(m.name, "lateinit failed")
|
||||
return
|
||||
}
|
||||
m.initialized = true
|
||||
m.lock.Unlock()
|
||||
return
|
||||
skip = true
|
||||
}
|
||||
m.initialized = true
|
||||
m.lock.Unlock()
|
||||
}
|
||||
|
||||
if m.initialized && !skip {
|
||||
for _, evset := range m.likwidGroups {
|
||||
if !skip {
|
||||
// measure event set 'i' for 'interval' seconds
|
||||
skip, err = m.takeMeasurement(evset, interval)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return
|
||||
if m.initialized && !skip {
|
||||
time := interval
|
||||
for _, evset := range m.likwidGroups {
|
||||
if !skip {
|
||||
// measure event set 'i' for 'interval' seconds
|
||||
skip, err = m.takeMeasurement(evset, interval)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if !skip {
|
||||
// read measurements and derive event set metrics
|
||||
m.calcEventsetMetrics(evset, time, output)
|
||||
}
|
||||
}
|
||||
|
||||
if !skip {
|
||||
// read measurements and derive event set metrics
|
||||
m.calcEventsetMetrics(evset, interval, output)
|
||||
// use the event set metrics to derive the global metrics
|
||||
m.calcGlobalMetrics(time, output)
|
||||
}
|
||||
if skip {
|
||||
m.needs_reinit = true
|
||||
m.initialized = false
|
||||
}
|
||||
}
|
||||
if !skip {
|
||||
// use the event set metrics to derive the global metrics
|
||||
m.calcGlobalMetrics(interval, output)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (m *LikwidCollector) Close() {
|
||||
|
Reference in New Issue
Block a user