mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-20 21:05:06 +02:00 
			
		
		
		
	Compare commits
	
		
			17 Commits
		
	
	
		
			likwidMetr
			...
			perf_colle
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 1edddc3dc2 | ||
|  | 7b343d0bab | ||
|  | 7d3180b526 | ||
|  | 70a6afc549 | ||
|  | e02a018327 | ||
|  | bcecdd033b | ||
|  | 2645ffeff3 | ||
|  | ee4e1baf5b | ||
|  | 94c80307e8 | ||
|  | e968aa1991 | ||
|  | d2a38e3844 | ||
|  | 1f35f6d3ca | ||
|  | 7e6870c7b3 | ||
|  | d881093524 | ||
|  | c01096c157 | ||
|  | 3d70c8afc9 | ||
|  | 7ee85a07dc | 
							
								
								
									
										20
									
								
								.github/workflows/Release.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										20
									
								
								.github/workflows/Release.yml
									
									
									
									
										vendored
									
									
								
							| @@ -73,21 +73,21 @@ jobs: | ||||
|         NEW_SRPM=${OLD_SRPM/el8/alma8} | ||||
|         mv "${OLD_RPM}" "${NEW_RPM}" | ||||
|         mv "${OLD_SRPM}" "${NEW_SRPM}" | ||||
|         echo "EL8_SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT | ||||
|         echo "EL8_RPM=${NEW_RPM}" >> $GITHUB_OUTPUT | ||||
|         echo "SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT | ||||
|         echo "RPM=${NEW_RPM}" >> $GITHUB_OUTPUT | ||||
|  | ||||
|     # See: https://github.com/actions/upload-artifact | ||||
|     - name: Save RPM as artifact | ||||
|       uses: actions/upload-artifact@v4 | ||||
|       with: | ||||
|         name: cc-metric-collector RPM for AlmaLinux 8 | ||||
|         path: ${{ steps.rpmrename.outputs.EL8_RPM }} | ||||
|         path: ${{ steps.rpmrename.outputs.RPM }} | ||||
|         overwrite: true | ||||
|     - name: Save SRPM as artifact | ||||
|       uses: actions/upload-artifact@v4 | ||||
|       with: | ||||
|         name: cc-metric-collector SRPM for AlmaLinux 8 | ||||
|         path: ${{ steps.rpmrename.outputs.EL8_SRPM }} | ||||
|         path: ${{ steps.rpmrename.outputs.SRPM }} | ||||
|         overwrite: true | ||||
|  | ||||
|   # | ||||
| @@ -152,21 +152,21 @@ jobs: | ||||
|         NEW_SRPM=${OLD_SRPM/el9/alma9} | ||||
|         mv "${OLD_RPM}" "${NEW_RPM}" | ||||
|         mv "${OLD_SRPM}" "${NEW_SRPM}" | ||||
|         echo "EL9_SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT | ||||
|         echo "EL9_RPM=${NEW_RPM}" >> $GITHUB_OUTPUT | ||||
|         echo "SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT | ||||
|         echo "RPM=${NEW_RPM}" >> $GITHUB_OUTPUT | ||||
|  | ||||
|     # See: https://github.com/actions/upload-artifact | ||||
|     - name: Save RPM as artifact | ||||
|       uses: actions/upload-artifact@v4 | ||||
|       with: | ||||
|         name: cc-metric-collector RPM for AlmaLinux 9 | ||||
|         path: ${{ steps.rpmrename.outputs.EL9_RPM }} | ||||
|         path: ${{ steps.rpmrename.outputs.RPM }} | ||||
|         overwrite: true | ||||
|     - name: Save SRPM as artifact | ||||
|       uses: actions/upload-artifact@v4 | ||||
|       with: | ||||
|         name: cc-metric-collector SRPM for AlmaLinux 9 | ||||
|         path: ${{ steps.rpmrename.outputs.EL9_SRPM }} | ||||
|         path: ${{ steps.rpmrename.outputs.SRPM }} | ||||
|         overwrite: true | ||||
|  | ||||
|   # | ||||
| @@ -235,6 +235,10 @@ jobs: | ||||
|     # See: https://catalog.redhat.com/software/containers/ubi8/ubi/5c359854d70cc534b3a3784e?container-tabs=gti | ||||
|     container: redhat/ubi9 | ||||
|     # The job outputs link to the outputs of the 'rpmbuild' step | ||||
|     # The job outputs link to the outputs of the 'rpmbuild' step | ||||
|     outputs: | ||||
|       rpm : ${{steps.rpmbuild.outputs.RPM}} | ||||
|       srpm : ${{steps.rpmbuild.outputs.SRPM}} | ||||
|     steps: | ||||
|  | ||||
|     # Use dnf to install development packages | ||||
|   | ||||
| @@ -41,6 +41,8 @@ var AvailableCollectors = map[string]MetricCollector{ | ||||
| 	"self":            new(SelfCollector), | ||||
| 	"schedstat":       new(SchedstatCollector), | ||||
| 	"nfsiostat":       new(NfsIOStatCollector), | ||||
| 	"perf_event":      new(PerfEventCollector), | ||||
| 	"perf_cmd":        new(PerfCmdCollector), | ||||
| } | ||||
|  | ||||
| // Metric collector manager data structure | ||||
|   | ||||
							
								
								
									
										384
									
								
								collectors/perfCmdMetric.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										384
									
								
								collectors/perfCmdMetric.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,384 @@ | ||||
| package collectors | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"math" | ||||
| 	"os" | ||||
| 	"os/exec" | ||||
| 	"regexp" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" | ||||
| 	topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology" | ||||
| ) | ||||
|  | ||||
// perf_number_regex matches a comma enclosed by digits. parseEvent uses it to
// replace such commas with dots (e.g. "12,5" -> "12.5") before decoding a line.
var perf_number_regex = regexp.MustCompile(`(\d+),(\d+)`)

// PERF_NOT_COUNTED is the value string that parseEvent skips instead of parsing it as a number.
const PERF_NOT_COUNTED = "<not counted>"
// PERF_UNIT_NULL is the unit string that parseEvent treats as "no unit".
const PERF_UNIT_NULL = "(null)"

// VALID_METRIC_TYPES lists the metric types accepted by Init; metrics
// configured with any other type are rejected.
var VALID_METRIC_TYPES = []string{
	"hwthread",
	"core",
	"llc",
	"socket",
	"die",
	"node",
	"memoryDomain",
}
|  | ||||
// PerfCmdCollectorEventConfig describes one perf event to measure and how the
// per-hardware-thread counts are turned into a metric.
type PerfCmdCollectorEventConfig struct {
	Metric      string            `json:"metric"`                     // metric name
	Event       string            `json:"event"`                      // perf event configuration (passed to `perf stat -e`)
	Type        string            `json:"type"`                       // Metric type (aka node, socket, hwthread, ...), must be in VALID_METRIC_TYPES
	Tags        map[string]string `json:"tags,omitempty"`             // extra tags for the metric
	Meta        map[string]string `json:"meta,omitempty"`             // extra meta information for the metric
	Unit        string            `json:"unit,omitempty"`             // unit of metric (if any), added as meta "unit"
	UsePerfUnit bool              `json:"use_perf_unit,omitempty"`    // use the unit reported by perf if no unit is set otherwise
	TypeAgg     string            `json:"type_aggregation,omitempty"` // how to aggregate cpu-data to metric type: sum, min, max, avg or mean (anything else: sum)
	Publish     bool              `json:"publish,omitempty"`          // only metrics with publish set are sent by Read
	//lastCounterValue float64
	//lastMetricValue  float64
	collectorTags *map[string]string // points to the collector's default tags (set in Init)
	collectorMeta *map[string]string // points to the collector's default meta information (set in Init)
	useCpus       map[int][]int      // type ID -> hardware threads, filled in Init from the topology
}
|  | ||||
// PerfCmdCollectorExpression describes a metric derived from other metrics.
// NOTE(review): the configuration is decoded, but no code in this file
// evaluates expressions yet.
type PerfCmdCollectorExpression struct {
	Metric     string `json:"metric"`                     // metric name
	Expression string `json:"expression"`                 // expression based on metrics
	Type       string `json:"type"`                       // Metric type (aka node, socket, hwthread, ...)
	TypeAgg    string `json:"type_aggregation,omitempty"` // how to aggregate cpu-data to metric type
	Publish    bool   `json:"publish,omitempty"`          // whether the result should be published
}
|  | ||||
// PerfCmdCollectorConfig holds the fields we read from the JSON configuration
type PerfCmdCollectorConfig struct {
	Metrics     []PerfCmdCollectorEventConfig `json:"metrics"`                // events to measure
	Expressions []PerfCmdCollectorExpression  `json:"expressions"`            // derived metrics
	PerfCmd     string                        `json:"perf_command,omitempty"` // perf executable (path or name resolved through $PATH)
}
|  | ||||
// PerfCmdCollector contains all variables we need during execution and the
// variables defined by metricCollector (name, init, ...)
type PerfCmdCollector struct {
	metricCollector
	config          PerfCmdCollectorConfig                  // the configuration structure
	meta            map[string]string                       // default meta information
	tags            map[string]string                       // default tags
	metrics         map[string]*PerfCmdCollectorEventConfig // usable events, keyed by their perf event string
	perfEventString string                                  // comma-separated list of usable events for `perf stat -e`
}
|  | ||||
| // Functions to implement MetricCollector interface | ||||
| // Init(...), Read(...), Close() | ||||
| // See: metricCollector.go | ||||
|  | ||||
| // Init initializes the sample collector | ||||
| // Called once by the collector manager | ||||
| // All tags, meta data tags and metrics that do not change over the runtime should be set here | ||||
| func (m *PerfCmdCollector) Init(config json.RawMessage) error { | ||||
| 	var err error = nil | ||||
| 	// Always set the name early in Init() to use it in cclog.Component* functions | ||||
| 	m.name = "PerfCmdCollector" | ||||
| 	m.parallel = false | ||||
| 	// This is for later use, also call it early | ||||
| 	m.setup() | ||||
| 	// Tell whether the collector should be run in parallel with others (reading files, ...) | ||||
| 	// or it should be run serially, mostly for collectors actually doing measurements | ||||
| 	// because they should not measure the execution of the other collectors | ||||
| 	m.parallel = true | ||||
| 	// Define meta information sent with each metric | ||||
| 	// (Can also be dynamic or this is the basic set with extension through AddMeta()) | ||||
| 	m.meta = map[string]string{"source": m.name, "group": "PerfCounter"} | ||||
| 	// Define tags sent with each metric | ||||
| 	// The 'type' tag is always needed, it defines the granularity of the metric | ||||
| 	// node -> whole system | ||||
| 	// socket -> CPU socket (requires socket ID as 'type-id' tag) | ||||
| 	// die -> CPU die (requires CPU die ID as 'type-id' tag) | ||||
| 	// memoryDomain -> NUMA domain (requires NUMA domain ID as 'type-id' tag) | ||||
| 	// llc -> Last level cache (requires last level cache ID as 'type-id' tag) | ||||
| 	// core -> single CPU core that may consist of multiple hardware threads (SMT) (requires core ID as 'type-id' tag) | ||||
| 	// hwthtread -> single CPU hardware thread (requires hardware thread ID as 'type-id' tag) | ||||
| 	// accelerator -> A accelerator device like GPU or FPGA (requires an accelerator ID as 'type-id' tag) | ||||
| 	m.tags = map[string]string{"type": "node"} | ||||
| 	// Read in the JSON configuration | ||||
| 	if len(config) > 0 { | ||||
| 		err = json.Unmarshal(config, &m.config) | ||||
| 		if err != nil { | ||||
| 			cclog.ComponentError(m.name, "Error reading config:", err.Error()) | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	m.config.PerfCmd = "perf" | ||||
| 	if len(m.config.PerfCmd) > 0 { | ||||
| 		_, err := os.Stat(m.config.PerfCmd) | ||||
| 		if err != nil { | ||||
| 			abs, err := exec.LookPath(m.config.PerfCmd) | ||||
| 			if err != nil { | ||||
| 				cclog.ComponentError(m.name, "Error looking up perf command", m.config.PerfCmd, ":", err.Error()) | ||||
| 				return err | ||||
| 			} | ||||
| 			m.config.PerfCmd = abs | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Set up everything that the collector requires during the Read() execution | ||||
| 	// Check files required, test execution of some commands, create data structure | ||||
| 	// for all topological entities (sockets, NUMA domains, ...) | ||||
| 	// Return some useful error message in case of any failures | ||||
|  | ||||
| 	valid_metrics := make([]*PerfCmdCollectorEventConfig, 0) | ||||
| 	valid_events := make([]string, 0) | ||||
| 	test_type := func(Type string) bool { | ||||
| 		for _, t := range VALID_METRIC_TYPES { | ||||
| 			if Type == t { | ||||
| 				return true | ||||
| 			} | ||||
| 		} | ||||
| 		return false | ||||
| 	} | ||||
| 	for i, metric := range m.config.Metrics { | ||||
| 		if !test_type(metric.Type) { | ||||
| 			cclog.ComponentError(m.name, "Metric", metric.Metric, "has an invalid type") | ||||
| 			continue | ||||
| 		} | ||||
| 		cmd := exec.Command(m.config.PerfCmd, "stat", "--null", "-e", metric.Event, "hostname") | ||||
| 		cclog.ComponentDebug(m.name, "Running", cmd.String()) | ||||
| 		err := cmd.Run() | ||||
| 		if err != nil { | ||||
| 			cclog.ComponentError(m.name, "Event", metric.Event, "not available in perf", err.Error()) | ||||
| 		} else { | ||||
| 			valid_metrics = append(valid_metrics, &m.config.Metrics[i]) | ||||
| 		} | ||||
| 	} | ||||
| 	if len(valid_metrics) == 0 { | ||||
| 		return errors.New("no configured metric available through perf") | ||||
| 	} | ||||
|  | ||||
| 	IntToStringList := func(ilist []int) []string { | ||||
| 		list := make([]string, 0) | ||||
| 		for _, i := range ilist { | ||||
| 			list = append(list, fmt.Sprintf("%v", i)) | ||||
| 		} | ||||
| 		return list | ||||
| 	} | ||||
|  | ||||
| 	m.metrics = make(map[string]*PerfCmdCollectorEventConfig, 0) | ||||
| 	for _, metric := range valid_metrics { | ||||
| 		metric.collectorMeta = &m.meta | ||||
| 		metric.collectorTags = &m.tags | ||||
| 		metric.useCpus = make(map[int][]int) | ||||
| 		tlist := topo.GetTypeList(metric.Type) | ||||
| 		cclog.ComponentDebug(m.name, "Metric", metric.Metric, "with type", metric.Type, ":", strings.Join(IntToStringList(tlist), ",")) | ||||
|  | ||||
| 		for _, t := range tlist { | ||||
| 			metric.useCpus[t] = topo.GetTypeHwthreads(metric.Type, t) | ||||
| 			cclog.ComponentDebug(m.name, "Metric", metric.Metric, "with type", metric.Type, "and ID", t, ":", strings.Join(IntToStringList(metric.useCpus[t]), ",")) | ||||
| 		} | ||||
|  | ||||
| 		m.metrics[metric.Event] = metric | ||||
| 		valid_events = append(valid_events, metric.Event) | ||||
| 	} | ||||
| 	m.perfEventString = strings.Join(valid_events, ",") | ||||
| 	cclog.ComponentDebug(m.name, "perfEventString", m.perfEventString) | ||||
|  | ||||
| 	// Set this flag only if everything is initialized properly, all required files exist, ... | ||||
| 	m.init = true | ||||
| 	return err | ||||
| } | ||||
|  | ||||
// PerfEventJson holds one decoded line of `perf stat -j` output. The exported
// fields are filled by the JSON decoder; the unexported fields carry the
// values parsed from them (see parseEvent) or the aggregated metric data
// assembled in Read().
type PerfEventJson struct {
	CounterValue string `json:"counter-value"` // counter value as printed by perf
	counterValue float64 // parsed (and scaled) counter value
	MetricValue  string `json:"metric-value"` // metric value as printed by perf
	metricValue  float64 // parsed (and scaled) metric value
	CounterUnit  string `json:"unit"` // counter unit as printed by perf
	counterUnit  string // counter unit, empty if perf reported none
	MetricUnit   string `json:"metric-unit"` // metric unit as printed by perf
	metricUnit   string // metric unit, empty if perf reported none
	Cpu          string `json:"cpu,omitempty"` // CPU as printed by perf
	cpu          int // parsed CPU number (0 if absent or not parsable)
	Event        string  `json:"event"`         // event name
	Runtime      uint64  `json:"event-runtime"` // decoded from "event-runtime"
	PcntRunning  float64 `json:"pcnt-running"`  // decoded from "pcnt-running"; used to scale the values
	metrictypeid string // type ID of the aggregated metric (set in Read)
	metrictype   string // type of the aggregated metric (set in Read)
	metricname   string // name of the aggregated metric (set in Read)
	publish      bool   // whether the aggregated metric is published (set in Read)
}
|  | ||||
| func parseEvent(line string) (*PerfEventJson, error) { | ||||
| 	data := PerfEventJson{} | ||||
|  | ||||
| 	tmp := perf_number_regex.ReplaceAllString(line, `$1.$2`) | ||||
| 	err := json.Unmarshal([]byte(tmp), &data) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if len(data.CounterValue) > 0 && data.CounterValue != PERF_NOT_COUNTED { | ||||
| 		val, err := strconv.ParseFloat(data.CounterValue, 64) | ||||
| 		if err == nil { | ||||
| 			if data.PcntRunning != 100.0 { | ||||
| 				val = (val / data.PcntRunning) * 100 | ||||
| 			} | ||||
| 			data.counterValue = val | ||||
| 		} | ||||
| 	} | ||||
| 	if len(data.MetricValue) > 0 && data.MetricValue != PERF_NOT_COUNTED { | ||||
| 		val, err := strconv.ParseFloat(data.MetricValue, 64) | ||||
| 		if err == nil { | ||||
| 			if data.PcntRunning != 100.0 { | ||||
| 				val = (val / data.PcntRunning) * 100 | ||||
| 			} | ||||
| 			data.metricValue = val | ||||
| 		} | ||||
| 	} | ||||
| 	if len(data.CounterUnit) > 0 && data.CounterUnit != PERF_UNIT_NULL { | ||||
| 		data.counterUnit = data.CounterUnit | ||||
| 	} | ||||
| 	if len(data.MetricUnit) > 0 && data.MetricUnit != PERF_UNIT_NULL { | ||||
| 		data.metricUnit = data.MetricUnit | ||||
| 	} | ||||
| 	if len(data.Cpu) > 0 { | ||||
| 		val, err := strconv.ParseInt(data.Cpu, 10, 64) | ||||
| 		if err == nil { | ||||
| 			data.cpu = int(val) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return &data, nil | ||||
| } | ||||
|  | ||||
| func perfdataToMetric(data *PerfEventJson, config *PerfCmdCollectorEventConfig, timestamp time.Time) (lp.CCMetric, error) { | ||||
| 	metric, err := lp.NewMetric(config.Metric, *config.collectorTags, *config.collectorMeta, data.counterValue, timestamp) | ||||
| 	if err == nil { | ||||
| 		metric.AddTag("type", data.metrictype) | ||||
| 		if data.metrictype != "node" { | ||||
| 			metric.AddTag("type-id", data.metrictypeid) | ||||
| 		} | ||||
| 		for k, v := range config.Tags { | ||||
| 			metric.AddTag(k, v) | ||||
| 		} | ||||
| 		for k, v := range config.Meta { | ||||
| 			metric.AddMeta(k, v) | ||||
| 		} | ||||
| 		if len(config.Unit) > 0 { | ||||
| 			metric.AddMeta("unit", config.Unit) | ||||
| 		} | ||||
| 		if config.UsePerfUnit && (!metric.HasMeta("unit")) && (!metric.HasTag("unit")) { | ||||
| 			var unit string = "" | ||||
| 			if len(data.counterUnit) > 0 { | ||||
| 				unit = data.counterUnit | ||||
| 			} else if len(data.metricUnit) > 0 { | ||||
| 				unit = data.metricUnit | ||||
| 			} | ||||
| 			if len(unit) > 0 { | ||||
| 				metric.AddMeta("unit", unit) | ||||
| 			} | ||||
| 		} | ||||
| 		return metric, nil | ||||
| 	} | ||||
| 	return nil, err | ||||
| } | ||||
|  | ||||
| // Read collects all metrics belonging to the sample collector | ||||
| // and sends them through the output channel to the collector manager | ||||
| func (m *PerfCmdCollector) Read(interval time.Duration, output chan lp.CCMessage) { | ||||
| 	perfdata := make([]*PerfEventJson, 0) | ||||
| 	// Create a sample metric | ||||
| 	timestamp := time.Now() | ||||
|  | ||||
| 	cmd := exec.Command(m.config.PerfCmd, "stat", "-A", "-a", "-j", "-e", m.perfEventString, "/usr/bin/sleep", fmt.Sprintf("%d", int(interval.Seconds()))) | ||||
|  | ||||
| 	cclog.ComponentDebug(m.name, "Running", cmd.String()) | ||||
| 	out, err := cmd.CombinedOutput() | ||||
| 	if err == nil { | ||||
| 		sout := strings.TrimSpace(string(out)) | ||||
| 		for _, l := range strings.Split(sout, "\n") { | ||||
| 			d, err := parseEvent(l) | ||||
| 			if err == nil { | ||||
| 				perfdata = append(perfdata, d) | ||||
| 			} | ||||
| 		} | ||||
| 	} else { | ||||
| 		cclog.ComponentError(m.name, "Execution of", cmd.String(), "failed with", err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	metricData := make([]*PerfEventJson, 0) | ||||
| 	for _, metricTmp := range m.config.Metrics { | ||||
| 		metricConfig := m.metrics[metricTmp.Event] | ||||
| 		for t, clist := range metricConfig.useCpus { | ||||
| 			val := float64(0) | ||||
| 			sum := float64(0) | ||||
| 			min := math.MaxFloat64 | ||||
| 			max := float64(0) | ||||
| 			count := 0 | ||||
| 			cunit := "" | ||||
| 			munit := "" | ||||
| 			for _, c := range clist { | ||||
| 				for _, d := range perfdata { | ||||
| 					if strings.HasPrefix(d.Event, metricConfig.Event) && d.cpu == c { | ||||
| 						//cclog.ComponentDebug(m.name, "do calc on CPU", c, ":", d.counterValue) | ||||
| 						sum += d.counterValue | ||||
| 						if d.counterValue < min { | ||||
| 							min = d.counterValue | ||||
| 						} | ||||
| 						if d.counterValue > max { | ||||
| 							max = d.counterValue | ||||
| 						} | ||||
| 						count++ | ||||
| 						cunit = d.counterUnit | ||||
| 						munit = d.metricUnit | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 			if metricConfig.TypeAgg == "sum" { | ||||
| 				val = sum | ||||
| 			} else if metricConfig.TypeAgg == "min" { | ||||
| 				val = min | ||||
| 			} else if metricConfig.TypeAgg == "max" { | ||||
| 				val = max | ||||
| 			} else if metricConfig.TypeAgg == "avg" || metricConfig.TypeAgg == "mean" { | ||||
| 				val = sum / float64(count) | ||||
| 			} else { | ||||
| 				val = sum | ||||
| 			} | ||||
| 			//cclog.ComponentDebug(m.name, "Metric", metricConfig.Metric, "type", metricConfig.Type, "ID", t, ":", val) | ||||
| 			metricData = append(metricData, &PerfEventJson{ | ||||
| 				Event:        metricConfig.Event, | ||||
| 				metricname:   metricConfig.Metric, | ||||
| 				metrictype:   metricConfig.Type, | ||||
| 				metrictypeid: fmt.Sprintf("%v", t), | ||||
| 				counterValue: val, | ||||
| 				metricValue:  0, | ||||
| 				metricUnit:   munit, | ||||
| 				counterUnit:  cunit, | ||||
| 				publish:      metricConfig.Publish, | ||||
| 			}) | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
|  | ||||
| 	for _, d := range metricData { | ||||
| 		if d.publish { | ||||
| 			m, err := perfdataToMetric(d, m.metrics[d.Event], timestamp) | ||||
| 			if err == nil { | ||||
| 				output <- m | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
// Close marks the collector as not initialized. The collector holds no other
// resources between Read() calls that would need to be released here.
// Called once by the collector manager
func (m *PerfCmdCollector) Close() {
	// Unset flag
	m.init = false
}
							
								
								
									
										54
									
								
								collectors/perfCmdMetric.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										54
									
								
								collectors/perfCmdMetric.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,54 @@ | ||||
| # PerfCmdMetric collector | ||||
|  | ||||
|  | ||||
| ## Configuration | ||||
|  | ||||
| ```json | ||||
| { | ||||
|     "perf_command": "perf", | ||||
|     "metrics" : [ | ||||
|         { | ||||
|             "metric": "cpu_cycles", | ||||
|             "event": "cycles", | ||||
|             "unit": "Hz", | ||||
|             "type": "hwthread", | ||||
|             "publish": true, | ||||
|             "use_perf_unit": false, | ||||
|             "type_aggregation": "sum", | ||||
|             "tags": { | ||||
|                 "tags_just" : "for_the_event" | ||||
|             }, | ||||
|             "meta": { | ||||
|                 "meta_info_just" : "for_the_event" | ||||
|             } | ||||
|         } | ||||
|     ], | ||||
|     "expressions": [ | ||||
|         { | ||||
|             "metric": "avg_cycles_per_second", | ||||
|             "expression": "cpu_cycles / time", | ||||
|             "type": "node", | ||||
|             "type_aggregation": "avg", | ||||
|             "publish": true | ||||
|         } | ||||
|     ] | ||||
| } | ||||
| ``` | ||||
|  | ||||
| - `perf_command`: Path to the `perf` command. If it is not an absolute path, the command is looked up in `$PATH`. | ||||
| - `metrics`: List of metrics to measure | ||||
|     - `metric`: Name of metric for output and expressions | ||||
|     - `event`: Event as supplied to `perf stat -e <event>` like `cycles` or `uncore_imc_0/event=0x01,umask=0x00/` | ||||
|     - `unit` : Unit for the metric. It is added as meta information, so it is equivalent to adding `"meta" : {"unit": "myunit"}`. | ||||
|     - `type`: Do measurements at this level (`hwthread` and `socket` are the most common ones). | ||||
|     - `publish`: Publish the metric or use it only for expressions. | ||||
|     - `use_perf_unit`: For some events, `perf` outputs a unit. With this switch, the unit provided by `perf` is added as meta information if no other unit is set. | ||||
|     - `type_aggregation`: How the per-hwthread values are aggregated to the given `type`: `sum` (default), `min`, `max`, `avg` or `mean` | ||||
|     - `tags`: Tags just for this metric | ||||
|     - `meta`: Meta information just for this metric | ||||
| - `expressions`: Calculate metrics out of multiple measurements | ||||
|     - `metric`: Name of metric for output | ||||
|     - `expression`: What should be calculated | ||||
|     - `type`: Aggregate the expression results to this level | ||||
|     - `type_aggregation`: Aggregate the expression results with `sum`, `min`, `max`, `avg` or `mean` | ||||
|     - `publish`: Publish metric | ||||
							
								
								
									
										481
									
								
								collectors/perfEventMetric.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										481
									
								
								collectors/perfEventMetric.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,481 @@ | ||||
| package collectors | ||||
|  | ||||
| /* | ||||
| #cgo CFLAGS: -I/usr/include | ||||
| #cgo LDFLAGS: -Wl,--unresolved-symbols=ignore-in-object-files | ||||
| #include <stdlib.h> | ||||
| #include <unistd.h> | ||||
| #include <stdint.h> | ||||
| #include <linux/perf_event.h> | ||||
| #include <linux/hw_breakpoint.h> | ||||
| #include <sys/ioctl.h> | ||||
| #include <syscall.h> | ||||
| #include <string.h> | ||||
| #include <errno.h> | ||||
|  | ||||
// Bit flags describing optional perf_event_attr settings.
// NOTE(review): only referenced by commented-out code in perf_event_open.
typedef enum {
	PERF_EVENT_WITH_CONFIG1 = (1<<0),
	PERF_EVENT_WITH_CONFIG2 = (1<<1),
	PERF_EVENT_WITH_EXCLUDE_KERNEL = (1<<2),
	PERF_EVENT_WITH_EXCLUDE_HV = (1<<3),
} PERF_EVENT_FLAG;
|  | ||||
// Open a perf event of the given type and config on one CPU.
//
// The event is opened with pid = -1 and group_fd = -1, i.e. it is not bound to
// a process and not part of an event group. For non-uncore events, kernel and
// hypervisor activity is excluded.
//
// Returns the file descriptor of the event (>= 0) on success and -errno on
// failure. The descriptor is required by perf_event_start/stop/reset/read and
// has to be released with perf_event_close.
//
// NOTE(review): config1 and config2 are accepted but not applied to the event
// attributes; the code doing so was disabled. Confirm whether that is intended.
int perf_event_open(int type, uint64_t config, int cpu, uint64_t config1, uint64_t config2, int uncore)
{
	int ret;
	struct perf_event_attr attr;

	memset(&attr, 0, sizeof(struct perf_event_attr));
	// Tell the kernel which version of the attribute structure is passed
	attr.size = sizeof(struct perf_event_attr);
	attr.type = type;
	attr.config = config;
	if (!uncore) {
		attr.exclude_kernel = 1;
		attr.exclude_hv = 1;
	}

	ret = syscall(__NR_perf_event_open, &attr, -1, cpu, -1, 0);
	if (ret < 0)
	{
		return -errno;
	}
	// Hand the descriptor to the caller; returning 0 here would lose and leak it
	return ret;
}
|  | ||||
// Disable the event behind fd (PERF_EVENT_IOC_DISABLE).
// Returns the result of ioctl().
int perf_event_stop(int fd)
{
	return ioctl(fd, PERF_EVENT_IOC_DISABLE, 0);
}
|  | ||||
|  | ||||
// Enable the event behind fd (PERF_EVENT_IOC_ENABLE).
// Returns the result of ioctl().
int perf_event_start(int fd)
{
	return ioctl(fd, PERF_EVENT_IOC_ENABLE, 0);
}
|  | ||||
// Reset the count of the event behind fd (PERF_EVENT_IOC_RESET).
// Returns the result of ioctl().
int perf_event_reset(int fd)
{
	return ioctl(fd, PERF_EVENT_IOC_RESET, 0);
}
|  | ||||
// Read the current 64-bit count of the event behind fd into *data.
//
// Returns 0 on success, -errno if read() fails and -EIO if fewer bytes than a
// full counter value were read (errno is not set by read() in that case).
int perf_event_read(int fd, uint64_t *data)
{
	ssize_t ret = read(fd, data, sizeof(uint64_t));
	if (ret < 0)
	{
		return -errno;
	}
	if (ret != (ssize_t)sizeof(uint64_t))
	{
		return -EIO;
	}
	return 0;
}
|  | ||||
// Close the file descriptor of an event.
// Returns the result of close(): 0 on success, -1 on failure.
int perf_event_close(int fd)
{
	// The function is declared to return int, so it must return a value
	return close(fd);
}
|  | ||||
| */ | ||||
| import "C" | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"path" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" | ||||
| 	"github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology" | ||||
| ) | ||||
|  | ||||
| const SYSFS_PERF_EVENT_PATH = `/sys/devices` | ||||
|  | ||||
// PerfEventCollectorEventConfig describes one perf event. The exported fields
// are read from the JSON configuration; the unexported fields are derived from
// them by UpdateEventConfig.
type PerfEventCollectorEventConfig struct {
	Name              string `json:"name"`
	// Unit is the directory name below SYSFS_PERF_EVENT_PATH, defaults to "cpu"
	Unit              string `json:"unit,omitempty"`
	// unitType is the number read from the unit's 'type' file
	unitType          int
	// Config, Config1 and Config2 are hexadecimal strings (optional 0x prefix)
	// that are parsed into config, config1 and config2
	Config            string `json:"config"`
	config            C.uint64_t
	Config1           string `json:"config1,omitempty"`
	config1           C.uint64_t
	Config2           string `json:"config2,omitempty"`
	config2           C.uint64_t
	ExcludeKernel     bool              `json:"exclude_kernel,omitempty"`
	ExcludeHypervisor bool              `json:"exclude_hypervisor,omitempty"`
	Tags              map[string]string `json:"tags,omitempty"`
	Meta              map[string]string `json:"meta,omitempty"`
	PerHwthread       bool              `json:"per_hwthread,omitempty"`
	PerSocket         bool              `json:"per_socket,omitempty"`
	// ScaleFile is a file containing a floating-point number that is parsed into scaling_factor
	ScaleFile         string            `json:"scale_file,omitempty"`
	scaling_factor    float64
	flags             uint64
	// valid is set by UpdateEventConfig if all checks succeeded
	valid             bool
	// cpumask holds the CPUs from the unit's 'cpumask' file or, if there is
	// no such file, the CPU list of the topology
	cpumask           []int
}
|  | ||||
// PerfEventCollectorEventData holds the runtime state of one opened event.
// NOTE(review): the code using these fields is outside this view; the field
// meanings below are presumed from their names.
type PerfEventCollectorEventData struct {
	fd        C.int  // presumably the descriptor returned by perf_event_open
	last      uint64 // presumably the previously read counter value
	last_diff uint64 // presumably the difference of the last two readings
	idx       int    // presumably the index of the event in the configuration
}
|  | ||||
// PerfEventCollectorConfig holds the fields read from the JSON configuration.
type PerfEventCollectorConfig struct {
	// Events as configured by the user
	Events []PerfEventCollectorEventConfig `json:"events"`
	// events is not filled by the JSON decoder (unexported)
	events []PerfEventCollectorEventConfig
}
|  | ||||
// PerfEventCollector contains the configuration and runtime data of the
// collector and the variables defined by metricCollector (name, init, ...)
type PerfEventCollector struct {
	metricCollector
	config PerfEventCollectorConfig // the configuration structure
	meta   map[string]string        // default meta information
	tags   map[string]string        // default tags
	// NOTE(review): presumably event index -> CPU -> event data; confirm against Init
	events map[int]map[int]PerfEventCollectorEventData
}
|  | ||||
| func UpdateEventConfig(event *PerfEventCollectorEventConfig) error { | ||||
| 	parseHexNumber := func(number string) (uint64, error) { | ||||
| 		snum := strings.Trim(number, "\n") | ||||
| 		snum = strings.Replace(snum, "0x", "", -1) | ||||
| 		snum = strings.Replace(snum, "0X", "", -1) | ||||
| 		return strconv.ParseUint(snum, 16, 64) | ||||
| 	} | ||||
| 	if len(event.Unit) == 0 { | ||||
| 		event.Unit = "cpu" | ||||
| 	} | ||||
|  | ||||
| 	unitpath := path.Join(SYSFS_PERF_EVENT_PATH, event.Unit) | ||||
| 	if _, err := os.Stat(unitpath); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	typefile := path.Join(unitpath, "type") | ||||
| 	if _, err := os.Stat(typefile); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	typebytes, err := os.ReadFile(typefile) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	typestring := string(typebytes) | ||||
| 	ut, err := strconv.ParseUint(strings.Trim(typestring, "\n"), 10, 64) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	event.unitType = int(ut) | ||||
|  | ||||
| 	if len(event.Config) > 0 { | ||||
| 		x, err := parseHexNumber(event.Config) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		event.config = C.uint64_t(x) | ||||
| 	} | ||||
| 	if len(event.Config1) > 0 { | ||||
| 		x, err := parseHexNumber(event.Config1) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		event.config1 = C.uint64_t(x) | ||||
| 	} | ||||
| 	if len(event.Config2) > 0 { | ||||
| 		x, err := parseHexNumber(event.Config2) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		event.config2 = C.uint64_t(x) | ||||
| 	} | ||||
| 	if len(event.ScaleFile) > 0 { | ||||
| 		if _, err := os.Stat(event.ScaleFile); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		scalebytes, err := os.ReadFile(event.ScaleFile) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		x, err := strconv.ParseFloat(string(scalebytes), 64) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		event.scaling_factor = x | ||||
| 	} | ||||
| 	event.cpumask = make([]int, 0) | ||||
| 	cpumaskfile := path.Join(unitpath, "cpumask") | ||||
| 	if _, err := os.Stat(cpumaskfile); err == nil { | ||||
|  | ||||
| 		cpumaskbytes, err := os.ReadFile(cpumaskfile) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		cpumaskstring := strings.Trim(string(cpumaskbytes), "\n") | ||||
| 		cclog.Debug("cpumask", cpumaskstring) | ||||
| 		for _, part := range strings.Split(cpumaskstring, ",") { | ||||
| 			start := 0 | ||||
| 			end := 0 | ||||
| 			count, _ := fmt.Sscanf(part, "%d-%d", &start, &end) | ||||
| 			cclog.Debug("scanf", count, " s ", start, " e ", end) | ||||
|  | ||||
| 			if count == 1 { | ||||
| 				cclog.Debug("adding ", start) | ||||
| 				event.cpumask = append(event.cpumask, start) | ||||
| 			} else if count == 2 { | ||||
| 				for i := start; i <= end; i++ { | ||||
| 					cclog.Debug("adding ", i) | ||||
| 					event.cpumask = append(event.cpumask, i) | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 		} | ||||
| 	} else { | ||||
| 		event.cpumask = append(event.cpumask, ccTopology.CpuList()...) | ||||
| 	} | ||||
|  | ||||
| 	event.valid = true | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (m *PerfEventCollector) Init(config json.RawMessage) error { | ||||
| 	var err error = nil | ||||
|  | ||||
| 	m.name = "PerfEventCollector" | ||||
|  | ||||
| 	m.setup() | ||||
|  | ||||
| 	m.parallel = false | ||||
|  | ||||
| 	m.meta = map[string]string{"source": m.name, "group": "PerfCounter"} | ||||
|  | ||||
| 	m.tags = map[string]string{"type": "node"} | ||||
|  | ||||
| 	cpudata := ccTopology.CpuData() | ||||
|  | ||||
| 	if len(config) > 0 { | ||||
| 		err = json.Unmarshal(config, &m.config) | ||||
| 		if err != nil { | ||||
| 			cclog.ComponentError(m.name, "Error reading config:", err.Error()) | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	for i, e := range m.config.Events { | ||||
| 		err = UpdateEventConfig(&e) | ||||
| 		if err != nil { | ||||
| 			cclog.ComponentError(m.name, "Checks for event unit", e.Name, "failed:", err.Error()) | ||||
| 		} | ||||
| 		m.config.Events[i] = e | ||||
| 	} | ||||
| 	total := 0 | ||||
| 	m.events = make(map[int]map[int]PerfEventCollectorEventData) | ||||
| 	for _, hwt := range cpudata { | ||||
| 		cclog.ComponentDebug(m.name, "Adding events for cpuid", hwt.CpuID) | ||||
| 		hwt_events := make(map[int]PerfEventCollectorEventData) | ||||
| 		for j, e := range m.config.Events { | ||||
| 			if e.valid { | ||||
| 				if _, ok := intArrayContains(e.cpumask, hwt.CpuID); ok { | ||||
| 					cclog.ComponentDebug(m.name, "Adding event", e.Name, fmt.Sprintf("(cpuid %d unit %s(%d) config %s config1 %s config2 %s)", | ||||
| 						hwt.CpuID, | ||||
| 						e.Unit, | ||||
| 						e.unitType, | ||||
| 						e.Config, | ||||
| 						e.Config1, | ||||
| 						e.Config2, | ||||
| 					)) | ||||
| 					// (int type, uint64_t config, int cpu, uint64_t config1, uint64_t config2, int uncore) | ||||
| 					fd := C.perf_event_open(C.int(e.unitType), e.config, C.int(hwt.CpuID), e.config1, e.config2, C.int(1)) | ||||
| 					if fd < 0 { | ||||
| 						cclog.ComponentError(m.name, "Failed to create event", e.Name, ":", fd) | ||||
| 						continue | ||||
| 					} | ||||
| 					hwt_events[j] = PerfEventCollectorEventData{ | ||||
| 						idx:  j, | ||||
| 						fd:   fd, | ||||
| 						last: 0, | ||||
| 					} | ||||
| 					total++ | ||||
| 				} else { | ||||
| 					cclog.ComponentDebug(m.name, "Cpu not in cpumask of unit", e.cpumask) | ||||
| 					hwt_events[j] = PerfEventCollectorEventData{ | ||||
| 						idx:  j, | ||||
| 						fd:   -1, | ||||
| 						last: 0, | ||||
| 					} | ||||
| 				} | ||||
| 			} else { | ||||
| 				cclog.ComponentError(m.name, "Event", e.Name, "not valid") | ||||
| 			} | ||||
| 		} | ||||
| 		cclog.ComponentDebug(m.name, "Adding", len(hwt_events), "events for cpuid", hwt.CpuID) | ||||
| 		m.events[hwt.CpuID] = hwt_events | ||||
| 	} | ||||
| 	if total == 0 { | ||||
| 		cclog.ComponentError(m.name, "Failed to add events") | ||||
| 		return errors.New("failed to add events") | ||||
| 	} | ||||
|  | ||||
| 	m.init = true | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| // CalcSocketData sums the last measured difference (last_diff) of every event | ||||
| // over all hardware threads of a socket. | ||||
| // | ||||
| // The result maps socket ID -> event index -> sum. The sum is a float64 | ||||
| // (difference multiplied by the scaling factor) for events with a non-zero | ||||
| // scaling factor and a uint64 otherwise. | ||||
| func (m *PerfEventCollector) CalcSocketData() map[int]map[int]interface{} { | ||||
| 	out := make(map[int]map[int]interface{}) | ||||
|  | ||||
| 	for cpuid, cpudata := range m.events { | ||||
| 		sid := ccTopology.GetHwthreadSocket(cpuid) | ||||
| 		socketdata, ok := out[sid] | ||||
| 		if !ok { | ||||
| 			socketdata = make(map[int]interface{}) | ||||
| 			out[sid] = socketdata | ||||
| 		} | ||||
| 		for i, eventdata := range cpudata { | ||||
| 			eventconfig := m.config.Events[i] | ||||
| 			// The comma-ok assertion yields the typed zero value when the event was | ||||
| 			// not seen for this socket yet, so no untyped seed value is needed and | ||||
| 			// hardware threads with differing event sets cannot cause a panic. | ||||
| 			if eventconfig.scaling_factor != 0 { | ||||
| 				sum, _ := socketdata[i].(float64) | ||||
| 				socketdata[i] = sum + (float64(eventdata.last_diff) * eventconfig.scaling_factor) | ||||
| 			} else { | ||||
| 				sum, _ := socketdata[i].(uint64) | ||||
| 				socketdata[i] = sum + eventdata.last_diff | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return out | ||||
| } | ||||
|  | ||||
| // Read reads the current value of every opened event and sends the difference | ||||
| // to the previous reading as a metric tagged with type "hwthread" and the | ||||
| // hardware thread ID. Events with a non-zero scaling factor are sent as scaled | ||||
| // float64 values, all others as uint64. | ||||
| // | ||||
| // The first successful reading of an event only stores the counter value. | ||||
| // One goroutine per hardware thread is started; Read returns after all of | ||||
| // them have finished. | ||||
| func (m *PerfEventCollector) Read(interval time.Duration, output chan lp.CCMessage) { | ||||
|  | ||||
| 	timestamp := time.Now() | ||||
|  | ||||
| 	var wg sync.WaitGroup | ||||
|  | ||||
| 	for cpuid, events := range m.events { | ||||
| 		wg.Add(1) | ||||
| 		// Each goroutine only modifies the inner map of its own hardware thread. | ||||
| 		// The outer map m.events must not be written here: concurrent map writes | ||||
| 		// are not allowed in Go, and the inner map is updated in place anyway. | ||||
| 		go func(cpuid int, events map[int]PerfEventCollectorEventData) { | ||||
| 			defer wg.Done() | ||||
| 			for i, e := range events { | ||||
| 				// Placeholder entries (negative fd) have nothing to read | ||||
| 				if e.fd < 0 { | ||||
| 					continue | ||||
| 				} | ||||
| 				event := m.config.Events[i] | ||||
|  | ||||
| 				var data C.uint64_t = 0 | ||||
| 				ret := C.perf_event_read(e.fd, &data) | ||||
| 				if ret < 0 { | ||||
| 					cclog.ComponentError(m.name, "Failed to read event", event.Name, ":", ret) | ||||
| 					// Keep the previous state; using the unset value would produce | ||||
| 					// a bogus (underflowed) difference. | ||||
| 					continue | ||||
| 				} | ||||
| 				current := uint64(data) | ||||
|  | ||||
| 				if e.last == 0 { | ||||
| 					cclog.ComponentDebug(m.name, "Updating last value on first iteration") | ||||
| 					e.last = current | ||||
| 					events[i] = e | ||||
| 					continue | ||||
| 				} | ||||
|  | ||||
| 				value := current - e.last | ||||
| 				cclog.ComponentDebug(m.name, "Calculating difference", current, "-", e.last, "=", value) | ||||
| 				e.last = current | ||||
| 				e.last_diff = value | ||||
| 				events[i] = e | ||||
|  | ||||
| 				var metric lp.CCMetric | ||||
| 				var err error | ||||
| 				if event.scaling_factor == 0 { | ||||
| 					metric, err = lp.NewMetric(event.Name, m.tags, m.meta, value, timestamp) | ||||
| 				} else { | ||||
| 					var f64_value float64 = float64(value) * event.scaling_factor | ||||
| 					metric, err = lp.NewMetric(event.Name, m.tags, m.meta, f64_value, timestamp) | ||||
| 				} | ||||
| 				if err != nil { | ||||
| 					cclog.ComponentError(m.name, "Failed to create CCMetric for event", event.Name) | ||||
| 					continue | ||||
| 				} | ||||
| 				metric.AddTag("type", "hwthread") | ||||
| 				metric.AddTag("type-id", fmt.Sprintf("%d", cpuid)) | ||||
| 				for k, v := range event.Tags { | ||||
| 					metric.AddTag(k, v) | ||||
| 				} | ||||
| 				for k, v := range event.Meta { | ||||
| 					metric.AddMeta(k, v) | ||||
| 				} | ||||
| 				output <- metric | ||||
| 			} | ||||
| 		}(cpuid, events) | ||||
| 	} | ||||
| 	wg.Wait() | ||||
| } | ||||
|  | ||||
| // Close calls perf_event_close for every opened event of every hardware | ||||
| // thread and marks the collector as uninitialized. | ||||
| func (m *PerfEventCollector) Close() { | ||||
|  | ||||
| 	for _, events := range m.events { | ||||
| 		for _, e := range events { | ||||
| 			// Placeholder entries carry a negative fd and were never opened | ||||
| 			if e.fd < 0 { | ||||
| 				continue | ||||
| 			} | ||||
| 			C.perf_event_close(e.fd) | ||||
| 		} | ||||
| 	} | ||||
| 	m.init = false | ||||
| } | ||||
							
								
								
									
										44
									
								
								collectors/perfEventMetric.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										44
									
								
								collectors/perfEventMetric.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,44 @@ | ||||
| # `perf_event` collector | ||||
|  | ||||
| This collector uses the `perf_event_open` system call directly to measure events. There is no name-to-event translation; the configuration has to be as low-level as required by the system call. It allows aggregating the measurements to topological entities like sockets or the whole node. | ||||
|  | ||||
| ## Configuration | ||||
|  | ||||
| ```json | ||||
| { | ||||
|     "events" : [ | ||||
|         { | ||||
|             "name" : "instructions", | ||||
|             "unit" : "uncore_imc_0", | ||||
|             "config": "0x01", | ||||
|             "scale_file" : "/sys/devices/<unit>/events/<event>.scale", | ||||
|             "per_hwthread": true, | ||||
|             "per_socket": true, | ||||
|             "exclude_kernel": true, | ||||
|             "exclude_hypervisor": true, | ||||
|             "tags": { | ||||
|                 "tags": "just_for_the_event" | ||||
|             }, | ||||
|             "meta": { | ||||
|                 "meta_info": "just_for_the_event" | ||||
|             }, | ||||
|             "config1": "0x00", | ||||
|             "config2": "0x00" | ||||
|         } | ||||
|     ] | ||||
| } | ||||
| ``` | ||||
|  | ||||
| - `events`: List of events to measure | ||||
| - `name`: Name for the metric | ||||
| - `unit`: Unit of the event or `cpu` if not given. The unit type ID is resolved by reading the file `/sys/devices/<unit>/type`. The unit type ID is then written to the `perf_event_attr` struct member `type`. | ||||
| - `config`: Hex value written to the `perf_event_attr` struct member `config`. | ||||
| - `config1`: Hex value written to the `perf_event_attr` struct member `config1` (optional). | ||||
| - `config2`: Hex value written to the `perf_event_attr` struct member `config2` (optional). | ||||
| - `scale_file`: If a measurement requires scaling, like the `power` unit aka RAPL, it is provided by the kernel in a `.scale` file at `/sys/devices/<unit>/events/<event>.scale`. | ||||
| - `exclude_kernel`: Exclude the kernel from measurements (default: `true`). It sets the `perf_event_attr` struct member `exclude_kernel`. | ||||
| - `exclude_hypervisor`: Exclude the hypervisors from measurements (default: `true`). It sets the `perf_event_attr` struct member `exclude_hypervisor`. | ||||
| - `per_hwthread`: Generate metrics per hardware thread (default: `false`) | ||||
| - `per_socket`: Generate metrics per socket (default: `false`) | ||||
| - `tags`: Tags just for the event. | ||||
| - `meta`: Meta information just for the event, often a `unit` | ||||
| @@ -1,11 +1,9 @@ | ||||
| # Running average power limit (RAPL) metric collector | ||||
| ## `rapl` collector | ||||
|  | ||||
| This collector reads running average power limit (RAPL) monitoring attributes to compute average power consumption metrics. See <https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes>. | ||||
|  | ||||
| The Likwid metric collector provides similar functionality. | ||||
|  | ||||
| ## Configuration | ||||
|  | ||||
| ```json | ||||
|   "rapl": { | ||||
|     "exclude_device_by_id": ["0:1", "0:2"], | ||||
| @@ -13,6 +11,5 @@ The Likwid metric collector provides similar functionality. | ||||
|   } | ||||
| ``` | ||||
|  | ||||
| ## Metrics | ||||
|  | ||||
| Metrics: | ||||
| * `rapl_average_power`: average power consumption in Watt. The average is computed over the entire runtime from the last measurement to the current measurement | ||||
|   | ||||
| @@ -29,20 +29,20 @@ type messageProcessorConfig struct { | ||||
| 	RenameMessagesIf map[string]string           `json:"rename_messages_if,omitempty"`   // Map to rename metric name based on a condition | ||||
| 	NormalizeUnits   bool                        `json:"normalize_units,omitempty"`      // Check unit meta flag and normalize it using cc-units | ||||
| 	ChangeUnitPrefix map[string]string           `json:"change_unit_prefix,omitempty"`   // Add prefix that should be applied to the messages | ||||
| 	AddTagsIf        []messageProcessorTagConfig `json:"add_tags_if"`                  // List of tags that are added when the condition is met | ||||
| 	DelTagsIf        []messageProcessorTagConfig `json:"delete_tags_if"`               // List of tags that are removed when the condition is met | ||||
| 	AddMetaIf        []messageProcessorTagConfig `json:"add_meta_if"`                  // List of meta infos that are added when the condition is met | ||||
| 	DelMetaIf        []messageProcessorTagConfig `json:"delete_meta_if"`               // List of meta infos that are removed when the condition is met | ||||
| 	AddFieldIf       []messageProcessorTagConfig `json:"add_field_if"`                 // List of fields that are added when the condition is met | ||||
| 	DelFieldIf       []messageProcessorTagConfig `json:"delete_field_if"`              // List of fields that are removed when the condition is met | ||||
| 	DropByType       []string                    `json:"drop_by_message_type"`         // List of message types that should be dropped | ||||
| 	MoveTagToMeta    []messageProcessorTagConfig `json:"move_tag_to_meta_if"` | ||||
| 	MoveTagToField   []messageProcessorTagConfig `json:"move_tag_to_field_if"` | ||||
| 	MoveMetaToTag    []messageProcessorTagConfig `json:"move_meta_to_tag_if"` | ||||
| 	MoveMetaToField  []messageProcessorTagConfig `json:"move_meta_to_field_if"` | ||||
| 	MoveFieldToTag   []messageProcessorTagConfig `json:"move_field_to_tag_if"` | ||||
| 	MoveFieldToMeta  []messageProcessorTagConfig `json:"move_field_to_meta_if"` | ||||
| 	AddBaseEnv       map[string]interface{}      `json:"add_base_env"` | ||||
| 	AddTagsIf        []messageProcessorTagConfig `json:"add_tags_if,omitempty"`          // List of tags that are added when the condition is met | ||||
| 	DelTagsIf        []messageProcessorTagConfig `json:"delete_tags_if,omitempty"`       // List of tags that are removed when the condition is met | ||||
| 	AddMetaIf        []messageProcessorTagConfig `json:"add_meta_if,omitempty"`          // List of meta infos that are added when the condition is met | ||||
| 	DelMetaIf        []messageProcessorTagConfig `json:"delete_meta_if,omitempty"`       // List of meta infos that are removed when the condition is met | ||||
| 	AddFieldIf       []messageProcessorTagConfig `json:"add_field_if,omitempty"`         // List of fields that are added when the condition is met | ||||
| 	DelFieldIf       []messageProcessorTagConfig `json:"delete_field_if,omitempty"`      // List of fields that are removed when the condition is met | ||||
| 	DropByType       []string                    `json:"drop_by_message_type,omitempty"` // List of message types that should be dropped | ||||
| 	MoveTagToMeta    []messageProcessorTagConfig `json:"move_tag_to_meta_if,omitempty"` | ||||
| 	MoveTagToField   []messageProcessorTagConfig `json:"move_tag_to_field_if,omitempty"` | ||||
| 	MoveMetaToTag    []messageProcessorTagConfig `json:"move_meta_to_tag_if,omitempty"` | ||||
| 	MoveMetaToField  []messageProcessorTagConfig `json:"move_meta_to_field_if,omitempty"` | ||||
| 	MoveFieldToTag   []messageProcessorTagConfig `json:"move_field_to_tag_if,omitempty"` | ||||
| 	MoveFieldToMeta  []messageProcessorTagConfig `json:"move_field_to_meta_if,omitempty"` | ||||
| 	AddBaseEnv       map[string]interface{}      `json:"add_base_env,omitempty"` | ||||
| } | ||||
|  | ||||
| type messageProcessor struct { | ||||
|   | ||||
| @@ -13,7 +13,6 @@ import ( | ||||
| 	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" | ||||
| 	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" | ||||
| 	influx "github.com/influxdata/line-protocol/v2/lineprotocol" | ||||
| ) | ||||
|  | ||||
| const HTTP_RECEIVER_PORT = "8080" | ||||
| @@ -151,80 +150,22 @@ func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request) { | ||||
| 		} | ||||
| 	} | ||||
| 	if r.sink != nil { | ||||
| 		d := influx.NewDecoder(req.Body) | ||||
| 		for d.Next() { | ||||
|  | ||||
| 			// Decode measurement name | ||||
| 			measurement, err := d.Measurement() | ||||
| 		buf := make([]byte, 0, req.ContentLength) | ||||
| 		len, err := req.Body.Read(buf) | ||||
| 		if err == nil && len > 0 { | ||||
| 			messages, err := lp.FromBytes(buf) | ||||
| 			if err != nil { | ||||
| 				msg := "ServerHttp: Failed to decode measurement: " + err.Error() | ||||
| 				msg := "ServerHttp: Failed to decode messages: " + err.Error() | ||||
| 				cclog.ComponentError(r.name, msg) | ||||
| 				http.Error(w, msg, http.StatusInternalServerError) | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			// Decode tags | ||||
| 			tags := make(map[string]string) | ||||
| 			for { | ||||
| 				key, value, err := d.NextTag() | ||||
| 				if err != nil { | ||||
| 					msg := "ServerHttp: Failed to decode tag: " + err.Error() | ||||
| 					cclog.ComponentError(r.name, msg) | ||||
| 					http.Error(w, msg, http.StatusInternalServerError) | ||||
| 					return | ||||
| 				} | ||||
| 				if key == nil { | ||||
| 					break | ||||
| 				} | ||||
| 				tags[string(key)] = string(value) | ||||
| 			} | ||||
|  | ||||
| 			// Decode fields | ||||
| 			fields := make(map[string]interface{}) | ||||
| 			for { | ||||
| 				key, value, err := d.NextField() | ||||
| 				if err != nil { | ||||
| 					msg := "ServerHttp: Failed to decode field: " + err.Error() | ||||
| 					cclog.ComponentError(r.name, msg) | ||||
| 					http.Error(w, msg, http.StatusInternalServerError) | ||||
| 					return | ||||
| 				} | ||||
| 				if key == nil { | ||||
| 					break | ||||
| 				} | ||||
| 				fields[string(key)] = value.Interface() | ||||
| 			} | ||||
|  | ||||
| 			// Decode time stamp | ||||
| 			t, err := d.Time(influx.Nanosecond, time.Time{}) | ||||
| 			if err != nil { | ||||
| 				msg := "ServerHttp: Failed to decode time stamp: " + err.Error() | ||||
| 				cclog.ComponentError(r.name, msg) | ||||
| 				http.Error(w, msg, http.StatusInternalServerError) | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			y, _ := lp.NewMessage( | ||||
| 				string(measurement), | ||||
| 				tags, | ||||
| 				nil, | ||||
| 				fields, | ||||
| 				t, | ||||
| 			) | ||||
|  | ||||
| 			for _, y := range messages { | ||||
| 				m, err := r.mp.ProcessMessage(y) | ||||
| 				if err == nil && m != nil { | ||||
| 					r.sink <- m | ||||
| 				} | ||||
|  | ||||
| 			} | ||||
| 		// Check for IO errors | ||||
| 		err := d.Err() | ||||
| 		if err != nil { | ||||
| 			msg := "ServerHttp: Failed to decode: " + err.Error() | ||||
| 			cclog.ComponentError(r.name, msg) | ||||
| 			http.Error(w, msg, http.StatusInternalServerError) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
|   | ||||
| @@ -5,12 +5,10 @@ import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"time" | ||||
|  | ||||
| 	lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" | ||||
| 	mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" | ||||
| 	influx "github.com/influxdata/line-protocol/v2/lineprotocol" | ||||
| 	nats "github.com/nats-io/nats.go" | ||||
| ) | ||||
|  | ||||
| @@ -42,65 +40,14 @@ func (r *NatsReceiver) Start() { | ||||
| func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { | ||||
|  | ||||
| 	if r.sink != nil { | ||||
| 		d := influx.NewDecoderWithBytes(m.Data) | ||||
| 		for d.Next() { | ||||
|  | ||||
| 			// Decode measurement name | ||||
| 			measurement, err := d.Measurement() | ||||
| 		messages, err := lp.FromBytes(m.Data) | ||||
| 		if err != nil { | ||||
| 				msg := "_NatsReceive: Failed to decode measurement: " + err.Error() | ||||
| 			msg := "_NatsReceive: Failed to decode messages: " + err.Error() | ||||
| 			cclog.ComponentError(r.name, msg) | ||||
| 				return | ||||
| 		} | ||||
|  | ||||
| 			// Decode tags | ||||
| 			tags := make(map[string]string) | ||||
| 			for { | ||||
| 				key, value, err := d.NextTag() | ||||
| 				if err != nil { | ||||
| 					msg := "_NatsReceive: Failed to decode tag: " + err.Error() | ||||
| 					cclog.ComponentError(r.name, msg) | ||||
| 					return | ||||
| 				} | ||||
| 				if key == nil { | ||||
| 					break | ||||
| 				} | ||||
| 				tags[string(key)] = string(value) | ||||
| 			} | ||||
|  | ||||
| 			// Decode fields | ||||
| 			fields := make(map[string]interface{}) | ||||
| 			for { | ||||
| 				key, value, err := d.NextField() | ||||
| 				if err != nil { | ||||
| 					msg := "_NatsReceive: Failed to decode field: " + err.Error() | ||||
| 					cclog.ComponentError(r.name, msg) | ||||
| 					return | ||||
| 				} | ||||
| 				if key == nil { | ||||
| 					break | ||||
| 				} | ||||
| 				fields[string(key)] = value.Interface() | ||||
| 			} | ||||
|  | ||||
| 			// Decode time stamp | ||||
| 			t, err := d.Time(influx.Nanosecond, time.Time{}) | ||||
| 			if err != nil { | ||||
| 				msg := "_NatsReceive: Failed to decode time: " + err.Error() | ||||
| 				cclog.ComponentError(r.name, msg) | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			y, _ := lp.NewMessage( | ||||
| 				string(measurement), | ||||
| 				tags, | ||||
| 				nil, | ||||
| 				fields, | ||||
| 				t, | ||||
| 			) | ||||
|  | ||||
| 		for _, y := range messages { | ||||
| 			m, err := r.mp.ProcessMessage(y) | ||||
| 			if err == nil && m != nil { | ||||
| 			if err == nil && m != nil && r.sink != nil { | ||||
| 				r.sink <- m | ||||
| 			} | ||||
| 		} | ||||
|   | ||||
							
								
								
									
										175
									
								
								scripts/generate_docs.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										175
									
								
								scripts/generate_docs.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,175 @@ | ||||
| #!/bin/bash -l | ||||
| # | ||||
| # Generate documentation pages from the Markdown files of this repository. | ||||
| # | ||||
| # Usage: scripts/generate_docs.sh <destination folder> | ||||
| # | ||||
| # The current working directory is used as source folder, so run the script | ||||
| # from the repository root. Every generated page consists of a front matter | ||||
| # header followed by the unmodified content of the source Markdown file. | ||||
|  | ||||
| SRCDIR="$(pwd)" | ||||
| DESTDIR="$1" | ||||
|  | ||||
| if [ -z "$DESTDIR" ]; then | ||||
|     echo "Destination folder not provided" | ||||
|     exit 1 | ||||
| fi | ||||
|  | ||||
| # write_component_docs <subdir> <suffix> <tag> | ||||
| # | ||||
| # Writes a page for every file matching <subdir>/*<suffix>.md. The title is the | ||||
| # first line starting with '##' (backticks removed); files without such a line | ||||
| # are skipped. | ||||
| write_component_docs() { | ||||
|     local subdir="$1" | ||||
|     local suffix="$2" | ||||
|     local tag="$3" | ||||
|     local FILES F FNAME TITLE | ||||
|  | ||||
|     mkdir -p "${DESTDIR}/${subdir}" | ||||
|     # Collect the list line by line so that paths containing spaces stay intact | ||||
|     mapfile -t FILES < <(find "${SRCDIR}/${subdir}" -name "*${suffix}.md") | ||||
|     for F in "${FILES[@]}"; do | ||||
|         echo "$F" | ||||
|         FNAME=$(basename "$F") | ||||
|         TITLE=$(grep -E "^##" "$F" | head -n 1 | sed -e 's+## ++g') | ||||
|         echo "'${TITLE//\`/}'" | ||||
|         if [ "${TITLE}" == "" ]; then continue; fi | ||||
|         rm --force "${DESTDIR}/${subdir}/${FNAME}" | ||||
|         cat << EOF >> "${DESTDIR}/${subdir}/${FNAME}" | ||||
| --- | ||||
| title: ${TITLE//\`/} | ||||
| description: > | ||||
|   Toplevel ${FNAME/.md/} | ||||
| categories: [cc-metric-collector] | ||||
| tags: [cc-metric-collector, ${tag}, ${FNAME/${suffix}.md/}] | ||||
| weight: 2 | ||||
| --- | ||||
|  | ||||
| EOF | ||||
|         cat "$F" >> "${DESTDIR}/${subdir}/${FNAME}" | ||||
|     done | ||||
| } | ||||
|  | ||||
| # write_index <readme> <index file> <title> <tags> | ||||
| # | ||||
| # Writes <index file> from <readme> with a front matter header. Nothing is | ||||
| # written if <readme> does not exist. | ||||
| write_index() { | ||||
|     local readme="$1" | ||||
|     local index="$2" | ||||
|     local title="$3" | ||||
|     local tags="$4" | ||||
|  | ||||
|     if [ ! -e "${readme}" ]; then return 0; fi | ||||
|     cat << EOF > "${index}" | ||||
| --- | ||||
| title: ${title} | ||||
| description: Documentation of ${title} | ||||
| categories: [cc-metric-collector] | ||||
| tags: [${tags}] | ||||
| weight: 40 | ||||
| --- | ||||
|  | ||||
| EOF | ||||
|     cat "${readme}" >> "${index}" | ||||
| } | ||||
|  | ||||
| # Collectors | ||||
| write_component_docs "collectors" "Metric" "Collector" | ||||
| write_index "${SRCDIR}/collectors/README.md" "${DESTDIR}/collectors/_index.md" \ | ||||
|     "cc-metric-collector's collectors" "cc-metric-collector, Collector, General" | ||||
|  | ||||
| # Sinks (the index depends on the sinks README, not on the collectors README) | ||||
| write_component_docs "sinks" "Sink" "Sink" | ||||
| write_index "${SRCDIR}/sinks/README.md" "${DESTDIR}/sinks/_index.md" \ | ||||
|     "cc-metric-collector's sinks" "cc-metric-collector, Sink, General" | ||||
|  | ||||
| # Receivers | ||||
| write_component_docs "receivers" "Receiver" "Receiver" | ||||
| write_index "${SRCDIR}/receivers/README.md" "${DESTDIR}/receivers/_index.md" \ | ||||
|     "cc-metric-collector's receivers" "cc-metric-collector, Receiver, General" | ||||
|  | ||||
| # Router | ||||
| mkdir -p "${DESTDIR}/internal/metricRouter" | ||||
| write_index "${SRCDIR}/internal/metricRouter/README.md" "${DESTDIR}/internal/metricRouter/_index.md" \ | ||||
|     "cc-metric-collector's router" "cc-metric-collector, Router, General" | ||||
|  | ||||
| # Toplevel README; links to README.md files have to point to the generated _index.md files | ||||
| if [ -e "${SRCDIR}/README.md" ]; then | ||||
|     write_index "${SRCDIR}/README.md" "${DESTDIR}/_index.md" \ | ||||
|         "cc-metric-collector" "cc-metric-collector, General" | ||||
|     sed -i -e 's+README.md+_index.md+g' "${DESTDIR}/_index.md" | ||||
| fi | ||||
|  | ||||
| # Message processor | ||||
| mkdir -p "${DESTDIR}/pkg/messageProcessor" | ||||
| write_index "${SRCDIR}/pkg/messageProcessor/README.md" "${DESTDIR}/pkg/messageProcessor/_index.md" \ | ||||
|     "cc-metric-collector's message processor" "cc-metric-collector, Message Processor" | ||||
|  | ||||
		Reference in New Issue
	
	Block a user