From ed62e952ce98dab0dc8c9deb2567b3ab5534b155 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Wed, 2 Feb 2022 14:52:07 +0100 Subject: [PATCH] Use MetricAggregator to calculate metrics in LIKWID collector. --- collectors/likwidMetric.go | 401 ++++++++++++++++++------------------- 1 file changed, 199 insertions(+), 202 deletions(-) diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index e3be810..82e241d 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -24,7 +24,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" - "github.com/PaesslerAG/gval" + mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) type MetricScope string @@ -43,16 +43,32 @@ func (ms MetricScope) String() string { return string(ms) } +func (ms MetricScope) Likwid() string { + LikwidDomains := map[string]string{ + "hwthread": "", + "core": "", + "llc": "C", + "numadomain": "M", + "die": "D", + "socket": "S", + "node": "N", + } + return LikwidDomains[string(ms)] +} + func (ms MetricScope) Granularity() int { - grans := []string{"hwthread", "core", "llc", "numadomain", "die", "socket", "node"} - for i, g := range grans { - if ms.String() == g { + for i, g := range GetAllMetricScopes() { + if ms == g { return i } } return -1 } +func GetAllMetricScopes() []MetricScope { + return []MetricScope{"hwthread" /*, "core", "llc", "numadomain", "die",*/, "socket", "node"} +} + type LikwidCollectorMetricConfig struct { Name string `json:"name"` // Name of the metric Calc string `json:"calc"` // Calculation for the metric using @@ -77,16 +93,18 @@ type LikwidCollectorConfig struct { type LikwidCollector struct { metricCollector - cpulist []C.int - sock2tid map[int]int - metrics map[C.int]map[string]int - groups []C.int - config LikwidCollectorConfig - results map[int]map[int]map[string]interface{} - mresults map[int]map[int]map[string]float64 - gmresults map[int]map[string]float64 - basefreq float64 - running bool + cpulist []C.int + cpu2tid map[int]int + sock2tid map[int]int + scopeRespTids map[MetricScope]map[int]int + metrics map[C.int]map[string]int + groups []C.int + config LikwidCollectorConfig + results map[int]map[int]map[string]interface{} + mresults map[int]map[int]map[string]float64 + gmresults map[int]map[string]float64 + basefreq float64 + running bool } type LikwidMetric struct { @@ -138,28 +156,8 @@ func getBaseFreq() float64 { return freq } -func getSocketCpus() map[C.int]int { - slist := SocketList() - var cpu C.int - outmap := make(map[C.int]int) - for _, s := range slist { - t := C.CString(fmt.Sprintf("S%d", s)) - clen := C.cpustr_to_cpulist(t, &cpu, 1) - if int(clen) == 1 { - outmap[cpu] = s - } - } - return outmap -} - -func (m *LikwidCollector) CatchGvalPanic() { - if rerr := recover(); rerr != nil { - cclog.ComponentError(m.name, "Gval failed to calculate a metric", rerr) - m.init = false - } -} - func (m *LikwidCollector) initGranularity() { + splitRegex := regexp.MustCompile("[+-/*()]") for _, evset := range m.config.Eventsets { evset.granulatity = make(map[string]MetricScope) for counter, event := range evset.Events { @@ -169,7 +167,7 @@ func (m *LikwidCollector) initGranularity() { } } for i, metric := range evset.Metrics { - s := regexp.MustCompile("[+-/*()]").Split(metric.Calc, -1) + s := splitRegex.Split(metric.Calc, -1) gran := MetricScope("hwthread") evset.Metrics[i].granulatity = gran for _, x := range s { @@ -183,7 +181,7 @@ func (m *LikwidCollector) initGranularity() { } } for i, metric := range m.config.Metrics { - s := regexp.MustCompile("[+-/*()]").Split(metric.Calc, -1) + s := splitRegex.Split(metric.Calc, -1) gran := MetricScope("hwthread") m.config.Metrics[i].granulatity = gran for _, x := range s { @@ -199,6 +197,59 @@ func (m *LikwidCollector) initGranularity() { } } +type TopoResolveFunc func(cpuid int) int + +func (m *LikwidCollector) getResponsiblities() map[MetricScope]map[int]int { + get_cpus := func(scope MetricScope) map[int]int { + var slist []int + var cpu C.int + var input func(index int) string + switch scope { + case "node": + slist = []int{0} + input = func(index int) string { return "N:0" } + case "socket": + input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) } + slist = topo.SocketList() + // case "numadomain": + // input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) } + // slist = topo.NumaNodeList() + // cclog.Debug(scope, " ", input(0), " ", slist) + // case "die": + // input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) } + // slist = topo.DieList() + // case "llc": + // input = fmt.Sprintf("%s%d:0", scope.Likwid(), s) + // slist = topo.LLCacheList() + case "hwthread": + input = func(index int) string { return fmt.Sprintf("%d", index) } + slist = topo.CpuList() + } + outmap := make(map[int]int) + for _, s := range slist { + t := C.CString(input(s)) + clen := C.cpustr_to_cpulist(t, &cpu, 1) + if int(clen) == 1 { + outmap[s] = m.cpu2tid[int(cpu)] + } else { + cclog.Error(fmt.Sprintf("Cannot determine responsible CPU for %s", input(s))) + outmap[s] = -1 + } + C.free(unsafe.Pointer(t)) + } + return outmap + } + + scopes := GetAllMetricScopes() + complete := make(map[MetricScope]map[int]int) + for _, s := range scopes { + cclog.Debug("Start ", s) + complete[s] = get_cpus(s) + cclog.Debug("End ", s) + } + return complete +} + func (m *LikwidCollector) Init(config json.RawMessage) error { var ret C.int m.name = "LikwidCollector" @@ -208,40 +259,39 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { return err } } - m.initGranularity() if m.config.ForceOverwrite { + cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1") os.Setenv("LIKWID_FORCE", "1") } m.setup() - // in some cases, gval causes a panic. We catch it with the handler and deactivate - // the collector (m.init = false). - defer m.CatchGvalPanic() m.meta = map[string]string{"source": m.name, "group": "PerfCounter"} + cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists") cpulist := topo.CpuList() m.cpulist = make([]C.int, len(cpulist)) - - cclog.ComponentDebug(m.name, "Create maps for socket, numa, core and die metrics") - m.sock2tid = make(map[int]int) - // m.numa2tid = make(map[int]int) - // m.core2tid = make(map[int]int) - // m.die2tid = make(map[int]int) + m.cpu2tid = make(map[int]int) for i, c := range cpulist { m.cpulist[i] = C.int(c) - m.sock2tid[topo.GetCpuSocket(c)] = i - // m.numa2tid[topo.GetCpuNumaDomain(c)] = i - // m.core2tid[topo.GetCpuCore(c)] = i - // m.die2tid[topo.GetCpuDie(c)] = i + m.cpu2tid[c] = i + } m.results = make(map[int]map[int]map[string]interface{}) m.mresults = make(map[int]map[int]map[string]float64) m.gmresults = make(map[int]map[string]float64) + cclog.ComponentDebug(m.name, "initialize LIKWID topology") ret = C.topology_init() if ret != 0 { err := errors.New("failed to initialize LIKWID topology") cclog.ComponentError(m.name, err.Error()) return err } + + // Determine which counter works at which level. PMC*: hwthread, *BOX*: socket, ... + m.initGranularity() + // Generate map for MetricScope -> scope_id (like socket id) -> responsible id (offset in cpulist) + m.scopeRespTids = m.getResponsiblities() + + cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module") ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) if ret != 0 { C.topology_finalize() @@ -250,28 +300,33 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { return err } + // This is for the global metrics computation test globalParams := make(map[string]interface{}) globalParams["time"] = float64(1.0) globalParams["inverseClock"] = float64(1.0) - + // While adding the events, we test the metrics whether they can be computed at all for i, evset := range m.config.Eventsets { estr := eventsToEventStr(evset.Events) + // Generate parameter list for the metric computing test params := make(map[string]interface{}) params["time"] = float64(1.0) params["inverseClock"] = float64(1.0) - for counter, _ := range evset.Events { + for counter := range evset.Events { params[counter] = float64(1.0) } for _, metric := range evset.Metrics { - _, err := gval.Evaluate(metric.Calc, params, gval.Full()) + // Try to evaluate the metric + _, err := mr.EvalFloat64Condition(metric.Calc, params) if err != nil { cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue } + // If the metric is not in the parameter list for the global metrics, add it if _, ok := globalParams[metric.Name]; !ok { globalParams[metric.Name] = float64(1.0) } } + // Now we add the list of events to likwid cstr := C.CString(estr) gid := C.perfmon_addEventSet(cstr) if gid >= 0 { @@ -283,17 +338,21 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { for tid := range m.cpulist { m.results[i][tid] = make(map[string]interface{}) m.mresults[i][tid] = make(map[string]float64) - m.gmresults[tid] = make(map[string]float64) + if i == 0 { + m.gmresults[tid] = make(map[string]float64) + } } } for _, metric := range m.config.Metrics { - _, err := gval.Evaluate(metric.Calc, globalParams, gval.Full()) + // Try to evaluate the global metric + _, err := mr.EvalFloat64Condition(metric.Calc, globalParams) if err != nil { cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue } } + // If no event set could be added, shut down LikwidCollector if len(m.groups) == 0 { C.perfmon_finalize() C.topology_finalize() @@ -306,6 +365,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { return nil } +// take a measurement for 'interval' seconds of event set index 'group' func (m *LikwidCollector) takeMeasurement(group int, interval time.Duration) error { var ret C.int gid := m.groups[group] @@ -336,101 +396,104 @@ func (m *LikwidCollector) takeMeasurement(group int, interval time.Duration) err return nil } -func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration) error { +// Get all measurement results for an event set, derive the metric values out of the measurement results and send it +func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration, output chan lp.CCMetric) error { var eidx C.int evset := m.config.Eventsets[group] gid := m.groups[group] - for tid := range m.cpulist { - for eidx = 0; int(eidx) < len(evset.Events); eidx++ { - ctr := C.perfmon_getCounterName(gid, eidx) - gctr := C.GoString(ctr) - res := C.perfmon_getLastResult(gid, eidx, C.int(tid)) - m.results[group][tid][gctr] = float64(res) - if m.results[group][tid][gctr] == 0 { - m.results[group][tid][gctr] = 1.0 + + // Go over events and get the results + for eidx = 0; int(eidx) < len(evset.Events); eidx++ { + ctr := C.perfmon_getCounterName(gid, eidx) + ev := C.perfmon_getEventName(gid, eidx) + gctr := C.GoString(ctr) + gev := C.GoString(ev) + // MetricScope for the counter (and if needed the event) + scope := getGranularity(gctr, gev) + // Get the map scope-id -> tids + // This way we read less counters like only the responsible hardware thread for a socket + scopemap := m.scopeRespTids[scope] + for _, tid := range scopemap { + if tid >= 0 { + m.results[group][tid]["time"] = interval.Seconds() + m.results[group][tid]["inverseClock"] = float64(1.0 / m.basefreq) + res := C.perfmon_getLastResult(gid, eidx, C.int(tid)) + m.results[group][tid][gctr] = float64(res) } } - m.results[group][tid]["time"] = interval.Seconds() - m.results[group][tid]["inverseClock"] = float64(1.0 / m.basefreq) - for _, metric := range evset.Metrics { - value, err := gval.Evaluate(metric.Calc, m.results[group][tid], gval.Full()) - if err != nil { - cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) - continue - } - m.mresults[group][tid][metric.Name] = value.(float64) - } } - return nil -} -func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration) error { - for _, metric := range m.config.Metrics { - for tid := range m.cpulist { - params := make(map[string]interface{}) - for j := range m.groups { - for mname, mres := range m.mresults[j][tid] { - params[mname] = mres + // Go over the event set metrics, derive the value out of the event:counter values and send it + for _, metric := range evset.Metrics { + // The metric scope is determined in the Init() function + // Get the map scope-id -> tids + scopemap := m.scopeRespTids[metric.Scope] + for domain, tid := range scopemap { + if tid >= 0 { + value, err := mr.EvalFloat64Condition(metric.Calc, m.results[group][tid]) + if err != nil { + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) + continue + } + m.mresults[group][tid][metric.Name] = value + // Now we have the result, send it with the proper tags + tags := map[string]string{"type": metric.Scope.String()} + if metric.Scope != "node" { + tags["type-id"] = fmt.Sprintf("%d", domain) + } + fields := map[string]interface{}{"value": value} + y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now()) + if err == nil { + output <- y } } - value, err := gval.Evaluate(metric.Calc, params, gval.Full()) - if err != nil { - cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) - continue + } + } + + return nil +} + +// Go over the global metrics, derive the value out of the event sets' metric values and send it +func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan lp.CCMetric) error { + for _, metric := range m.config.Metrics { + scopemap := m.scopeRespTids[metric.Scope] + for domain, tid := range scopemap { + if tid >= 0 { + // Here we generate parameter list + params := make(map[string]interface{}) + for j := range m.groups { + for mname, mres := range m.mresults[j][tid] { + params[mname] = mres + } + } + // Evaluate the metric + value, err := mr.EvalFloat64Condition(metric.Calc, params) + if err != nil { + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) + continue + } + m.gmresults[tid][metric.Name] = value + // Now we have the result, send it with the proper tags + tags := map[string]string{"type": metric.Scope.String()} + if metric.Scope != "node" { + tags["type-id"] = fmt.Sprintf("%d", domain) + } + fields := map[string]interface{}{"value": value} + y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now()) + if err == nil { + output <- y + } } - m.gmresults[tid][metric.Name] = value.(float64) } } return nil } -// func (m *LikwidCollector) calcResultMetrics(interval time.Duration) ([]lp.CCMetric, error) { -// var err error = nil -// metrics := make([]lp.CCMetric, 0) -// for i := range m.groups { -// evset := m.config.Eventsets[i] -// for _, metric := range evset.Metrics { -// log.Print(metric.Name, " ", metric.Scope, " ", metric.granulatity) -// if metric.Scope.Granularity() > metric.granulatity.Granularity() { -// log.Print("Different granularity wanted for ", metric.Name, ": ", metric.Scope, " vs ", metric.granulatity) -// var idlist []int -// idfunc := func(cpuid int) int { return cpuid } -// switch metric.Scope { -// case "socket": -// idlist = topo.SocketList() -// idfunc = topo.GetCpuSocket -// case "numa": -// idlist = topo.NumaNodeList() -// idfunc = topo.GetCpuNumaDomain -// case "core": -// idlist = topo.CoreList() -// idfunc = topo.GetCpuCore -// case "die": -// idlist = topo.DieList() -// idfunc = topo.GetCpuDie -// case "node": -// idlist = topo.CpuList() -// } -// for i := 0; i < num_results; i++ { - -// } -// } -// } -// } -// for _, metric := range m.config.Metrics { -// log.Print(metric.Name, " ", metric.Scope, " ", metric.granulatity) -// if metric.Scope.Granularity() > metric.granulatity.Granularity() { -// log.Print("Different granularity wanted for ", metric.Name, ": ", metric.Scope, " vs ", metric.granulatity) -// } -// } -// return metrics, err -// } - +// main read function taking multiple measurement rounds, each 'interval' seconds long func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { return } - defer m.CatchGvalPanic() for i, _ := range m.groups { // measure event set 'i' for 'interval' seconds @@ -439,77 +502,11 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) cclog.ComponentError(m.name, err.Error()) continue } - m.calcEventsetMetrics(i, interval) - } - - m.calcGlobalMetrics(interval) - - //metrics, err = m.calcResultMetrics(interval) - - for i := range m.groups { - evset := m.config.Eventsets[i] - for _, metric := range evset.Metrics { - - _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) - if metric.Publish && !skip { - if metric.Scope == "socket" { - for sid, tid := range m.sock2tid { - y, err := lp.New(metric.Name, - map[string]string{"type": "socket", - "type-id": fmt.Sprintf("%d", int(sid))}, - m.meta, - map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, - time.Now()) - if err == nil { - output <- y - } - } - } else if metric.Scope == "hwthread" { - for tid, cpu := range m.cpulist { - y, err := lp.New(metric.Name, - map[string]string{"type": "cpu", - "type-id": fmt.Sprintf("%d", int(cpu))}, - m.meta, - map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, - time.Now()) - if err == nil { - output <- y - } - } - } - } - } - } - for _, metric := range m.config.Metrics { - _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) - if metric.Publish && !skip { - if metric.Scope == "socket" { - for sid, tid := range m.sock2tid { - y, err := lp.New(metric.Name, - map[string]string{"type": "socket", - "type-id": fmt.Sprintf("%d", int(sid))}, - m.meta, - map[string]interface{}{"value": m.gmresults[tid][metric.Name]}, - time.Now()) - if err == nil { - output <- y - } - } - } else if metric.Scope == "hwthread" { - for tid, cpu := range m.cpulist { - y, err := lp.New(metric.Name, - map[string]string{"type": "cpu", - "type-id": fmt.Sprintf("%d", int(cpu))}, - m.meta, - map[string]interface{}{"value": m.gmresults[tid][metric.Name]}, - time.Now()) - if err == nil { - output <- y - } - } - } - } + // read measurements and derive event set metrics + m.calcEventsetMetrics(i, interval, output) } + // use the event set metrics to derive the global metrics + m.calcGlobalMetrics(interval, output) } func (m *LikwidCollector) Close() {