From e550226416579b7f6109775890ac4c1027c456d3 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 1 Feb 2022 16:01:31 +0100 Subject: [PATCH] Use gval in LikwidCollector --- collectors/likwidMetric.go | 378 ++++++++++++++++++++++++++++--------- 1 file changed, 289 insertions(+), 89 deletions(-) diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 430a09b..e3be810 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -13,40 +13,59 @@ import ( "errors" "fmt" "io/ioutil" - "log" "math" "os" + "regexp" "strconv" "strings" "time" "unsafe" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - "gopkg.in/Knetic/govaluate.v2" + topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" + "github.com/PaesslerAG/gval" ) -type MetricScope int +type MetricScope string const ( METRIC_SCOPE_HWTHREAD = iota - METRIC_SCOPE_SOCKET + METRIC_SCOPE_CORE + METRIC_SCOPE_LLC METRIC_SCOPE_NUMA + METRIC_SCOPE_DIE + METRIC_SCOPE_SOCKET METRIC_SCOPE_NODE ) func (ms MetricScope) String() string { - return []string{"Head", "Shoulder", "Knee", "Toe"}[ms] + return string(ms) +} + +func (ms MetricScope) Granularity() int { + grans := []string{"hwthread", "core", "llc", "numadomain", "die", "socket", "node"} + for i, g := range grans { + if ms.String() == g { + return i + } + } + return -1 } type LikwidCollectorMetricConfig struct { - Name string `json:"name"` - Calc string `json:"calc"` - Scope MetricScope `json:"socket_scope"` - Publish bool `json:"publish"` + Name string `json:"name"` // Name of the metric + Calc string `json:"calc"` // Calculation for the metric using + Aggr string `json:"aggregation"` // if scope unequal to LIKWID metric scope, the values are combined (sum, min, max, mean or avg, median) + Scope MetricScope `json:"scope"` // scope for calculation. 
subscopes are aggregated using the 'aggregation' function + Publish bool `json:"publish"` + granulatity MetricScope } type LikwidCollectorEventsetConfig struct { - Events map[string]string `json:"events"` - Metrics []LikwidCollectorMetricConfig `json:"metrics"` + Events map[string]string `json:"events"` + granulatity map[string]MetricScope + Metrics []LikwidCollectorMetricConfig `json:"metrics"` } type LikwidCollectorConfig struct { @@ -67,13 +86,14 @@ type LikwidCollector struct { mresults map[int]map[int]map[string]float64 gmresults map[int]map[string]float64 basefreq float64 + running bool } type LikwidMetric struct { - name string - search string - socket_scope bool - group_idx int + name string + search string + scope MetricScope + group_idx int } func eventsToEventStr(events map[string]string) string { @@ -84,6 +104,21 @@ func eventsToEventStr(events map[string]string) string { return strings.Join(elist, ",") } +func getGranularity(counter, event string) MetricScope { + if strings.HasPrefix(counter, "PMC") || strings.HasPrefix(counter, "FIXC") { + return "hwthread" + } else if strings.Contains(counter, "BOX") || strings.Contains(counter, "DEV") { + return "socket" + } else if strings.HasPrefix(counter, "PWR") { + if event == "RAPL_CORE_ENERGY" { + return "hwthread" + } else { + return "socket" + } + } + return "unknown" +} + func getBaseFreq() float64 { var freq float64 = math.NaN() C.power_init(0) @@ -117,6 +152,53 @@ func getSocketCpus() map[C.int]int { return outmap } +func (m *LikwidCollector) CatchGvalPanic() { + if rerr := recover(); rerr != nil { + cclog.ComponentError(m.name, "Gval failed to calculate a metric", rerr) + m.init = false + } +} + +func (m *LikwidCollector) initGranularity() { + for _, evset := range m.config.Eventsets { + evset.granulatity = make(map[string]MetricScope) + for counter, event := range evset.Events { + gran := getGranularity(counter, event) + if gran.Granularity() >= 0 { + evset.granulatity[counter] = gran + } + } + for i, metric := range evset.Metrics { + s := regexp.MustCompile("[+-/*()]").Split(metric.Calc, -1) + gran := MetricScope("hwthread") + evset.Metrics[i].granulatity = gran + for _, x := range s { + if _, ok := evset.Events[x]; ok { + if evset.granulatity[x].Granularity() > gran.Granularity() { + gran = evset.granulatity[x] + } + } + } + evset.Metrics[i].granulatity = gran + } + } + for i, metric := range m.config.Metrics { + s := regexp.MustCompile("[+-/*()]").Split(metric.Calc, -1) + gran := MetricScope("hwthread") + m.config.Metrics[i].granulatity = gran + for _, x := range s { + for _, evset := range m.config.Eventsets { + for _, m := range evset.Metrics { + if m.Name == x && m.granulatity.Granularity() > gran.Granularity() { + gran = m.granulatity + } + } + } + } + m.config.Metrics[i].granulatity = gran + } +} + func (m *LikwidCollector) Init(config json.RawMessage) error { var ret C.int m.name = "LikwidCollector" @@ -126,38 +208,70 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { return err } } + m.initGranularity() + if m.config.ForceOverwrite { + os.Setenv("LIKWID_FORCE", "1") + } m.setup() - m.meta = map[string]string{"source": m.name, "group": "PerfCounter"} - cpulist := CpuList() - m.cpulist = make([]C.int, len(cpulist)) - slist := getSocketCpus() + // in some cases, gval causes a panic. We catch it with the handler and deactivate + // the collector (m.init = false). 
+ defer m.CatchGvalPanic() + m.meta = map[string]string{"source": m.name, "group": "PerfCounter"} + cpulist := topo.CpuList() + m.cpulist = make([]C.int, len(cpulist)) + + cclog.ComponentDebug(m.name, "Create maps for socket, numa, core and die metrics") m.sock2tid = make(map[int]int) - // m.numa2tid = make(map[int]int) + // m.numa2tid = make(map[int]int) + // m.core2tid = make(map[int]int) + // m.die2tid = make(map[int]int) for i, c := range cpulist { m.cpulist[i] = C.int(c) - if sid, found := slist[m.cpulist[i]]; found { - m.sock2tid[sid] = i - } + m.sock2tid[topo.GetCpuSocket(c)] = i + // m.numa2tid[topo.GetCpuNumaDomain(c)] = i + // m.core2tid[topo.GetCpuCore(c)] = i + // m.die2tid[topo.GetCpuDie(c)] = i } m.results = make(map[int]map[int]map[string]interface{}) m.mresults = make(map[int]map[int]map[string]float64) m.gmresults = make(map[int]map[string]float64) ret = C.topology_init() if ret != 0 { - return errors.New("Failed to initialize LIKWID topology") - } - if m.config.ForceOverwrite { - os.Setenv("LIKWID_FORCE", "1") + err := errors.New("failed to initialize LIKWID topology") + cclog.ComponentError(m.name, err.Error()) + return err } ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) if ret != 0 { C.topology_finalize() - return errors.New("Failed to initialize LIKWID topology") + err := errors.New("failed to initialize LIKWID topology") + cclog.ComponentError(m.name, err.Error()) + return err } + globalParams := make(map[string]interface{}) + globalParams["time"] = float64(1.0) + globalParams["inverseClock"] = float64(1.0) + for i, evset := range m.config.Eventsets { estr := eventsToEventStr(evset.Events) + params := make(map[string]interface{}) + params["time"] = float64(1.0) + params["inverseClock"] = float64(1.0) + for counter, _ := range evset.Events { + params[counter] = float64(1.0) + } + for _, metric := range evset.Metrics { + _, err := gval.Evaluate(metric.Calc, params, gval.Full()) + if err != nil { + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) + continue + } + if _, ok := globalParams[metric.Name]; !ok { + globalParams[metric.Name] = float64(1.0) + } + } cstr := C.CString(estr) gid := C.perfmon_addEventSet(cstr) if gid >= 0 { @@ -172,95 +286,173 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { m.gmresults[tid] = make(map[string]float64) } } + for _, metric := range m.config.Metrics { + _, err := gval.Evaluate(metric.Calc, globalParams, gval.Full()) + if err != nil { + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) + continue + } + } if len(m.groups) == 0 { C.perfmon_finalize() C.topology_finalize() - return errors.New("No LIKWID performance group initialized") + err := errors.New("no LIKWID performance group initialized") + cclog.ComponentError(m.name, err.Error()) + return err } m.basefreq = getBaseFreq() m.init = true return nil } -func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) { - if !m.init { - return - } +func (m *LikwidCollector) takeMeasurement(group int, interval time.Duration) error { var ret C.int - - for i, gid := range m.groups { - evset := m.config.Eventsets[i] - ret = C.perfmon_setupCounters(gid) - if ret != 0 { - log.Print("Failed to setup performance group ", C.perfmon_getGroupName(gid)) - continue - } - ret = C.perfmon_startCounters() - if ret != 0 { - log.Print("Failed to start performance group ", C.perfmon_getGroupName(gid)) - continue - } - time.Sleep(interval) - ret = C.perfmon_stopCounters() - if 
ret != 0 { - log.Print("Failed to stop performance group ", C.perfmon_getGroupName(gid)) - continue - } - var eidx C.int - for tid := range m.cpulist { - for eidx = 0; int(eidx) < len(evset.Events); eidx++ { - ctr := C.perfmon_getCounterName(gid, eidx) - gctr := C.GoString(ctr) - res := C.perfmon_getLastResult(gid, eidx, C.int(tid)) - m.results[i][tid][gctr] = float64(res) - } - m.results[i][tid]["time"] = interval.Seconds() - m.results[i][tid]["inverseClock"] = float64(1.0 / m.basefreq) - for _, metric := range evset.Metrics { - expression, err := govaluate.NewEvaluableExpression(metric.Calc) - if err != nil { - log.Print(err.Error()) - continue - } - result, err := expression.Evaluate(m.results[i][tid]) - if err != nil { - log.Print(err.Error()) - continue - } - m.mresults[i][tid][metric.Name] = float64(result.(float64)) - } - } + gid := m.groups[group] + ret = C.perfmon_setupCounters(gid) + if ret != 0 { + gctr := C.GoString(C.perfmon_getGroupName(gid)) + err := fmt.Errorf("failed to setup performance group %s", gctr) + cclog.ComponentError(m.name, err.Error()) + return err } + ret = C.perfmon_startCounters() + if ret != 0 { + gctr := C.GoString(C.perfmon_getGroupName(gid)) + err := fmt.Errorf("failed to start performance group %s", gctr) + cclog.ComponentError(m.name, err.Error()) + return err + } + m.running = true + time.Sleep(interval) + m.running = false + ret = C.perfmon_stopCounters() + if ret != 0 { + gctr := C.GoString(C.perfmon_getGroupName(gid)) + err := fmt.Errorf("failed to stop performance group %s", gctr) + cclog.ComponentError(m.name, err.Error()) + return err + } + return nil +} - for _, metric := range m.config.Metrics { - for tid := range m.cpulist { - var params map[string]interface{} - expression, err := govaluate.NewEvaluableExpression(metric.Calc) +func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration) error { + var eidx C.int + evset := m.config.Eventsets[group] + gid := m.groups[group] + for tid := range m.cpulist { + for eidx = 0; int(eidx) < len(evset.Events); eidx++ { + ctr := C.perfmon_getCounterName(gid, eidx) + gctr := C.GoString(ctr) + res := C.perfmon_getLastResult(gid, eidx, C.int(tid)) + m.results[group][tid][gctr] = float64(res) + if m.results[group][tid][gctr] == 0 { + m.results[group][tid][gctr] = 1.0 + } + } + m.results[group][tid]["time"] = interval.Seconds() + m.results[group][tid]["inverseClock"] = float64(1.0 / m.basefreq) + for _, metric := range evset.Metrics { + value, err := gval.Evaluate(metric.Calc, m.results[group][tid], gval.Full()) if err != nil { - log.Print(err.Error()) + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue } - params = make(map[string]interface{}) + m.mresults[group][tid][metric.Name] = value.(float64) + } + } + return nil +} + +func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration) error { + for _, metric := range m.config.Metrics { + for tid := range m.cpulist { + params := make(map[string]interface{}) for j := range m.groups { for mname, mres := range m.mresults[j][tid] { params[mname] = mres } } - result, err := expression.Evaluate(params) + value, err := gval.Evaluate(metric.Calc, params, gval.Full()) if err != nil { - log.Print(err.Error()) + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue } - m.gmresults[tid][metric.Name] = float64(result.(float64)) + m.gmresults[tid][metric.Name] = value.(float64) } } + return nil +} + +// func (m *LikwidCollector) 
calcResultMetrics(interval time.Duration) ([]lp.CCMetric, error) { +// var err error = nil +// metrics := make([]lp.CCMetric, 0) +// for i := range m.groups { +// evset := m.config.Eventsets[i] +// for _, metric := range evset.Metrics { +// log.Print(metric.Name, " ", metric.Scope, " ", metric.granulatity) +// if metric.Scope.Granularity() > metric.granulatity.Granularity() { +// log.Print("Different granularity wanted for ", metric.Name, ": ", metric.Scope, " vs ", metric.granulatity) +// var idlist []int +// idfunc := func(cpuid int) int { return cpuid } +// switch metric.Scope { +// case "socket": +// idlist = topo.SocketList() +// idfunc = topo.GetCpuSocket +// case "numa": +// idlist = topo.NumaNodeList() +// idfunc = topo.GetCpuNumaDomain +// case "core": +// idlist = topo.CoreList() +// idfunc = topo.GetCpuCore +// case "die": +// idlist = topo.DieList() +// idfunc = topo.GetCpuDie +// case "node": +// idlist = topo.CpuList() +// } +// for i := 0; i < num_results; i++ { + +// } +// } +// } +// } +// for _, metric := range m.config.Metrics { +// log.Print(metric.Name, " ", metric.Scope, " ", metric.granulatity) +// if metric.Scope.Granularity() > metric.granulatity.Granularity() { +// log.Print("Different granularity wanted for ", metric.Name, ": ", metric.Scope, " vs ", metric.granulatity) +// } +// } +// return metrics, err +// } + +func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) { + if !m.init { + return + } + defer m.CatchGvalPanic() + + for i, _ := range m.groups { + // measure event set 'i' for 'interval' seconds + err := m.takeMeasurement(i, interval) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + continue + } + m.calcEventsetMetrics(i, interval) + } + + m.calcGlobalMetrics(interval) + + //metrics, err = m.calcResultMetrics(interval) + for i := range m.groups { evset := m.config.Eventsets[i] for _, metric := range evset.Metrics { + _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) if metric.Publish && !skip { - if metric.Scope.String() == "socket" { + if metric.Scope == "socket" { for sid, tid := range m.sock2tid { y, err := lp.New(metric.Name, map[string]string{"type": "socket", @@ -272,7 +464,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) output <- y } } - } else if metric.Scope.String() == "hwthread" { + } else if metric.Scope == "hwthread" { for tid, cpu := range m.cpulist { y, err := lp.New(metric.Name, map[string]string{"type": "cpu", @@ -291,7 +483,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) for _, metric := range m.config.Metrics { _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) if metric.Publish && !skip { - if metric.Scope.String() == "socket" { + if metric.Scope == "socket" { for sid, tid := range m.sock2tid { y, err := lp.New(metric.Name, map[string]string{"type": "socket", @@ -303,7 +495,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) output <- y } } - } else { + } else if metric.Scope == "hwthread" { for tid, cpu := range m.cpulist { y, err := lp.New(metric.Name, map[string]string{"type": "cpu", @@ -322,8 +514,16 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) func (m *LikwidCollector) Close() { if m.init { + cclog.ComponentDebug(m.name, "Closing ...") m.init = false + if m.running { + cclog.ComponentDebug(m.name, "Stopping counters") + C.perfmon_stopCounters() + } + cclog.ComponentDebug(m.name, "Finalize LIKWID perfmon 
module") C.perfmon_finalize() + cclog.ComponentDebug(m.name, "Finalize LIKWID topology module") C.topology_finalize() + cclog.ComponentDebug(m.name, "Closing done") } }