Merge develop branch into main (#106)

* Add cpu_used (all-cpu_idle) to CpustatCollector

* Update to line-protocol/v2

* Update runonce.yml with Golang 1.20

* Update fsnotify in LIKWID Collector

* Use not a pointer to line-protocol.Encoder

* Simplify Makefile

* Use only as many arguments as required

* Allow sum function to handle non float types

* Allow values to be a slice of type float64, float32, int, int64, int32, bool

* Use generic function to simplify code

* Add missing case for type []int32

* Use generic function to compute minimum

* Use generic function to compute maximum

* Use generic function to compute average

* Add error value to sumAnyType

* Use generic function to compute median

* For older versions of go slices is not part of the installation

* Remove old entries from go.sum

* Use simpler sort function

* Compute metrics ib_total and ib_total_pkts

* Add aggregated metrics.
Add missing units

* Update likwidMetric.go

Fixes a potential bug when `fsnotify.NewWatcher()` fails with an error

* Completly avoid memory allocations in infinibandMetric read()

* Fixed initialization: Initalization and measurements should run in the same thread

---------

Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
This commit is contained in:
Thomas Gruber
2023-08-29 14:12:49 +02:00
committed by GitHub
parent 3d7bb4cdd7
commit 195d0794b0
17 changed files with 746 additions and 839 deletions

View File

@@ -3,12 +3,11 @@ package metricAggregator
import (
"errors"
"fmt"
"math"
"regexp"
"sort"
"strings"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
"golang.org/x/exp/slices"
topo "github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
)
@@ -16,149 +15,155 @@ import (
* Arithmetic functions on value arrays
*/
// Sum up values
func sumfunc(args ...interface{}) (interface{}, error) {
s := 0.0
values, ok := args[0].([]float64)
if ok {
cclog.ComponentDebug("MetricCache", "SUM FUNC START")
for _, x := range values {
s += x
}
cclog.ComponentDebug("MetricCache", "SUM FUNC END", s)
} else {
cclog.ComponentDebug("MetricCache", "SUM FUNC CAST FAILED")
func sumAnyType[T float64 | float32 | int | int32 | int64](values []T) (T, error) {
if len(values) == 0 {
return 0.0, errors.New("sum function requires at least one argument")
}
return s, nil
var sum T
for _, value := range values {
sum += value
}
return sum, nil
}
// Get the minimum value
func minfunc(args ...interface{}) (interface{}, error) {
var err error = nil
switch values := args[0].(type) {
// Sum up values
func sumfunc(args interface{}) (interface{}, error) {
var err error
switch values := args.(type) {
case []float64:
var s float64 = math.MaxFloat64
for _, x := range values {
if x < s {
s = x
}
}
return s, nil
return sumAnyType(values)
case []float32:
var s float32 = math.MaxFloat32
for _, x := range values {
if x < s {
s = x
}
}
return s, nil
return sumAnyType(values)
case []int:
var s int = int(math.MaxInt32)
for _, x := range values {
if x < s {
s = x
}
}
return s, nil
return sumAnyType(values)
case []int64:
var s int64 = math.MaxInt64
for _, x := range values {
if x < s {
s = x
}
}
return s, nil
return sumAnyType(values)
case []int32:
var s int32 = math.MaxInt32
for _, x := range values {
if x < s {
s = x
}
}
return s, nil
return sumAnyType(values)
default:
err = errors.New("function 'min' only on list of values (float64, float32, int, int32, int64)")
err = errors.New("function 'sum' only on list of values (float64, float32, int, int32, int64)")
}
return 0.0, err
}
// Get the average or mean value
func avgfunc(args ...interface{}) (interface{}, error) {
switch values := args[0].(type) {
case []float64:
var s float64 = 0
for _, x := range values {
s += x
}
return s / float64(len(values)), nil
case []float32:
var s float32 = 0
for _, x := range values {
s += x
}
return s / float32(len(values)), nil
case []int:
var s int = 0
for _, x := range values {
s += x
}
return s / len(values), nil
case []int64:
var s int64 = 0
for _, x := range values {
s += x
}
return s / int64(len(values)), nil
func minAnyType[T float64 | float32 | int | int32 | int64](values []T) (T, error) {
if len(values) == 0 {
return 0.0, errors.New("min function requires at least one argument")
}
return 0.0, nil
return slices.Min(values), nil
}
// Get the minimum value
func minfunc(args interface{}) (interface{}, error) {
switch values := args.(type) {
case []float64:
return minAnyType(values)
case []float32:
return minAnyType(values)
case []int:
return minAnyType(values)
case []int64:
return minAnyType(values)
case []int32:
return minAnyType(values)
default:
return 0.0, errors.New("function 'min' only on list of values (float64, float32, int, int32, int64)")
}
}
func avgAnyType[T float64 | float32 | int | int32 | int64](values []T) (float64, error) {
if len(values) == 0 {
return 0.0, errors.New("average function requires at least one argument")
}
sum, err := sumAnyType[T](values)
return float64(sum) / float64(len(values)), err
}
// Get the average or mean value
func avgfunc(args interface{}) (interface{}, error) {
switch values := args.(type) {
case []float64:
return avgAnyType(values)
case []float32:
return avgAnyType(values)
case []int:
return avgAnyType(values)
case []int64:
return avgAnyType(values)
case []int32:
return avgAnyType(values)
default:
return 0.0, errors.New("function 'average' only on list of values (float64, float32, int, int32, int64)")
}
}
func maxAnyType[T float64 | float32 | int | int32 | int64](values []T) (T, error) {
if len(values) == 0 {
return 0.0, errors.New("max function requires at least one argument")
}
return slices.Max(values), nil
}
// Get the maximum value
func maxfunc(args ...interface{}) (interface{}, error) {
s := 0.0
values, ok := args[0].([]float64)
if ok {
for _, x := range values {
if x > s {
s = x
}
}
func maxfunc(args interface{}) (interface{}, error) {
switch values := args.(type) {
case []float64:
return maxAnyType(values)
case []float32:
return maxAnyType(values)
case []int:
return maxAnyType(values)
case []int64:
return maxAnyType(values)
case []int32:
return maxAnyType(values)
default:
return 0.0, errors.New("function 'max' only on list of values (float64, float32, int, int32, int64)")
}
return s, nil
}
func medianAnyType[T float64 | float32 | int | int32 | int64](values []T) (T, error) {
if len(values) == 0 {
return 0.0, errors.New("median function requires at least one argument")
}
slices.Sort(values)
var median T
if midPoint := len(values) % 2; midPoint == 0 {
median = (values[midPoint-1] + values[midPoint]) / 2
} else {
median = values[midPoint]
}
return median, nil
}
// Get the median value
func medianfunc(args ...interface{}) (interface{}, error) {
switch values := args[0].(type) {
func medianfunc(args interface{}) (interface{}, error) {
switch values := args.(type) {
case []float64:
sort.Float64s(values)
return values[len(values)/2], nil
// case []float32:
// sort.Float64s(values)
// return values[len(values)/2], nil
return medianAnyType(values)
case []float32:
return medianAnyType(values)
case []int:
sort.Ints(values)
return values[len(values)/2], nil
// case []int64:
// sort.Ints(values)
// return values[len(values)/2], nil
// case []int32:
// sort.Ints(values)
// return values[len(values)/2], nil
return medianAnyType(values)
case []int64:
return medianAnyType(values)
case []int32:
return medianAnyType(values)
default:
return 0.0, errors.New("function 'median' only on list of values (float64, float32, int, int32, int64)")
}
return 0.0, errors.New("function 'median()' only on lists of type float64 and int")
}
/*
* Get number of values in list. Returns always an int
*/
func lenfunc(args ...interface{}) (interface{}, error) {
func lenfunc(args interface{}) (interface{}, error) {
var err error = nil
var length int = 0
switch values := args[0].(type) {
switch values := args.(type) {
case []float64:
length = len(values)
case []float32:
@@ -243,8 +248,8 @@ func matchfunc(args ...interface{}) (interface{}, error) {
*/
// for a given cpuid, it returns the core id
func getCpuCoreFunc(args ...interface{}) (interface{}, error) {
switch cpuid := args[0].(type) {
func getCpuCoreFunc(args interface{}) (interface{}, error) {
switch cpuid := args.(type) {
case int:
return topo.GetHwthreadCore(cpuid), nil
}
@@ -252,8 +257,8 @@ func getCpuCoreFunc(args ...interface{}) (interface{}, error) {
}
// for a given cpuid, it returns the socket id
func getCpuSocketFunc(args ...interface{}) (interface{}, error) {
switch cpuid := args[0].(type) {
func getCpuSocketFunc(args interface{}) (interface{}, error) {
switch cpuid := args.(type) {
case int:
return topo.GetHwthreadSocket(cpuid), nil
}
@@ -261,8 +266,8 @@ func getCpuSocketFunc(args ...interface{}) (interface{}, error) {
}
// for a given cpuid, it returns the id of the NUMA node
func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) {
switch cpuid := args[0].(type) {
func getCpuNumaDomainFunc(args interface{}) (interface{}, error) {
switch cpuid := args.(type) {
case int:
return topo.GetHwthreadNumaDomain(cpuid), nil
}
@@ -270,8 +275,8 @@ func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) {
}
// for a given cpuid, it returns the id of the CPU die
func getCpuDieFunc(args ...interface{}) (interface{}, error) {
switch cpuid := args[0].(type) {
func getCpuDieFunc(args interface{}) (interface{}, error) {
switch cpuid := args.(type) {
case int:
return topo.GetHwthreadDie(cpuid), nil
}
@@ -279,9 +284,9 @@ func getCpuDieFunc(args ...interface{}) (interface{}, error) {
}
// for a given core id, it returns the list of cpuids
func getCpuListOfCoreFunc(args ...interface{}) (interface{}, error) {
func getCpuListOfCoreFunc(args interface{}) (interface{}, error) {
cpulist := make([]int, 0)
switch in := args[0].(type) {
switch in := args.(type) {
case int:
for _, c := range topo.CpuData() {
if c.Core == in {
@@ -293,9 +298,9 @@ func getCpuListOfCoreFunc(args ...interface{}) (interface{}, error) {
}
// for a given socket id, it returns the list of cpuids
func getCpuListOfSocketFunc(args ...interface{}) (interface{}, error) {
func getCpuListOfSocketFunc(args interface{}) (interface{}, error) {
cpulist := make([]int, 0)
switch in := args[0].(type) {
switch in := args.(type) {
case int:
for _, c := range topo.CpuData() {
if c.Socket == in {
@@ -307,9 +312,9 @@ func getCpuListOfSocketFunc(args ...interface{}) (interface{}, error) {
}
// for a given id of a NUMA domain, it returns the list of cpuids
func getCpuListOfNumaDomainFunc(args ...interface{}) (interface{}, error) {
func getCpuListOfNumaDomainFunc(args interface{}) (interface{}, error) {
cpulist := make([]int, 0)
switch in := args[0].(type) {
switch in := args.(type) {
case int:
for _, c := range topo.CpuData() {
if c.Numadomain == in {
@@ -321,9 +326,9 @@ func getCpuListOfNumaDomainFunc(args ...interface{}) (interface{}, error) {
}
// for a given CPU die id, it returns the list of cpuids
func getCpuListOfDieFunc(args ...interface{}) (interface{}, error) {
func getCpuListOfDieFunc(args interface{}) (interface{}, error) {
cpulist := make([]int, 0)
switch in := args[0].(type) {
switch in := args.(type) {
case int:
for _, c := range topo.CpuData() {
if c.Die == in {
@@ -335,7 +340,7 @@ func getCpuListOfDieFunc(args ...interface{}) (interface{}, error) {
}
// wrapper function to get a list of all cpuids of the node
func getCpuListOfNode(args ...interface{}) (interface{}, error) {
func getCpuListOfNode() (interface{}, error) {
return topo.HwthreadList(), nil
}