From e0e91844bced050459e310d97e0e74c7bb4bd6e9 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Thu, 24 Mar 2022 17:56:51 +0100 Subject: [PATCH] Use late initialization of LIKWID and catch access daemon death. Fixes #70 and fixes #71. --- collectors/likwidMetric.go | 381 ++++++++++++++++++++++++------------- 1 file changed, 251 insertions(+), 130 deletions(-) diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 85bd932..7113b4f 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -15,8 +15,11 @@ import ( "io/ioutil" "math" "os" + "os/signal" "strconv" "strings" + "sync" + "syscall" "time" "unsafe" @@ -46,6 +49,15 @@ type LikwidCollectorEventsetConfig struct { Metrics []LikwidCollectorMetricConfig `json:"metrics"` } +type LikwidEventsetConfig struct { + internal int + gid C.int + eorder []*C.char + estr *C.char + results map[int]map[string]interface{} + metrics map[int]map[string]float64 +} + type LikwidCollectorConfig struct { Eventsets []LikwidCollectorEventsetConfig `json:"eventsets"` Metrics []LikwidCollectorMetricConfig `json:"globalmetrics,omitempty"` @@ -58,17 +70,18 @@ type LikwidCollectorConfig struct { type LikwidCollector struct { metricCollector - cpulist []C.int - cpu2tid map[int]int - sock2tid map[int]int - metrics map[C.int]map[string]int - groups []C.int - config LikwidCollectorConfig - results map[int]map[int]map[string]interface{} - mresults map[int]map[int]map[string]float64 - gmresults map[int]map[string]float64 - basefreq float64 - running bool + cpulist []C.int + cpu2tid map[int]int + sock2tid map[int]int + metrics map[C.int]map[string]int + groups []C.int + config LikwidCollectorConfig + gmresults map[int]map[string]float64 + basefreq float64 + running bool + initialized bool + likwidGroups map[C.int]LikwidEventsetConfig + lock sync.Mutex } type LikwidMetric struct { @@ -86,6 +99,45 @@ func eventsToEventStr(events map[string]string) string { return strings.Join(elist, ",") } +func genLikwidEventSet(input LikwidCollectorEventsetConfig) LikwidEventsetConfig { + tmplist := make([]string, 0) + elist := make([]*C.char, 0) + for k, v := range input.Events { + tmplist = append(tmplist, fmt.Sprintf("%s:%s", v, k)) + c_counter := C.CString(k) + elist = append(elist, c_counter) + } + estr := strings.Join(tmplist, ",") + res := make(map[int]map[string]interface{}) + met := make(map[int]map[string]float64) + for _, i := range topo.CpuList() { + res[i] = make(map[string]interface{}) + for k := range input.Events { + res[i][k] = 0.0 + } + met[i] = make(map[string]float64) + for _, v := range input.Metrics { + res[i][v.Name] = 0.0 + } + } + return LikwidEventsetConfig{ + gid: -1, + eorder: elist, + estr: C.CString(estr), + results: res, + metrics: met, + } +} + +func testLikwidMetricFormula(formula string, params []string) bool { + myparams := make(map[string]interface{}) + for _, p := range params { + myparams[p] = float64(1.0) + } + _, err := agg.EvalFloat64Condition(formula, myparams) + return err == nil +} + func getBaseFreq() float64 { var freq float64 = math.NaN() C.power_init(0) @@ -108,6 +160,8 @@ func getBaseFreq() float64 { func (m *LikwidCollector) Init(config json.RawMessage) error { var ret C.int m.name = "LikwidCollector" + m.initialized = false + m.running = false m.config.AccessMode = LIKWID_DEF_ACCESSMODE m.config.LibraryPath = LIKWID_LIB_NAME if len(config) > 0 { @@ -140,6 +194,15 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { m.cpulist[i] = C.int(c) m.cpu2tid[c] = i } + + cclog.ComponentDebug(m.name, "initialize LIKWID topology") + ret = C.topology_init() + if ret != 0 { + err := errors.New("failed to initialize LIKWID topology") + cclog.ComponentError(m.name, err.Error()) + return err + } + m.sock2tid = make(map[int]int) tmp := make([]C.int, 1) for _, sid := range topo.SocketList() { @@ -150,15 +213,13 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { } C.free(unsafe.Pointer(cstr)) } - m.results = make(map[int]map[int]map[string]interface{}) - m.mresults = make(map[int]map[int]map[string]float64) + m.likwidGroups = make(map[C.int]LikwidEventsetConfig) + + // m.results = make(map[int]map[int]map[string]interface{}) + // m.mresults = make(map[int]map[int]map[string]float64) m.gmresults = make(map[int]map[string]float64) - cclog.ComponentDebug(m.name, "initialize LIKWID topology") - ret = C.topology_init() - if ret != 0 { - err := errors.New("failed to initialize LIKWID topology") - cclog.ComponentError(m.name, err.Error()) - return err + for _, tid := range m.cpu2tid { + m.gmresults[tid] = make(map[string]float64) } switch m.config.AccessMode { @@ -172,79 +233,50 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { C.HPMmode(1) } - cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module") - ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) - if ret != 0 { - C.topology_finalize() - err := errors.New("failed to initialize LIKWID topology") - cclog.ComponentError(m.name, err.Error()) - return err - } - // This is for the global metrics computation test - globalParams := make(map[string]interface{}) - globalParams["time"] = float64(1.0) - globalParams["inverseClock"] = float64(1.0) - // While adding the events, we test the metrics whether they can be computed at all - for i, evset := range m.config.Eventsets { - var gid C.int - var cstr *C.char + totalMetrics := 0 + // Generate parameter list for the metric computing test + params := make([]string, 0) + params = append(params, "time", "inverseClock") + // Generate parameter list for the global metric computing test + globalParams := make([]string, 0) + globalParams = append(globalParams, "time", "inverseClock") + // We test the eventset metrics whether they can be computed at all + for _, evset := range m.config.Eventsets { if len(evset.Events) > 0 { - estr := eventsToEventStr(evset.Events) - // Generate parameter list for the metric computing test - params := make(map[string]interface{}) - params["time"] = float64(1.0) - params["inverseClock"] = float64(1.0) + params = params[:2] for counter := range evset.Events { - params[counter] = float64(1.0) + params = append(params, counter) } for _, metric := range evset.Metrics { // Try to evaluate the metric - _, err := agg.EvalFloat64Condition(metric.Calc, params) - if err != nil { - cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) - continue - } - // If the metric is not in the parameter list for the global metrics, add it - if _, ok := globalParams[metric.Name]; !ok { - globalParams[metric.Name] = float64(1.0) + if testLikwidMetricFormula(metric.Calc, params) { + // Add the computable metric to the parameter list for the global metrics + globalParams = append(globalParams, metric.Name) + totalMetrics++ + } else { + metric.Calc = "" } } - // Now we add the list of events to likwid - cstr = C.CString(estr) - gid = C.perfmon_addEventSet(cstr) } else { cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given") continue } - if gid >= 0 { - m.groups = append(m.groups, gid) - } - C.free(unsafe.Pointer(cstr)) - m.results[i] = make(map[int]map[string]interface{}) - m.mresults[i] = make(map[int]map[string]float64) - for tid := range m.cpulist { - m.results[i][tid] = make(map[string]interface{}) - m.mresults[i][tid] = make(map[string]float64) - if i == 0 { - m.gmresults[tid] = make(map[string]float64) - } - } } for _, metric := range m.config.Metrics { // Try to evaluate the global metric - _, err := agg.EvalFloat64Condition(metric.Calc, globalParams) - if err != nil { - cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) - continue + if !testLikwidMetricFormula(metric.Calc, globalParams) { + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed") + metric.Calc = "" + } else { + totalMetrics++ } } // If no event set could be added, shut down LikwidCollector - if len(m.groups) == 0 { - C.perfmon_finalize() + if totalMetrics == 0 { C.topology_finalize() - err := errors.New("no LIKWID performance group initialized") + err := errors.New("no LIKWID eventset or metric usable") cclog.ComponentError(m.name, err.Error()) return err } @@ -255,57 +287,71 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { } // take a measurement for 'interval' seconds of event set index 'group' -func (m *LikwidCollector) takeMeasurement(group int, interval time.Duration) error { +func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) { var ret C.int - gid := m.groups[group] - ret = C.perfmon_setupCounters(gid) - if ret != 0 { - gctr := C.GoString(C.perfmon_getGroupName(gid)) - err := fmt.Errorf("failed to setup performance group %d (%s)", gid, gctr) - return err + m.lock.Lock() + if m.initialized { + ret = C.perfmon_setupCounters(evset.gid) + if ret != 0 { + var err error = nil + var skip bool = false + if ret == -37 { + skip = true + } else { + err = fmt.Errorf("failed to setup performance group %d", evset.gid) + } + m.lock.Unlock() + return skip, err + } + ret = C.perfmon_startCounters() + if ret != 0 { + var err error = nil + var skip bool = false + if ret == -37 { + skip = true + } else { + err = fmt.Errorf("failed to setup performance group %d", evset.gid) + } + m.lock.Unlock() + return skip, err + } + m.running = true + time.Sleep(interval) + m.running = false + ret = C.perfmon_stopCounters() + if ret != 0 { + var err error = nil + var skip bool = false + if ret == -37 { + skip = true + } else { + err = fmt.Errorf("failed to setup performance group %d", evset.gid) + } + m.lock.Unlock() + return skip, err + } } - ret = C.perfmon_startCounters() - if ret != 0 { - gctr := C.GoString(C.perfmon_getGroupName(gid)) - err := fmt.Errorf("failed to start performance group %d (%s)", gid, gctr) - return err - } - m.running = true - time.Sleep(interval) - m.running = false - ret = C.perfmon_stopCounters() - if ret != 0 { - gctr := C.GoString(C.perfmon_getGroupName(gid)) - err := fmt.Errorf("failed to stop performance group %d (%s)", gid, gctr) - return err - } - return nil + m.lock.Unlock() + return false, nil } // Get all measurement results for an event set, derive the metric values out of the measurement results and send it -func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration, output chan lp.CCMetric) error { - var eidx C.int - evset := m.config.Eventsets[group] - gid := m.groups[group] +func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interval time.Duration, output chan lp.CCMetric) error { invClock := float64(1.0 / m.basefreq) // Go over events and get the results - for eidx = 0; int(eidx) < len(evset.Events); eidx++ { - ctr := C.perfmon_getCounterName(gid, eidx) - gctr := C.GoString(ctr) - + for eidx, counter := range evset.eorder { + gctr := C.GoString(counter) for _, tid := range m.cpu2tid { - if tid >= 0 { - m.results[group][tid]["time"] = interval.Seconds() - m.results[group][tid]["inverseClock"] = invClock - res := C.perfmon_getLastResult(gid, eidx, C.int(tid)) - m.results[group][tid][gctr] = float64(res) - } + res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid)) + evset.results[tid][gctr] = float64(res) + evset.results[tid]["time"] = interval.Seconds() + evset.results[tid]["inverseClock"] = invClock } } // Go over the event set metrics, derive the value out of the event:counter values and send it - for _, metric := range evset.Metrics { + for _, metric := range m.config.Eventsets[evset.internal].Metrics { // The metric scope is determined in the Init() function // Get the map scope-id -> tids scopemap := m.cpu2tid @@ -313,13 +359,13 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration, scopemap = m.sock2tid } for domain, tid := range scopemap { - if tid >= 0 { - value, err := agg.EvalFloat64Condition(metric.Calc, m.results[group][tid]) + if tid >= 0 && len(metric.Calc) > 0 { + value, err := agg.EvalFloat64Condition(metric.Calc, evset.results[tid]) if err != nil { cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue } - m.mresults[group][tid][metric.Name] = value + evset.metrics[tid][metric.Name] = value if m.config.InvalidToZero && math.IsNaN(value) { value = 0.0 } @@ -360,8 +406,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan if tid >= 0 { // Here we generate parameter list params := make(map[string]interface{}) - for j := range m.groups { - for mname, mres := range m.mresults[j][tid] { + for _, evset := range m.likwidGroups { + for mname, mres := range evset.metrics[tid] { params[mname] = mres } } @@ -401,38 +447,113 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan return nil } +func (m *LikwidCollector) LateInit() error { + var ret C.int + cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module") + ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) + if ret != 0 { + var err error = nil + C.topology_finalize() + if ret != -22 { + err = errors.New("failed to initialize LIKWID perfmon") + cclog.ComponentError(m.name, err.Error()) + } else { + err = errors.New("access to LIKWID perfmon locked") + } + return err + } + + // While adding the events, we test the metrics whether they can be computed at all + for i, evset := range m.config.Eventsets { + var gid C.int + if len(evset.Events) > 0 { + likwidGroup := genLikwidEventSet(evset) + // Now we add the list of events to likwid + gid = C.perfmon_addEventSet(likwidGroup.estr) + if gid >= 0 { + likwidGroup.gid = gid + likwidGroup.internal = i + m.likwidGroups[gid] = likwidGroup + } + } else { + cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given") + continue + } + + } + + // If no event set could be added, shut down LikwidCollector + if len(m.likwidGroups) == 0 { + C.perfmon_finalize() + C.topology_finalize() + err := errors.New("no LIKWID performance group initialized") + cclog.ComponentError(m.name, err.Error()) + return err + } + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGCHLD) + signal.Notify(sigchan, os.Interrupt) + go func() { + <-sigchan + + signal.Stop(sigchan) + m.initialized = false + }() + m.initialized = true + return nil +} + // main read function taking multiple measurement rounds, each 'interval' seconds long func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) { + var skip bool = false + var err error if !m.init { return } - for i := range m.groups { - // measure event set 'i' for 'interval' seconds - err := m.takeMeasurement(i, interval) - if err != nil { - cclog.ComponentError(m.name, err.Error()) + if !m.initialized { + if m.LateInit() != nil { return } - // read measurements and derive event set metrics - m.calcEventsetMetrics(i, interval, output) } - // use the event set metrics to derive the global metrics - m.calcGlobalMetrics(interval, output) + + if m.initialized && !skip { + for _, evset := range m.likwidGroups { + if !skip { + // measure event set 'i' for 'interval' seconds + skip, err = m.takeMeasurement(evset, interval) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + return + } + } + + if !skip { + // read measurements and derive event set metrics + m.calcEventsetMetrics(evset, interval, output) + } + } + if !skip { + // use the event set metrics to derive the global metrics + m.calcGlobalMetrics(interval, output) + } + } } func (m *LikwidCollector) Close() { if m.init { - cclog.ComponentDebug(m.name, "Closing ...") m.init = false - if m.running { - cclog.ComponentDebug(m.name, "Stopping counters") - C.perfmon_stopCounters() + cclog.ComponentDebug(m.name, "Closing ...") + m.lock.Lock() + if m.initialized { + cclog.ComponentDebug(m.name, "Finalize LIKWID perfmon module") + C.perfmon_finalize() + m.initialized = false } - cclog.ComponentDebug(m.name, "Finalize LIKWID perfmon module") - C.perfmon_finalize() + m.lock.Unlock() cclog.ComponentDebug(m.name, "Finalize LIKWID topology module") C.topology_finalize() + cclog.ComponentDebug(m.name, "Closing done") } }