From 1f55aa247faa1eb03d84fe4c340042cea880cfb2 Mon Sep 17 00:00:00 2001 From: Thomas Gruber Date: Mon, 31 Jan 2022 13:29:14 +0100 Subject: [PATCH 01/12] Run rpmbuild workflow only for new tags --- .github/workflows/rpmbuild.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/rpmbuild.yml b/.github/workflows/rpmbuild.yml index a7aee22..8d16e37 100644 --- a/.github/workflows/rpmbuild.yml +++ b/.github/workflows/rpmbuild.yml @@ -1,5 +1,8 @@ name: Run RPM Build -on: push +on: + push: + tags: + - '**' jobs: build-centos8: From fd3c7ed573739222182d7205b952869c8efd4a5b Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Mon, 31 Jan 2022 14:02:00 +0100 Subject: [PATCH 02/12] Add documentation --- internal/ccMetric/ccMetric.go | 85 +++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 19 deletions(-) diff --git a/internal/ccMetric/ccMetric.go b/internal/ccMetric/ccMetric.go index 05f81ff..ae7ab1b 100644 --- a/internal/ccMetric/ccMetric.go +++ b/internal/ccMetric/ccMetric.go @@ -11,27 +11,30 @@ import ( // Most functions are derived from github.com/influxdata/line-protocol/metric.go // The metric type is extended with an extra meta information list re-using the Tag // type. - +// +// See: https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/ type ccMetric struct { - name string - tags []*lp.Tag - fields []*lp.Field - tm time.Time - meta []*lp.Tag + name string // Measurement name + tags []*lp.Tag // ordered list of of tags + fields []*lp.Field // unordered list of of fields + tm time.Time // timestamp + meta []*lp.Tag // odered list of meta data tags } +// ccmetric access functions type CCMetric interface { - lp.MutableMetric - AddMeta(key, value string) - MetaList() []*lp.Tag - RemoveTag(key string) - GetTag(key string) (string, bool) - GetMeta(key string) (string, bool) - GetField(key string) (interface{}, bool) - HasField(key string) bool - RemoveField(key string) + lp.MutableMetric // SetTime, AddTag, AddField + AddMeta(key, value string) // Add a meta data tag + MetaList() []*lp.Tag // Returns the meta data list + RemoveTag(key string) // Remove a tag addressed by its key + GetTag(key string) (string, bool) // Get a tag addressed by its key + GetMeta(key string) (string, bool) // Get a meta data tab addressed by its key + GetField(key string) (interface{}, bool) // Get a field addressed by its key + HasField(key string) bool // Check if a field key is present + RemoveField(key string) // Remove a field addressed by its key } +// Meta returns the list of meta data tags as key-value mapping func (m *ccMetric) Meta() map[string]string { meta := make(map[string]string, len(m.meta)) for _, m := range m.meta { @@ -40,18 +43,22 @@ func (m *ccMetric) Meta() map[string]string { return meta } +// MetaList returns the list of meta data tags func (m *ccMetric) MetaList() []*lp.Tag { return m.meta } +// String implements the stringer interface for data type ccMetric func (m *ccMetric) String() string { return fmt.Sprintf("%s %v %v %v %d", m.name, m.Tags(), m.Meta(), m.Fields(), m.tm.UnixNano()) } +// Name returns the metric name func (m *ccMetric) Name() string { return m.name } +// Tags returns the the list of tags as key-value-mapping func (m *ccMetric) Tags() map[string]string { tags := make(map[string]string, len(m.tags)) for _, tag := range m.tags { @@ -60,10 +67,12 @@ func (m *ccMetric) Tags() map[string]string { return tags } +// TagList returns the list of tags func (m *ccMetric) 
TagList() []*lp.Tag { return m.tags } +// Fields returns the list of fields as key-value-mapping func (m *ccMetric) Fields() map[string]interface{} { fields := make(map[string]interface{}, len(m.fields)) for _, field := range m.fields { @@ -73,18 +82,22 @@ func (m *ccMetric) Fields() map[string]interface{} { return fields } +// FieldList returns the list of fields func (m *ccMetric) FieldList() []*lp.Field { return m.fields } +// Time returns timestamp func (m *ccMetric) Time() time.Time { return m.tm } +// SetTime sets the timestamp func (m *ccMetric) SetTime(t time.Time) { m.tm = t } +// HasTag checks if a tag with key equal to is present in the list of tags func (m *ccMetric) HasTag(key string) bool { for _, tag := range m.tags { if tag.Key == key { @@ -94,6 +107,7 @@ func (m *ccMetric) HasTag(key string) bool { return false } +// GetTag returns the tag with tag's key equal to func (m *ccMetric) GetTag(key string) (string, bool) { for _, tag := range m.tags { if tag.Key == key { @@ -103,6 +117,8 @@ func (m *ccMetric) GetTag(key string) (string, bool) { return "", false } +// RemoveTag removes the tag with tag's key equal to +// and keeps the tag list ordered by the keys func (m *ccMetric) RemoveTag(key string) { for i, tag := range m.tags { if tag.Key == key { @@ -114,6 +130,8 @@ func (m *ccMetric) RemoveTag(key string) { } } +// AddTag adds a tag (consisting of key and value) +// and keeps the tag list ordered by the keys func (m *ccMetric) AddTag(key, value string) { for i, tag := range m.tags { if key > tag.Key { @@ -134,6 +152,7 @@ func (m *ccMetric) AddTag(key, value string) { m.tags = append(m.tags, &lp.Tag{Key: key, Value: value}) } +// HasTag checks if a meta data tag with meta data's key equal to is present in the list of meta data tags func (m *ccMetric) HasMeta(key string) bool { for _, tag := range m.meta { if tag.Key == key { @@ -143,6 +162,7 @@ func (m *ccMetric) HasMeta(key string) bool { return false } +// GetMeta returns the meta data tag with meta data's key equal to func (m *ccMetric) GetMeta(key string) (string, bool) { for _, tag := range m.meta { if tag.Key == key { @@ -152,6 +172,8 @@ func (m *ccMetric) GetMeta(key string) (string, bool) { return "", false } +// RemoveMeta removes the meta data tag with tag's key equal to +// and keeps the meta data tag list ordered by the keys func (m *ccMetric) RemoveMeta(key string) { for i, tag := range m.meta { if tag.Key == key { @@ -163,6 +185,8 @@ func (m *ccMetric) RemoveMeta(key string) { } } +// AddMeta adds a meta data tag (consisting of key and value) +// and keeps the meta data list ordered by the keys func (m *ccMetric) AddMeta(key, value string) { for i, tag := range m.meta { if key > tag.Key { @@ -183,6 +207,7 @@ func (m *ccMetric) AddMeta(key, value string) { m.meta = append(m.meta, &lp.Tag{Key: key, Value: value}) } +// AddField adds a field (consisting of key and value) to the unordered list of fields func (m *ccMetric) AddField(key string, value interface{}) { for i, field := range m.fields { if key == field.Key { @@ -193,6 +218,7 @@ func (m *ccMetric) AddField(key string, value interface{}) { m.fields = append(m.fields, &lp.Field{Key: key, Value: convertField(value)}) } +// GetField returns the field with field's key equal to func (m *ccMetric) GetField(key string) (interface{}, bool) { for _, field := range m.fields { if field.Key == key { @@ -202,6 +228,7 @@ func (m *ccMetric) GetField(key string) (interface{}, bool) { return "", false } +// HasField checks if a field with field's key equal to is present in 
the list of fields func (m *ccMetric) HasField(key string) bool { for _, field := range m.fields { if field.Key == key { @@ -211,6 +238,8 @@ func (m *ccMetric) HasField(key string) bool { return false } +// RemoveField removes the field with field's key equal to +// from the unordered list of fields func (m *ccMetric) RemoveField(key string) { for i, field := range m.fields { if field.Key == key { @@ -222,6 +251,7 @@ func (m *ccMetric) RemoveField(key string) { } } +// New creates a new measurement point func New( name string, tags map[string]string, @@ -237,6 +267,7 @@ func New( meta: nil, } + // Sorted list of tags if len(tags) > 0 { m.tags = make([]*lp.Tag, 0, len(tags)) for k, v := range tags { @@ -246,6 +277,7 @@ func New( sort.Slice(m.tags, func(i, j int) bool { return m.tags[i].Key < m.tags[j].Key }) } + // Sorted list of meta data tags if len(meta) > 0 { m.meta = make([]*lp.Tag, 0, len(meta)) for k, v := range meta { @@ -255,6 +287,7 @@ func New( sort.Slice(m.meta, func(i, j int) bool { return m.meta[i].Key < m.meta[j].Key }) } + // Unsorted list of fields if len(fields) > 0 { m.fields = make([]*lp.Field, 0, len(fields)) for k, v := range fields { @@ -269,6 +302,7 @@ func New( return m, nil } +// FromMetric copies the metric func FromMetric(other CCMetric) CCMetric { m := &ccMetric{ name: other.Name(), @@ -291,6 +325,7 @@ func FromMetric(other CCMetric) CCMetric { return m } +// FromInfluxMetric copies the influxDB line protocol metric func FromInfluxMetric(other lp.Metric) CCMetric { m := &ccMetric{ name: other.Name(), @@ -300,16 +335,28 @@ func FromInfluxMetric(other lp.Metric) CCMetric { tm: other.Time(), } - for i, tag := range other.TagList() { - m.tags[i] = &lp.Tag{Key: tag.Key, Value: tag.Value} + for i, otherTag := range other.TagList() { + m.tags[i] = &lp.Tag{ + Key: otherTag.Key, + Value: otherTag.Value, + } } - for i, field := range other.FieldList() { - m.fields[i] = &lp.Field{Key: field.Key, Value: field.Value} + for i, otherField := range other.FieldList() { + m.fields[i] = &lp.Field{ + Key: otherField.Key, + Value: otherField.Value, + } } return m } +// convertField converts data types of fields by the following schemata: +// *float32, *float64, float32, float64 -> float64 +// *int, *int8, *int16, *int32, *int64, int, int8, int16, int32, int64 -> int64 +// *uint, *uint8, *uint16, *uint32, *uint64, uint, uint8, uint16, uint32, uint64 -> uint64 +// *[]byte, *string, []byte, string -> string +// *bool, bool -> bool func convertField(v interface{}) interface{} { switch v := v.(type) { case float64: From 862630a21888d1fe860f9ec295453ffc3bcd4230 Mon Sep 17 00:00:00 2001 From: Thomas Gruber Date: Mon, 31 Jan 2022 14:42:19 +0100 Subject: [PATCH 03/12] Extend workflow to test Go 1.16 and 1.17 --- .github/workflows/runonce.yml | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/.github/workflows/runonce.yml b/.github/workflows/runonce.yml index 194710f..2a2cc8a 100644 --- a/.github/workflows/runonce.yml +++ b/.github/workflows/runonce.yml @@ -2,7 +2,7 @@ name: Run Test on: push jobs: - build: + build-1-17: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -18,3 +18,19 @@ jobs: - name: Run MetricCollector run: ./cc-metric-collector --once --config .github/ci-config.json + build-1-16: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + # See: https://github.com/marketplace/actions/setup-go-environment + - name: Setup Golang + uses: actions/setup-go@v2.1.5 + with: + go-version: '^1.16.7' # The version AlmaLinux 8.5 uses 
+ + - name: Build MetricCollector + run: make + + - name: Run MetricCollector + run: ./cc-metric-collector --once --config .github/ci-config.json From 6ff6cb721959d7dd72e11aaa7585a7cb7142f642 Mon Sep 17 00:00:00 2001 From: Thomas Gruber Date: Tue, 1 Feb 2022 14:54:34 +0100 Subject: [PATCH 04/12] Change CCMetric's internal data structure (#22) * package ccmetric rewrite * Create deep copy in New() to avoid access conflicts * Renamed TagMap() -> Tags(), MetaMap() -> Meta Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> --- internal/ccMetric/ccMetric.go | 219 +++++++++----------------- internal/metricRouter/metricRouter.go | 8 +- sinks/gangliaSink.go | 12 +- sinks/influxSink.go | 11 +- 4 files changed, 92 insertions(+), 158 deletions(-) diff --git a/internal/ccMetric/ccMetric.go b/internal/ccMetric/ccMetric.go index ae7ab1b..20b9786 100644 --- a/internal/ccMetric/ccMetric.go +++ b/internal/ccMetric/ccMetric.go @@ -14,62 +14,73 @@ import ( // // See: https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/ type ccMetric struct { - name string // Measurement name - tags []*lp.Tag // ordered list of of tags - fields []*lp.Field // unordered list of of fields - tm time.Time // timestamp - meta []*lp.Tag // odered list of meta data tags + name string // Measurement name + meta map[string]string // map of meta data tags + tags map[string]string // map of of tags + fields []*lp.Field // unordered list of of fields + tm time.Time // timestamp } // ccmetric access functions type CCMetric interface { - lp.MutableMetric // SetTime, AddTag, AddField - AddMeta(key, value string) // Add a meta data tag - MetaList() []*lp.Tag // Returns the meta data list - RemoveTag(key string) // Remove a tag addressed by its key - GetTag(key string) (string, bool) // Get a tag addressed by its key - GetMeta(key string) (string, bool) // Get a meta data tab addressed by its key + lp.Metric // Time(), Name(), TagList(), FieldList() + + SetTime(t time.Time) + + Meta() map[string]string // Map of meta data tags + MetaList() []*lp.Tag // Ordered list of meta data + AddMeta(key, value string) // Add a meta data tag + GetMeta(key string) (string, bool) // Get a meta data tab addressed by its key + + Tags() map[string]string // Map of tags + AddTag(key, value string) // Add a tag + GetTag(key string) (string, bool) // Get a tag by its key + RemoveTag(key string) // Remove a tag by its key + GetField(key string) (interface{}, bool) // Get a field addressed by its key HasField(key string) bool // Check if a field key is present RemoveField(key string) // Remove a field addressed by its key } -// Meta returns the list of meta data tags as key-value mapping +// Meta returns the meta data tags as key-value mapping func (m *ccMetric) Meta() map[string]string { - meta := make(map[string]string, len(m.meta)) - for _, m := range m.meta { - meta[m.Key] = m.Value - } - return meta + return m.meta } -// MetaList returns the list of meta data tags +// MetaList returns the the list of meta data tags as sorted list of key value tags func (m *ccMetric) MetaList() []*lp.Tag { - return m.meta + + ml := make([]*lp.Tag, 0, len(m.meta)) + for key, value := range m.meta { + ml = append(ml, &lp.Tag{Key: key, Value: value}) + } + sort.Slice(ml, func(i, j int) bool { return ml[i].Key < ml[j].Key }) + return ml } // String implements the stringer interface for data type ccMetric func (m *ccMetric) String() string { - return fmt.Sprintf("%s %v %v %v %d", m.name, m.Tags(), m.Meta(), m.Fields(), m.tm.UnixNano()) 
+ return fmt.Sprintf("%s %v %v %v %d", m.name, m.tags, m.meta, m.Fields(), m.tm.UnixNano()) } -// Name returns the metric name +// Name returns the measurement name func (m *ccMetric) Name() string { return m.name } // Tags returns the the list of tags as key-value-mapping func (m *ccMetric) Tags() map[string]string { - tags := make(map[string]string, len(m.tags)) - for _, tag := range m.tags { - tags[tag.Key] = tag.Value - } - return tags + return m.tags } -// TagList returns the list of tags +// TagList returns the the list of tags as sorted list of key value tags func (m *ccMetric) TagList() []*lp.Tag { - return m.tags + tl := make([]*lp.Tag, 0, len(m.tags)) + for key, value := range m.tags { + tl = append(tl, &lp.Tag{Key: key, Value: value}) + } + sort.Slice(tl, func(i, j int) bool { return tl[i].Key < tl[j].Key }) + return tl } // Fields returns the list of fields as key-value-mapping @@ -99,112 +110,50 @@ func (m *ccMetric) SetTime(t time.Time) { // HasTag checks if a tag with key equal to is present in the list of tags func (m *ccMetric) HasTag(key string) bool { - for _, tag := range m.tags { - if tag.Key == key { - return true - } - } - return false + _, ok := m.tags[key] + return ok } // GetTag returns the tag with tag's key equal to func (m *ccMetric) GetTag(key string) (string, bool) { - for _, tag := range m.tags { - if tag.Key == key { - return tag.Value, true - } - } - return "", false + value, ok := m.tags[key] + return value, ok } // RemoveTag removes the tag with tag's key equal to // and keeps the tag list ordered by the keys func (m *ccMetric) RemoveTag(key string) { - for i, tag := range m.tags { - if tag.Key == key { - copy(m.tags[i:], m.tags[i+1:]) - m.tags[len(m.tags)-1] = nil - m.tags = m.tags[:len(m.tags)-1] - return - } - } + delete(m.tags, key) } // AddTag adds a tag (consisting of key and value) // and keeps the tag list ordered by the keys func (m *ccMetric) AddTag(key, value string) { - for i, tag := range m.tags { - if key > tag.Key { - continue - } - - if key == tag.Key { - tag.Value = value - return - } - - m.tags = append(m.tags, nil) - copy(m.tags[i+1:], m.tags[i:]) - m.tags[i] = &lp.Tag{Key: key, Value: value} - return - } - - m.tags = append(m.tags, &lp.Tag{Key: key, Value: value}) + m.tags[key] = value } // HasTag checks if a meta data tag with meta data's key equal to is present in the list of meta data tags func (m *ccMetric) HasMeta(key string) bool { - for _, tag := range m.meta { - if tag.Key == key { - return true - } - } - return false + _, ok := m.meta[key] + return ok } // GetMeta returns the meta data tag with meta data's key equal to func (m *ccMetric) GetMeta(key string) (string, bool) { - for _, tag := range m.meta { - if tag.Key == key { - return tag.Value, true - } - } - return "", false + value, ok := m.meta[key] + return value, ok } // RemoveMeta removes the meta data tag with tag's key equal to // and keeps the meta data tag list ordered by the keys func (m *ccMetric) RemoveMeta(key string) { - for i, tag := range m.meta { - if tag.Key == key { - copy(m.meta[i:], m.meta[i+1:]) - m.meta[len(m.meta)-1] = nil - m.meta = m.meta[:len(m.meta)-1] - return - } - } + delete(m.meta, key) } // AddMeta adds a meta data tag (consisting of key and value) // and keeps the meta data list ordered by the keys func (m *ccMetric) AddMeta(key, value string) { - for i, tag := range m.meta { - if key > tag.Key { - continue - } - - if key == tag.Key { - tag.Value = value - return - } - - m.meta = append(m.meta, nil) - copy(m.meta[i+1:], m.meta[i:]) - 
m.meta[i] = &lp.Tag{Key: key, Value: value} - return - } - - m.meta = append(m.meta, &lp.Tag{Key: key, Value: value}) + m.meta[key] = value } // AddField adds a field (consisting of key and value) to the unordered list of fields @@ -261,62 +210,49 @@ func New( ) (CCMetric, error) { m := &ccMetric{ name: name, - tags: nil, - fields: nil, + tags: make(map[string]string, len(tags)), + meta: make(map[string]string, len(meta)), + fields: make([]*lp.Field, 0, len(fields)), tm: tm, - meta: nil, } - // Sorted list of tags - if len(tags) > 0 { - m.tags = make([]*lp.Tag, 0, len(tags)) - for k, v := range tags { - m.tags = append(m.tags, - &lp.Tag{Key: k, Value: v}) - } - sort.Slice(m.tags, func(i, j int) bool { return m.tags[i].Key < m.tags[j].Key }) + // deep copy tags + for k, v := range tags { + m.tags[k] = v } - // Sorted list of meta data tags - if len(meta) > 0 { - m.meta = make([]*lp.Tag, 0, len(meta)) - for k, v := range meta { - m.meta = append(m.meta, - &lp.Tag{Key: k, Value: v}) - } - sort.Slice(m.meta, func(i, j int) bool { return m.meta[i].Key < m.meta[j].Key }) + // deep copy meta data tags + for k, v := range meta { + m.meta[k] = v } // Unsorted list of fields - if len(fields) > 0 { - m.fields = make([]*lp.Field, 0, len(fields)) - for k, v := range fields { - v := convertField(v) - if v == nil { - continue - } - m.AddField(k, v) + for k, v := range fields { + v := convertField(v) + if v == nil { + continue } + m.AddField(k, v) } return m, nil } // FromMetric copies the metric -func FromMetric(other CCMetric) CCMetric { +func FromMetric(other ccMetric) CCMetric { m := &ccMetric{ name: other.Name(), - tags: make([]*lp.Tag, len(other.TagList())), + tags: make(map[string]string), fields: make([]*lp.Field, len(other.FieldList())), - meta: make([]*lp.Tag, len(other.MetaList())), + meta: make(map[string]string), tm: other.Time(), } - for i, tag := range other.TagList() { - m.tags[i] = &lp.Tag{Key: tag.Key, Value: tag.Value} + for key, value := range other.Tags() { + m.tags[key] = value } - for i, s := range other.MetaList() { - m.meta[i] = &lp.Tag{Key: s.Key, Value: s.Value} + for key, value := range other.Meta() { + m.meta[key] = value } for i, field := range other.FieldList() { @@ -329,17 +265,14 @@ func FromMetric(other CCMetric) CCMetric { func FromInfluxMetric(other lp.Metric) CCMetric { m := &ccMetric{ name: other.Name(), - tags: make([]*lp.Tag, len(other.TagList())), + tags: make(map[string]string), fields: make([]*lp.Field, len(other.FieldList())), - meta: make([]*lp.Tag, 0), + meta: make(map[string]string), tm: other.Time(), } - for i, otherTag := range other.TagList() { - m.tags[i] = &lp.Tag{ - Key: otherTag.Key, - Value: otherTag.Value, - } + for _, otherTag := range other.TagList() { + m.tags[otherTag.Key] = otherTag.Value } for i, otherField := range other.FieldList() { diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 870af02..83c14e7 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -141,11 +141,11 @@ func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, erro // Add metric name, tags, meta data, fields and timestamp to the parameter list params := make(map[string]interface{}) params["name"] = point.Name() - for _, t := range point.TagList() { - params[t.Key] = t.Value + for key, value := range point.Tags() { + params[key] = value } - for _, m := range point.MetaList() { - params[m.Key] = m.Value + for key, value := range point.Meta() { + params[key] = value } 
for _, f := range point.FieldList() { params[f.Key] = f.Value diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index 87506a0..3fd48e7 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -30,16 +30,16 @@ func (s *GangliaSink) Write(point lp.CCMetric) error { var err error = nil var tagsstr []string var argstr []string - for _, t := range point.TagList() { - switch t.Key { + for key, value := range point.Tags() { + switch key { case "cluster": - argstr = append(argstr, fmt.Sprintf("--cluster=%s", t.Value)) + argstr = append(argstr, fmt.Sprintf("--cluster=%s", value)) case "unit": - argstr = append(argstr, fmt.Sprintf("--units=%s", t.Value)) + argstr = append(argstr, fmt.Sprintf("--units=%s", value)) case "group": - argstr = append(argstr, fmt.Sprintf("--group=%s", t.Value)) + argstr = append(argstr, fmt.Sprintf("--group=%s", value)) default: - tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", t.Key, t.Value)) + tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value)) } } if len(tagsstr) > 0 { diff --git a/sinks/influxSink.go b/sinks/influxSink.go index dca1572..7313490 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -5,10 +5,11 @@ import ( "crypto/tls" "errors" "fmt" + "log" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" - "log" ) type InfluxSink struct { @@ -61,12 +62,12 @@ func (s *InfluxSink) Init(config sinkConfig) error { func (s *InfluxSink) Write(point lp.CCMetric) error { tags := map[string]string{} fields := map[string]interface{}{} - for _, t := range point.TagList() { - tags[t.Key] = t.Value + for key, value := range point.Tags() { + tags[key] = value } if s.meta_as_tags { - for _, m := range point.MetaList() { - tags[m.Key] = m.Value + for key, value := range point.Meta() { + tags[key] = value } } for _, f := range point.FieldList() { From e550226416579b7f6109775890ac4c1027c456d3 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 1 Feb 2022 16:01:31 +0100 Subject: [PATCH 05/12] Use gval in LikwidCollector --- collectors/likwidMetric.go | 378 ++++++++++++++++++++++++++++--------- 1 file changed, 289 insertions(+), 89 deletions(-) diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 430a09b..e3be810 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -13,40 +13,59 @@ import ( "errors" "fmt" "io/ioutil" - "log" "math" "os" + "regexp" "strconv" "strings" "time" "unsafe" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - "gopkg.in/Knetic/govaluate.v2" + topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" + "github.com/PaesslerAG/gval" ) -type MetricScope int +type MetricScope string const ( METRIC_SCOPE_HWTHREAD = iota - METRIC_SCOPE_SOCKET + METRIC_SCOPE_CORE + METRIC_SCOPE_LLC METRIC_SCOPE_NUMA + METRIC_SCOPE_DIE + METRIC_SCOPE_SOCKET METRIC_SCOPE_NODE ) func (ms MetricScope) String() string { - return []string{"Head", "Shoulder", "Knee", "Toe"}[ms] + return string(ms) +} + +func (ms MetricScope) Granularity() int { + grans := []string{"hwthread", "core", "llc", "numadomain", "die", "socket", "node"} + for i, g := range grans { + if ms.String() == g { + return i + } + } + return -1 } type LikwidCollectorMetricConfig struct { - Name string `json:"name"` - Calc string `json:"calc"` - Scope MetricScope `json:"socket_scope"` - Publish bool 
`json:"publish"` + Name string `json:"name"` // Name of the metric + Calc string `json:"calc"` // Calculation for the metric using + Aggr string `json:"aggregation"` // if scope unequal to LIKWID metric scope, the values are combined (sum, min, max, mean or avg, median) + Scope MetricScope `json:"scope"` // scope for calculation. subscopes are aggregated using the 'aggregation' function + Publish bool `json:"publish"` + granulatity MetricScope } type LikwidCollectorEventsetConfig struct { - Events map[string]string `json:"events"` - Metrics []LikwidCollectorMetricConfig `json:"metrics"` + Events map[string]string `json:"events"` + granulatity map[string]MetricScope + Metrics []LikwidCollectorMetricConfig `json:"metrics"` } type LikwidCollectorConfig struct { @@ -67,13 +86,14 @@ type LikwidCollector struct { mresults map[int]map[int]map[string]float64 gmresults map[int]map[string]float64 basefreq float64 + running bool } type LikwidMetric struct { - name string - search string - socket_scope bool - group_idx int + name string + search string + scope MetricScope + group_idx int } func eventsToEventStr(events map[string]string) string { @@ -84,6 +104,21 @@ func eventsToEventStr(events map[string]string) string { return strings.Join(elist, ",") } +func getGranularity(counter, event string) MetricScope { + if strings.HasPrefix(counter, "PMC") || strings.HasPrefix(counter, "FIXC") { + return "hwthread" + } else if strings.Contains(counter, "BOX") || strings.Contains(counter, "DEV") { + return "socket" + } else if strings.HasPrefix(counter, "PWR") { + if event == "RAPL_CORE_ENERGY" { + return "hwthread" + } else { + return "socket" + } + } + return "unknown" +} + func getBaseFreq() float64 { var freq float64 = math.NaN() C.power_init(0) @@ -117,6 +152,53 @@ func getSocketCpus() map[C.int]int { return outmap } +func (m *LikwidCollector) CatchGvalPanic() { + if rerr := recover(); rerr != nil { + cclog.ComponentError(m.name, "Gval failed to calculate a metric", rerr) + m.init = false + } +} + +func (m *LikwidCollector) initGranularity() { + for _, evset := range m.config.Eventsets { + evset.granulatity = make(map[string]MetricScope) + for counter, event := range evset.Events { + gran := getGranularity(counter, event) + if gran.Granularity() >= 0 { + evset.granulatity[counter] = gran + } + } + for i, metric := range evset.Metrics { + s := regexp.MustCompile("[+-/*()]").Split(metric.Calc, -1) + gran := MetricScope("hwthread") + evset.Metrics[i].granulatity = gran + for _, x := range s { + if _, ok := evset.Events[x]; ok { + if evset.granulatity[x].Granularity() > gran.Granularity() { + gran = evset.granulatity[x] + } + } + } + evset.Metrics[i].granulatity = gran + } + } + for i, metric := range m.config.Metrics { + s := regexp.MustCompile("[+-/*()]").Split(metric.Calc, -1) + gran := MetricScope("hwthread") + m.config.Metrics[i].granulatity = gran + for _, x := range s { + for _, evset := range m.config.Eventsets { + for _, m := range evset.Metrics { + if m.Name == x && m.granulatity.Granularity() > gran.Granularity() { + gran = m.granulatity + } + } + } + } + m.config.Metrics[i].granulatity = gran + } +} + func (m *LikwidCollector) Init(config json.RawMessage) error { var ret C.int m.name = "LikwidCollector" @@ -126,38 +208,70 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { return err } } + m.initGranularity() + if m.config.ForceOverwrite { + os.Setenv("LIKWID_FORCE", "1") + } m.setup() - m.meta = map[string]string{"source": m.name, "group": "PerfCounter"} - cpulist := CpuList() - 
m.cpulist = make([]C.int, len(cpulist)) - slist := getSocketCpus() + // in some cases, gval causes a panic. We catch it with the handler and deactivate + // the collector (m.init = false). + defer m.CatchGvalPanic() + m.meta = map[string]string{"source": m.name, "group": "PerfCounter"} + cpulist := topo.CpuList() + m.cpulist = make([]C.int, len(cpulist)) + + cclog.ComponentDebug(m.name, "Create maps for socket, numa, core and die metrics") m.sock2tid = make(map[int]int) - // m.numa2tid = make(map[int]int) + // m.numa2tid = make(map[int]int) + // m.core2tid = make(map[int]int) + // m.die2tid = make(map[int]int) for i, c := range cpulist { m.cpulist[i] = C.int(c) - if sid, found := slist[m.cpulist[i]]; found { - m.sock2tid[sid] = i - } + m.sock2tid[topo.GetCpuSocket(c)] = i + // m.numa2tid[topo.GetCpuNumaDomain(c)] = i + // m.core2tid[topo.GetCpuCore(c)] = i + // m.die2tid[topo.GetCpuDie(c)] = i } m.results = make(map[int]map[int]map[string]interface{}) m.mresults = make(map[int]map[int]map[string]float64) m.gmresults = make(map[int]map[string]float64) ret = C.topology_init() if ret != 0 { - return errors.New("Failed to initialize LIKWID topology") - } - if m.config.ForceOverwrite { - os.Setenv("LIKWID_FORCE", "1") + err := errors.New("failed to initialize LIKWID topology") + cclog.ComponentError(m.name, err.Error()) + return err } ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) if ret != 0 { C.topology_finalize() - return errors.New("Failed to initialize LIKWID topology") + err := errors.New("failed to initialize LIKWID topology") + cclog.ComponentError(m.name, err.Error()) + return err } + globalParams := make(map[string]interface{}) + globalParams["time"] = float64(1.0) + globalParams["inverseClock"] = float64(1.0) + for i, evset := range m.config.Eventsets { estr := eventsToEventStr(evset.Events) + params := make(map[string]interface{}) + params["time"] = float64(1.0) + params["inverseClock"] = float64(1.0) + for counter, _ := range evset.Events { + params[counter] = float64(1.0) + } + for _, metric := range evset.Metrics { + _, err := gval.Evaluate(metric.Calc, params, gval.Full()) + if err != nil { + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) + continue + } + if _, ok := globalParams[metric.Name]; !ok { + globalParams[metric.Name] = float64(1.0) + } + } cstr := C.CString(estr) gid := C.perfmon_addEventSet(cstr) if gid >= 0 { @@ -172,95 +286,173 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { m.gmresults[tid] = make(map[string]float64) } } + for _, metric := range m.config.Metrics { + _, err := gval.Evaluate(metric.Calc, globalParams, gval.Full()) + if err != nil { + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) + continue + } + } if len(m.groups) == 0 { C.perfmon_finalize() C.topology_finalize() - return errors.New("No LIKWID performance group initialized") + err := errors.New("no LIKWID performance group initialized") + cclog.ComponentError(m.name, err.Error()) + return err } m.basefreq = getBaseFreq() m.init = true return nil } -func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) { - if !m.init { - return - } +func (m *LikwidCollector) takeMeasurement(group int, interval time.Duration) error { var ret C.int - - for i, gid := range m.groups { - evset := m.config.Eventsets[i] - ret = C.perfmon_setupCounters(gid) - if ret != 0 { - log.Print("Failed to setup performance group ", C.perfmon_getGroupName(gid)) - continue - } - ret = 
C.perfmon_startCounters() - if ret != 0 { - log.Print("Failed to start performance group ", C.perfmon_getGroupName(gid)) - continue - } - time.Sleep(interval) - ret = C.perfmon_stopCounters() - if ret != 0 { - log.Print("Failed to stop performance group ", C.perfmon_getGroupName(gid)) - continue - } - var eidx C.int - for tid := range m.cpulist { - for eidx = 0; int(eidx) < len(evset.Events); eidx++ { - ctr := C.perfmon_getCounterName(gid, eidx) - gctr := C.GoString(ctr) - res := C.perfmon_getLastResult(gid, eidx, C.int(tid)) - m.results[i][tid][gctr] = float64(res) - } - m.results[i][tid]["time"] = interval.Seconds() - m.results[i][tid]["inverseClock"] = float64(1.0 / m.basefreq) - for _, metric := range evset.Metrics { - expression, err := govaluate.NewEvaluableExpression(metric.Calc) - if err != nil { - log.Print(err.Error()) - continue - } - result, err := expression.Evaluate(m.results[i][tid]) - if err != nil { - log.Print(err.Error()) - continue - } - m.mresults[i][tid][metric.Name] = float64(result.(float64)) - } - } + gid := m.groups[group] + ret = C.perfmon_setupCounters(gid) + if ret != 0 { + gctr := C.GoString(C.perfmon_getGroupName(gid)) + err := fmt.Errorf("failed to setup performance group %s", gctr) + cclog.ComponentError(m.name, err.Error()) + return err } + ret = C.perfmon_startCounters() + if ret != 0 { + gctr := C.GoString(C.perfmon_getGroupName(gid)) + err := fmt.Errorf("failed to start performance group %s", gctr) + cclog.ComponentError(m.name, err.Error()) + return err + } + m.running = true + time.Sleep(interval) + m.running = false + ret = C.perfmon_stopCounters() + if ret != 0 { + gctr := C.GoString(C.perfmon_getGroupName(gid)) + err := fmt.Errorf("failed to stop performance group %s", gctr) + cclog.ComponentError(m.name, err.Error()) + return err + } + return nil +} - for _, metric := range m.config.Metrics { - for tid := range m.cpulist { - var params map[string]interface{} - expression, err := govaluate.NewEvaluableExpression(metric.Calc) +func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration) error { + var eidx C.int + evset := m.config.Eventsets[group] + gid := m.groups[group] + for tid := range m.cpulist { + for eidx = 0; int(eidx) < len(evset.Events); eidx++ { + ctr := C.perfmon_getCounterName(gid, eidx) + gctr := C.GoString(ctr) + res := C.perfmon_getLastResult(gid, eidx, C.int(tid)) + m.results[group][tid][gctr] = float64(res) + if m.results[group][tid][gctr] == 0 { + m.results[group][tid][gctr] = 1.0 + } + } + m.results[group][tid]["time"] = interval.Seconds() + m.results[group][tid]["inverseClock"] = float64(1.0 / m.basefreq) + for _, metric := range evset.Metrics { + value, err := gval.Evaluate(metric.Calc, m.results[group][tid], gval.Full()) if err != nil { - log.Print(err.Error()) + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue } - params = make(map[string]interface{}) + m.mresults[group][tid][metric.Name] = value.(float64) + } + } + return nil +} + +func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration) error { + for _, metric := range m.config.Metrics { + for tid := range m.cpulist { + params := make(map[string]interface{}) for j := range m.groups { for mname, mres := range m.mresults[j][tid] { params[mname] = mres } } - result, err := expression.Evaluate(params) + value, err := gval.Evaluate(metric.Calc, params, gval.Full()) if err != nil { - log.Print(err.Error()) + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", 
err.Error()) continue } - m.gmresults[tid][metric.Name] = float64(result.(float64)) + m.gmresults[tid][metric.Name] = value.(float64) } } + return nil +} + +// func (m *LikwidCollector) calcResultMetrics(interval time.Duration) ([]lp.CCMetric, error) { +// var err error = nil +// metrics := make([]lp.CCMetric, 0) +// for i := range m.groups { +// evset := m.config.Eventsets[i] +// for _, metric := range evset.Metrics { +// log.Print(metric.Name, " ", metric.Scope, " ", metric.granulatity) +// if metric.Scope.Granularity() > metric.granulatity.Granularity() { +// log.Print("Different granularity wanted for ", metric.Name, ": ", metric.Scope, " vs ", metric.granulatity) +// var idlist []int +// idfunc := func(cpuid int) int { return cpuid } +// switch metric.Scope { +// case "socket": +// idlist = topo.SocketList() +// idfunc = topo.GetCpuSocket +// case "numa": +// idlist = topo.NumaNodeList() +// idfunc = topo.GetCpuNumaDomain +// case "core": +// idlist = topo.CoreList() +// idfunc = topo.GetCpuCore +// case "die": +// idlist = topo.DieList() +// idfunc = topo.GetCpuDie +// case "node": +// idlist = topo.CpuList() +// } +// for i := 0; i < num_results; i++ { + +// } +// } +// } +// } +// for _, metric := range m.config.Metrics { +// log.Print(metric.Name, " ", metric.Scope, " ", metric.granulatity) +// if metric.Scope.Granularity() > metric.granulatity.Granularity() { +// log.Print("Different granularity wanted for ", metric.Name, ": ", metric.Scope, " vs ", metric.granulatity) +// } +// } +// return metrics, err +// } + +func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) { + if !m.init { + return + } + defer m.CatchGvalPanic() + + for i, _ := range m.groups { + // measure event set 'i' for 'interval' seconds + err := m.takeMeasurement(i, interval) + if err != nil { + cclog.ComponentError(m.name, err.Error()) + continue + } + m.calcEventsetMetrics(i, interval) + } + + m.calcGlobalMetrics(interval) + + //metrics, err = m.calcResultMetrics(interval) + for i := range m.groups { evset := m.config.Eventsets[i] for _, metric := range evset.Metrics { + _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) if metric.Publish && !skip { - if metric.Scope.String() == "socket" { + if metric.Scope == "socket" { for sid, tid := range m.sock2tid { y, err := lp.New(metric.Name, map[string]string{"type": "socket", @@ -272,7 +464,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) output <- y } } - } else if metric.Scope.String() == "hwthread" { + } else if metric.Scope == "hwthread" { for tid, cpu := range m.cpulist { y, err := lp.New(metric.Name, map[string]string{"type": "cpu", @@ -291,7 +483,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) for _, metric := range m.config.Metrics { _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) if metric.Publish && !skip { - if metric.Scope.String() == "socket" { + if metric.Scope == "socket" { for sid, tid := range m.sock2tid { y, err := lp.New(metric.Name, map[string]string{"type": "socket", @@ -303,7 +495,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) output <- y } } - } else { + } else if metric.Scope == "hwthread" { for tid, cpu := range m.cpulist { y, err := lp.New(metric.Name, map[string]string{"type": "cpu", @@ -322,8 +514,16 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) func (m *LikwidCollector) Close() { if m.init { + cclog.ComponentDebug(m.name, 
"Closing ...") m.init = false + if m.running { + cclog.ComponentDebug(m.name, "Stopping counters") + C.perfmon_stopCounters() + } + cclog.ComponentDebug(m.name, "Finalize LIKWID perfmon module") C.perfmon_finalize() + cclog.ComponentDebug(m.name, "Finalize LIKWID topology module") C.topology_finalize() + cclog.ComponentDebug(m.name, "Closing done") } } From 8319d3de43f5f597ad13afe06a0db40a7360ea8a Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 1 Feb 2022 18:26:54 +0100 Subject: [PATCH 06/12] Add some more helper functions to ccTopology --- internal/ccTopology/ccTopology.go | 141 +++++++++++++++++++++++++++--- 1 file changed, 129 insertions(+), 12 deletions(-) diff --git a/internal/ccTopology/ccTopology.go b/internal/ccTopology/ccTopology.go index 8d53b05..030b2f7 100644 --- a/internal/ccTopology/ccTopology.go +++ b/internal/ccTopology/ccTopology.go @@ -24,17 +24,23 @@ func intArrayContains(array []int, str int) (int, bool) { return -1, false } -// stringArrayContains scans an array of strings if the value str is present in the array -// If the specified value is found, the corresponding array index is returned. -// The bool value is used to signal success or failure -// func stringArrayContains(array []string, str string) (int, bool) { -// for i, a := range array { -// if a == str { -// return i, true -// } -// } -// return -1, false -// } +func fileToInt(path string) int { + buffer, err := ioutil.ReadFile(path) + if err != nil { + log.Print(err) + cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error()) + return -1 + } + sbuffer := strings.Replace(string(buffer), "\n", "", -1) + var id int64 + //_, err = fmt.Scanf("%d", sbuffer, &id) + id, err = strconv.ParseInt(sbuffer, 10, 32) + if err != nil { + cclogger.ComponentError("ccTopology", "Parsing", path, ":", sbuffer, err.Error()) + return -1 + } + return int(id) +} func SocketList() []int { buffer, err := ioutil.ReadFile("/proc/cpuinfo") @@ -68,7 +74,7 @@ func CpuList() []int { return nil } ll := strings.Split(string(buffer), "\n") - var cpulist []int + cpulist := make([]int, 0) for _, line := range ll { if strings.HasPrefix(line, "processor") { lv := strings.Fields(line) @@ -86,6 +92,67 @@ func CpuList() []int { return cpulist } +func CoreList() []int { + buffer, err := ioutil.ReadFile("/proc/cpuinfo") + if err != nil { + log.Print(err) + return nil + } + ll := strings.Split(string(buffer), "\n") + corelist := make([]int, 0) + for _, line := range ll { + if strings.HasPrefix(line, "core id") { + lv := strings.Fields(line) + id, err := strconv.ParseInt(lv[3], 10, 32) + if err != nil { + log.Print(err) + return corelist + } + _, found := intArrayContains(corelist, int(id)) + if !found { + corelist = append(corelist, int(id)) + } + } + } + return corelist +} + +func NumaNodeList() []int { + numalist := make([]int, 0) + files, err := filepath.Glob("/sys/devices/system/node/node*") + if err != nil { + log.Print(err) + } + for _, f := range files { + finfo, err := os.Lstat(f) + if err == nil && (finfo.IsDir() || finfo.Mode()&os.ModeSymlink != 0) { + var id int + parts := strings.Split(f, "/") + _, err = fmt.Scanf("node%d", parts[len(parts)-1], &id) + if err == nil { + _, found := intArrayContains(numalist, int(id)) + if !found { + numalist = append(numalist, int(id)) + } + } + } + } + return numalist +} + +func DieList() []int { + cpulist := CpuList() + dielist := make([]int, 0) + for _, c := range cpulist { + dieid := fileToInt(fmt.Sprintf("/sys/devices/system/cpu/cpu%d/topology/die_id", c)) + _, found := 
intArrayContains(dielist, int(dieid)) + if !found { + dielist = append(dielist, int(dieid)) + } + } + return dielist +} + type CpuEntry struct { Cpuid int SMT int @@ -203,6 +270,7 @@ type CpuInformation struct { SMTWidth int NumSockets int NumDies int + NumCores int NumNumaDomains int } @@ -213,6 +281,7 @@ func CpuInfo() CpuInformation { numa := 0 die := 0 socket := 0 + core := 0 cdata := CpuData() for _, d := range cdata { if d.SMT > smt { @@ -227,10 +296,14 @@ func CpuInfo() CpuInformation { if d.Socket > socket { socket = d.Socket } + if d.Core > core { + core = d.Core + } } c.NumNumaDomains = numa + 1 c.SMTWidth = smt + 1 c.NumDies = die + 1 + c.NumCores = core + 1 c.NumSockets = socket + 1 c.NumHWthreads = len(cdata) return c @@ -275,3 +348,47 @@ func GetCpuCore(cpuid int) int { } return -1 } + +func GetSocketCpus(socket int) []int { + all := CpuData() + cpulist := make([]int, 0) + for _, d := range all { + if d.Socket == socket { + cpulist = append(cpulist, d.Cpuid) + } + } + return cpulist +} + +func GetNumaDomainCpus(domain int) []int { + all := CpuData() + cpulist := make([]int, 0) + for _, d := range all { + if d.Numadomain == domain { + cpulist = append(cpulist, d.Cpuid) + } + } + return cpulist +} + +func GetDieCpus(die int) []int { + all := CpuData() + cpulist := make([]int, 0) + for _, d := range all { + if d.Die == die { + cpulist = append(cpulist, d.Cpuid) + } + } + return cpulist +} + +func GetCoreCpus(core int) []int { + all := CpuData() + cpulist := make([]int, 0) + for _, d := range all { + if d.Core == core { + cpulist = append(cpulist, d.Cpuid) + } + } + return cpulist +} From 64a12b80bb1e860c12835a6925319c300f8b251a Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 1 Feb 2022 18:27:16 +0100 Subject: [PATCH 07/12] Add and export SetName() function for CCMetric --- internal/ccMetric/ccMetric.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/ccMetric/ccMetric.go b/internal/ccMetric/ccMetric.go index 20b9786..9745e9d 100644 --- a/internal/ccMetric/ccMetric.go +++ b/internal/ccMetric/ccMetric.go @@ -25,6 +25,7 @@ type ccMetric struct { type CCMetric interface { lp.Metric // Time(), Name(), TagList(), FieldList() + SetName(name string) SetTime(t time.Time) Meta() map[string]string // Map of meta data tags @@ -68,6 +69,10 @@ func (m *ccMetric) Name() string { return m.name } +func (m *ccMetric) SetName(name string) { + m.name = name +} + // Tags returns the the list of tags as key-value-mapping func (m *ccMetric) Tags() map[string]string { return m.tags From a4bd1417861dc3556f4fb1cb5c25a398295627c1 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 1 Feb 2022 18:27:59 +0100 Subject: [PATCH 08/12] Use the MetricAggregator for all calculations in the MetricRouter --- internal/metricRouter/metricAggregator.go | 82 ++++++++++++++++++++++- internal/metricRouter/metricRouter.go | 76 +++++++++++++-------- 2 files changed, 130 insertions(+), 28 deletions(-) diff --git a/internal/metricRouter/metricAggregator.go b/internal/metricRouter/metricAggregator.go index 41c5276..e3303e4 100644 --- a/internal/metricRouter/metricAggregator.go +++ b/internal/metricRouter/metricAggregator.go @@ -3,6 +3,7 @@ package metricRouter import ( "context" "fmt" + "math" "os" "strings" "time" @@ -84,7 +85,7 @@ func (c *metricAggregator) Init(output chan lp.CCMetric) error { c.constants["smtWidth"] = cinfo.SMTWidth c.language = gval.NewLanguage( - gval.Base(), + gval.Full(), metricCacheLanguage, ) @@ -281,6 +282,85 @@ func (c *metricAggregator) AddFunction(name string, function 
func(args ...interf c.language = gval.NewLanguage(c.language, gval.Function(name, function)) } +func EvalBoolCondition(condition string, params map[string]interface{}) (bool, error) { + newcond := strings.ReplaceAll(condition, "'", "\"") + newcond = strings.ReplaceAll(newcond, "%", "\\") + language := gval.NewLanguage( + gval.Full(), + metricCacheLanguage, + ) + value, err := gval.Evaluate(newcond, params, language) + if err != nil { + return false, err + } + var endResult bool = false + err = nil + switch r := value.(type) { + case bool: + endResult = r + case float64: + if r != 0.0 { + endResult = true + } + case float32: + if r != 0.0 { + endResult = true + } + case int: + if r != 0 { + endResult = true + } + case int64: + if r != 0 { + endResult = true + } + case int32: + if r != 0 { + endResult = true + } + default: + err = fmt.Errorf("cannot evaluate '%s' to bool", newcond) + } + return endResult, err +} + +func EvalFloat64Condition(condition string, params map[string]interface{}) (float64, error) { + var endResult float64 = math.NaN() + newcond := strings.ReplaceAll(condition, "'", "\"") + newcond = strings.ReplaceAll(newcond, "%", "\\") + language := gval.NewLanguage( + gval.Full(), + metricCacheLanguage, + ) + value, err := gval.Evaluate(newcond, params, language) + if err != nil { + cclog.ComponentDebug("MetricRouter", condition, " = ", err.Error()) + return endResult, err + } + err = nil + switch r := value.(type) { + case bool: + if r { + endResult = 1.0 + } else { + endResult = 0.0 + } + case float64: + endResult = r + case float32: + endResult = float64(r) + case int: + endResult = float64(r) + case int64: + endResult = float64(r) + case int32: + endResult = float64(r) + default: + err = fmt.Errorf("cannot evaluate '%s' to float64", newcond) + } + return endResult, err +} + func NewAggregator(output chan lp.CCMetric) (MetricAggregator, error) { a := new(metricAggregator) err := a.Init(output) diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 83c14e7..8ec7e06 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -11,7 +11,6 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" - "gopkg.in/Knetic/govaluate.v2" ) // Metric router tag configuration @@ -26,8 +25,12 @@ type metricRouterConfig struct { AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met IntervalAgg []metricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval + DropMetrics []string `json:"drop_metrics"` // List of metric names to drop. For fine-grained dropping use drop_metrics_if + DropMetricsIf []string `json:"drop_metrics_if"` // List of evaluatable terms to drop metrics + RenameMetrics map[string]string `json:"rename_metrics"` // Map to rename metric name from key to value IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval? 
NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation + dropMetrics map[string]bool // Internal map for O(1) lookup } // Metric router data structure @@ -104,6 +107,10 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout for _, agg := range r.config.IntervalAgg { r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta) } + r.config.dropMetrics = make(map[string]bool) + for _, mname := range r.config.DropMetrics { + r.config.dropMetrics[mname] = true + } return nil } @@ -130,16 +137,9 @@ func (r *metricRouter) StartTimer() { cclog.ComponentDebug("MetricRouter", "TIMER START") } -// EvalCondition evaluates condition cond for metric data from point -func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, error) { - expression, err := govaluate.NewEvaluableExpression(cond) - if err != nil { - cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error()) - return false, err - } - - // Add metric name, tags, meta data, fields and timestamp to the parameter list +func getParamMap(point lp.CCMetric) map[string]interface{} { params := make(map[string]interface{}) + params["metric"] = point params["name"] = point.Name() for key, value := range point.Tags() { params[key] = value @@ -151,26 +151,19 @@ func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, erro params[f.Key] = f.Value } params["timestamp"] = point.Time() - - // evaluate condition - result, err := expression.Evaluate(params) - if err != nil { - cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error()) - return false, err - } - return bool(result.(bool)), err + return params } // DoAddTags adds a tag when condition is fullfiled func (r *metricRouter) DoAddTags(point lp.CCMetric) { for _, m := range r.config.AddTags { - var conditionMatches bool + var conditionMatches bool = false if m.Condition == "*" { conditionMatches = true } else { var err error - conditionMatches, err = r.EvalCondition(m.Condition, point) + conditionMatches, err = EvalBoolCondition(m.Condition, getParamMap(point)) if err != nil { cclog.ComponentError("MetricRouter", err.Error()) conditionMatches = false @@ -185,13 +178,13 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) { // DoDelTags removes a tag when condition is fullfiled func (r *metricRouter) DoDelTags(point lp.CCMetric) { for _, m := range r.config.DelTags { - var conditionMatches bool + var conditionMatches bool = false if m.Condition == "*" { conditionMatches = true } else { var err error - conditionMatches, err = r.EvalCondition(m.Condition, point) + conditionMatches, err = EvalBoolCondition(m.Condition, getParamMap(point)) if err != nil { cclog.ComponentError("MetricRouter", err.Error()) conditionMatches = false @@ -203,9 +196,24 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) { } } +// Conditional test whether a metric should be dropped +func (r *metricRouter) dropMetric(point lp.CCMetric) bool { + // Simple drop check + if _, ok := r.config.dropMetrics[point.Name()]; ok { + return true + } + // Checking the dropping conditions + for _, m := range r.config.DropMetricsIf { + conditionMatches, err := EvalBoolCondition(m, getParamMap(point)) + if conditionMatches || err != nil { + return true + } + } + return false +} + // Start starts the metric router func (r *metricRouter) Start() { - // start timer if configured r.timestamp = time.Now() if r.config.IntervalStamp { @@ -224,6 +232,12 @@ func (r *metricRouter) Start() { 
cclog.ComponentDebug("MetricRouter", "FORWARD", point) r.DoAddTags(point) r.DoDelTags(point) + if new, ok := r.config.RenameMetrics[point.Name()]; ok { + point.SetName(new) + } + r.DoAddTags(point) + r.DoDelTags(point) + for _, o := range r.outputs { o <- point } @@ -247,7 +261,11 @@ func (r *metricRouter) Start() { if r.config.IntervalStamp { p.SetTime(r.timestamp) } - forward(p) + if !r.dropMetric(p) { + forward(p) + } + // even if the metric is dropped, it is stored in the cache for + // aggregations r.cache.Add(p) case p := <-r.recv_input: @@ -255,12 +273,16 @@ func (r *metricRouter) Start() { if r.config.IntervalStamp { p.SetTime(r.timestamp) } - forward(p) + if !r.dropMetric(p) { + forward(p) + } case p := <-r.cache_input: // receive from metric collector - p.AddTag("hostname", r.hostname) - forward(p) + if !r.dropMetric(p) { + p.AddTag("hostname", r.hostname) + forward(p) + } } } }() From af8654d3250e908078f60f6dcf5f25b1a905fb30 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 1 Feb 2022 18:28:20 +0100 Subject: [PATCH 09/12] Update MetricRouter README --- internal/metricRouter/README.md | 168 +++++++++++++++++++++++++++++++- 1 file changed, 163 insertions(+), 5 deletions(-) diff --git a/internal/metricRouter/README.md b/internal/metricRouter/README.md index a3aef16..17f336c 100644 --- a/internal/metricRouter/README.md +++ b/internal/metricRouter/README.md @@ -6,6 +6,7 @@ The CCMetric router sits in between the collectors and the sinks and can be used ```json { + "interval_timestamp" : true, "add_tags" : [ { "key" : "cluster", @@ -25,16 +26,52 @@ The CCMetric router sits in between the collectors and the sinks and can be used "if" : "*" } ], - "interval_timestamp" : true + "interval_aggregates" : [ + { + "name" : "temp_cores_avg", + "if" : "match('temp_core_%d+', metric.Name())", + "function" : "avg(values)", + "tags" : { + "type" : "node" + }, + "meta" : { + "group": "IPMI", + "unit": "degC", + "source": "TempCollector" + } + } + ], + "drop_metrics" : [ + "not_interesting_metric_at_all" + ], + "drop_metrics_if" : [ + "match('temp_core_%d+', metric.Name())" + ], + "rename_metrics" : { + "metric_12345" : "mymetric" + } } ``` There are three main options `add_tags`, `delete_tags` and `interval_timestamp`. `add_tags` and `delete_tags` are lists consisting of dicts with `key`, `value` and `if`. The `value` can be omitted in the `delete_tags` part as it only uses the `key` for removal. The `interval_timestamp` setting means that a unique timestamp is applied to all metrics traversing the router during an interval. +# The `interval_timestamp` option -# Conditional manipulation of tags +The collectors' `Read()` functions are not called simultaneously and therefore the metrics gathered in an interval can have different timestamps. If you want to avoid that and have a common timestamp (the beginning of the interval), set this option to `true` and the MetricRouter sets the time. -The `if` setting allows conditional testing of a single metric like in the example: +# The `rename_metrics` option +In the ClusterCockpit world we specified a set of standard metrics. Since some collectors determine the metric names based on files, execuables and libraries, they might change from system to system (or installation to installtion, OS to OS, ...). In order to get the common names, you can rename incoming metrics before sending them to the sink. 
If the metric name matches the `oldname`, it is changed to `newname`:
+
+```json
+{
+  "oldname" : "newname",
+  "clock_mhz" : "clock"
+}
+```
+
+# Conditional manipulation of tags (`add_tags` and `del_tags`)
+
+Common config format:

```json
{
  "key" : "test",
  "value" : "testing",
  "if" : "name == 'temp_package_id_0'"
}
```

-If the CCMetric name is equal to 'temp_package_id_0', it adds an additional tag `test=testing` to the metric.
+## The `del_tags` option

-In order to match all metrics, you can use `*`, so in order to add a flag per default, like the `cluster=testcluster` tag in the example.
+The collectors are free to add whatever `key=value` pair to the metric tags (although the usage of tags should be minimized). If you want to delete a tag afterwards, you can do that. When the `if` condition matches on a metric, the `key` is removed from the metric's tags.
+
+If you want to remove a tag for all metrics, use the condition wildcard `*`. The `value` field can be omitted in the `del_tags` case.
+
+Never delete these tags:
+- `hostname`
+- `type`
+- `type-id`
+
+## The `add_tags` option
+
+In some cases, metrics should be tagged or an existing tag changed based on some condition. This can be done in the `add_tags` section. When the `if` condition evaluates to `true`, the tag `key` is added or changed to the new `value`.
+
+If the CCMetric name is equal to `temp_package_id_0`, it adds an additional tag `test=testing` to the metric.
+
+For this metric, a more useful example would be:
+
+```json
+[
+  {
+    "key" : "type",
+    "value" : "socket",
+    "if" : "name == 'temp_package_id_0'"
+  },
+  {
+    "key" : "type-id",
+    "value" : "0",
+    "if" : "name == 'temp_package_id_0'"
+  }
+]
+```
+
+The metric `temp_package_id_0` corresponds to the temperature of the first CPU socket (= package). With the above configuration, the tags would reflect that, because commonly the [TempCollector](../../collectors/tempMetric.md) submits only `node` metrics.
+
+In order to match all metrics, you can use `*` to add a tag per default. This is useful to attach system-specific tags like `cluster=testcluster`:
+
+```json
+{
+  "key" : "cluster",
+  "value" : "testcluster",
+  "if" : "*"
+}
+```
+
+# Dropping metrics
+
+In some cases, you want to drop a metric and not have it forwarded to the sinks. There are two options, depending on the required specification:
+- Based only on the metric name -> `drop_metrics` section
+- An evaluable condition with more overhead -> `drop_metrics_if` section
+
+## The `drop_metrics` section
+
+The argument is a list of metric names. No further checks are performed, only a comparison of the metric name.
+
+```json
+{
+  "drop_metrics" : [
+    "drop_metric_1",
+    "drop_metric_2"
+  ]
+}
+```
+
+The example drops all metrics with the names `drop_metric_1` and `drop_metric_2`.
+
+## The `drop_metrics_if` section
+
+This option takes a list of evaluable conditions and applies them one after the other to **all** metrics incoming from the collectors and the metric cache (aka `interval_aggregates`).
+
+```json
+{
+  "drop_metrics_if" : [
+    "match('drop_metric_%d+', name)",
+    "match('cpu', type) && type-id == 0"
+  ]
+}
+```
+The first line is comparable to the example in `drop_metrics`: it drops all metrics starting with `drop_metric_` and ending with a number.
The second line drops all metrics of the first hardware thread (**not** recommended).
+# Aggregate metric values of the current interval with the `interval_aggregates` option
+
+In some cases, you need to derive new metrics based on the metrics arriving during an interval. This can be done in the `interval_aggregates` section. The logic is similar to the other metric manipulation and filtering options. A cache stores all metrics that arrive during an interval. At the beginning of the *next* interval, the list of metrics is submitted to the MetricAggregator. It derives new metrics and submits them back to the MetricRouter, so they are sent in the next interval but have the timestamp of the previous interval beginning.
+
+```json
+"interval_aggregates" : [
+    {
+        "name" : "new_metric_name",
+        "if" : "match('sub_metric_%d+', metric.Name())",
+        "function" : "avg(values)",
+        "tags" : {
+            "key" : "value",
+            "type" : "node"
+        },
+        "meta" : {
+            "key" : "value",
+            "group": "IPMI",
+            "unit": ""
+        }
+    }
+]
+```
+
+The above configuration collects all metric values for metrics evaluating `if` to `true`. Afterwards it calculates the average `avg` of the `values` (list of all metrics' field `value`) and creates a new CCMetric with the name `new_metric_name` and adds the tags in `tags` and the meta information in `meta`. The special value `` searches the input metrics and copies the value of the first match of `key` to the new CCMetric.
+
+If you are not interested in the input metrics `sub_metric_%d+` at all, you can add the same condition used here to the `drop_metrics_if` section to drop them.
+
+Use cases for `interval_aggregates`:
+- Combine multiple metrics of the a collector to a new one like the [MemstatCollector](../../collectors/memstatMetric.go) does it for `mem_used`)):
+```json
+    {
+        "name" : "mem_used",
+        "if" : "source == 'MemstatCollector'",
+        "function" : "sum(mem_total) - (sum(mem_free) + sum(mem_buffers) + sum(mem_cached))",
+        "tags" : {
+            "type" : "node"
+        },
+        "meta" : {
+            "group": "",
+            "unit": "",
+            "source": ""
+        }
+    }
+```

From ed62e952ce98dab0dc8c9deb2567b3ab5534b155 Mon Sep 17 00:00:00 2001
From: Thomas Roehl
Date: Wed, 2 Feb 2022 14:52:07 +0100
Subject: [PATCH 10/12] Use MetricAggregator to calculate metrics in LIKWID collector.
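This patch removes the direct gval evaluation from the LIKWID collector and reuses the MetricAggregator's expression evaluator from internal/metricRouter instead, so collector formulas and the router's `interval_aggregates` functions go through the same engine. As a rough illustration (not part of the patch), the per-thread metric computation boils down to filling a parameter map with the measured counter values and handing the formula string to the evaluator; the sketch assumes `EvalFloat64Condition(formula string, params map[string]interface{}) (float64, error)`, the signature suggested by its uses in the diff below.

```go
package main

import (
	"fmt"

	mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)

func main() {
	// Counter values for one hardware thread, as they would be filled from
	// perfmon_getLastResult(), plus the helper parameters the collector always sets.
	// The counter names and values here are made up for illustration.
	params := map[string]interface{}{
		"PMC0":         1.2e9, // e.g. retired instructions
		"PMC1":         3.0e9, // e.g. unhalted core cycles
		"time":         1.0,
		"inverseClock": 1.0 / 2.4e9,
	}

	// A metric definition like {"name": "ipc", "calc": "PMC0/PMC1"} is evaluated
	// against the parameter map; the result becomes the metric's field value.
	value, err := mr.EvalFloat64Condition("PMC0/PMC1", params)
	if err != nil {
		fmt.Println("evaluation failed:", err)
		return
	}
	fmt.Println("ipc =", value)
}
```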
--- collectors/likwidMetric.go | 401 ++++++++++++++++++------------------- 1 file changed, 199 insertions(+), 202 deletions(-) diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index e3be810..82e241d 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -24,7 +24,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology" - "github.com/PaesslerAG/gval" + mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) type MetricScope string @@ -43,16 +43,32 @@ func (ms MetricScope) String() string { return string(ms) } +func (ms MetricScope) Likwid() string { + LikwidDomains := map[string]string{ + "hwthread": "", + "core": "", + "llc": "C", + "numadomain": "M", + "die": "D", + "socket": "S", + "node": "N", + } + return LikwidDomains[string(ms)] +} + func (ms MetricScope) Granularity() int { - grans := []string{"hwthread", "core", "llc", "numadomain", "die", "socket", "node"} - for i, g := range grans { - if ms.String() == g { + for i, g := range GetAllMetricScopes() { + if ms == g { return i } } return -1 } +func GetAllMetricScopes() []MetricScope { + return []MetricScope{"hwthread" /*, "core", "llc", "numadomain", "die",*/, "socket", "node"} +} + type LikwidCollectorMetricConfig struct { Name string `json:"name"` // Name of the metric Calc string `json:"calc"` // Calculation for the metric using @@ -77,16 +93,18 @@ type LikwidCollectorConfig struct { type LikwidCollector struct { metricCollector - cpulist []C.int - sock2tid map[int]int - metrics map[C.int]map[string]int - groups []C.int - config LikwidCollectorConfig - results map[int]map[int]map[string]interface{} - mresults map[int]map[int]map[string]float64 - gmresults map[int]map[string]float64 - basefreq float64 - running bool + cpulist []C.int + cpu2tid map[int]int + sock2tid map[int]int + scopeRespTids map[MetricScope]map[int]int + metrics map[C.int]map[string]int + groups []C.int + config LikwidCollectorConfig + results map[int]map[int]map[string]interface{} + mresults map[int]map[int]map[string]float64 + gmresults map[int]map[string]float64 + basefreq float64 + running bool } type LikwidMetric struct { @@ -138,28 +156,8 @@ func getBaseFreq() float64 { return freq } -func getSocketCpus() map[C.int]int { - slist := SocketList() - var cpu C.int - outmap := make(map[C.int]int) - for _, s := range slist { - t := C.CString(fmt.Sprintf("S%d", s)) - clen := C.cpustr_to_cpulist(t, &cpu, 1) - if int(clen) == 1 { - outmap[cpu] = s - } - } - return outmap -} - -func (m *LikwidCollector) CatchGvalPanic() { - if rerr := recover(); rerr != nil { - cclog.ComponentError(m.name, "Gval failed to calculate a metric", rerr) - m.init = false - } -} - func (m *LikwidCollector) initGranularity() { + splitRegex := regexp.MustCompile("[+-/*()]") for _, evset := range m.config.Eventsets { evset.granulatity = make(map[string]MetricScope) for counter, event := range evset.Events { @@ -169,7 +167,7 @@ func (m *LikwidCollector) initGranularity() { } } for i, metric := range evset.Metrics { - s := regexp.MustCompile("[+-/*()]").Split(metric.Calc, -1) + s := splitRegex.Split(metric.Calc, -1) gran := MetricScope("hwthread") evset.Metrics[i].granulatity = gran for _, x := range s { @@ -183,7 +181,7 @@ func (m *LikwidCollector) initGranularity() { } } for i, metric := range m.config.Metrics { - s := regexp.MustCompile("[+-/*()]").Split(metric.Calc, 
-1) + s := splitRegex.Split(metric.Calc, -1) gran := MetricScope("hwthread") m.config.Metrics[i].granulatity = gran for _, x := range s { @@ -199,6 +197,59 @@ func (m *LikwidCollector) initGranularity() { } } +type TopoResolveFunc func(cpuid int) int + +func (m *LikwidCollector) getResponsiblities() map[MetricScope]map[int]int { + get_cpus := func(scope MetricScope) map[int]int { + var slist []int + var cpu C.int + var input func(index int) string + switch scope { + case "node": + slist = []int{0} + input = func(index int) string { return "N:0" } + case "socket": + input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) } + slist = topo.SocketList() + // case "numadomain": + // input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) } + // slist = topo.NumaNodeList() + // cclog.Debug(scope, " ", input(0), " ", slist) + // case "die": + // input = func(index int) string { return fmt.Sprintf("%s%d:0", scope.Likwid(), index) } + // slist = topo.DieList() + // case "llc": + // input = fmt.Sprintf("%s%d:0", scope.Likwid(), s) + // slist = topo.LLCacheList() + case "hwthread": + input = func(index int) string { return fmt.Sprintf("%d", index) } + slist = topo.CpuList() + } + outmap := make(map[int]int) + for _, s := range slist { + t := C.CString(input(s)) + clen := C.cpustr_to_cpulist(t, &cpu, 1) + if int(clen) == 1 { + outmap[s] = m.cpu2tid[int(cpu)] + } else { + cclog.Error(fmt.Sprintf("Cannot determine responsible CPU for %s", input(s))) + outmap[s] = -1 + } + C.free(unsafe.Pointer(t)) + } + return outmap + } + + scopes := GetAllMetricScopes() + complete := make(map[MetricScope]map[int]int) + for _, s := range scopes { + cclog.Debug("Start ", s) + complete[s] = get_cpus(s) + cclog.Debug("End ", s) + } + return complete +} + func (m *LikwidCollector) Init(config json.RawMessage) error { var ret C.int m.name = "LikwidCollector" @@ -208,40 +259,39 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { return err } } - m.initGranularity() if m.config.ForceOverwrite { + cclog.ComponentDebug(m.name, "Set LIKWID_FORCE=1") os.Setenv("LIKWID_FORCE", "1") } m.setup() - // in some cases, gval causes a panic. We catch it with the handler and deactivate - // the collector (m.init = false). - defer m.CatchGvalPanic() m.meta = map[string]string{"source": m.name, "group": "PerfCounter"} + cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists") cpulist := topo.CpuList() m.cpulist = make([]C.int, len(cpulist)) - - cclog.ComponentDebug(m.name, "Create maps for socket, numa, core and die metrics") - m.sock2tid = make(map[int]int) - // m.numa2tid = make(map[int]int) - // m.core2tid = make(map[int]int) - // m.die2tid = make(map[int]int) + m.cpu2tid = make(map[int]int) for i, c := range cpulist { m.cpulist[i] = C.int(c) - m.sock2tid[topo.GetCpuSocket(c)] = i - // m.numa2tid[topo.GetCpuNumaDomain(c)] = i - // m.core2tid[topo.GetCpuCore(c)] = i - // m.die2tid[topo.GetCpuDie(c)] = i + m.cpu2tid[c] = i + } m.results = make(map[int]map[int]map[string]interface{}) m.mresults = make(map[int]map[int]map[string]float64) m.gmresults = make(map[int]map[string]float64) + cclog.ComponentDebug(m.name, "initialize LIKWID topology") ret = C.topology_init() if ret != 0 { err := errors.New("failed to initialize LIKWID topology") cclog.ComponentError(m.name, err.Error()) return err } + + // Determine which counter works at which level. PMC*: hwthread, *BOX*: socket, ... 
+ m.initGranularity() + // Generate map for MetricScope -> scope_id (like socket id) -> responsible id (offset in cpulist) + m.scopeRespTids = m.getResponsiblities() + + cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module") ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) if ret != 0 { C.topology_finalize() @@ -250,28 +300,33 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { return err } + // This is for the global metrics computation test globalParams := make(map[string]interface{}) globalParams["time"] = float64(1.0) globalParams["inverseClock"] = float64(1.0) - + // While adding the events, we test the metrics whether they can be computed at all for i, evset := range m.config.Eventsets { estr := eventsToEventStr(evset.Events) + // Generate parameter list for the metric computing test params := make(map[string]interface{}) params["time"] = float64(1.0) params["inverseClock"] = float64(1.0) - for counter, _ := range evset.Events { + for counter := range evset.Events { params[counter] = float64(1.0) } for _, metric := range evset.Metrics { - _, err := gval.Evaluate(metric.Calc, params, gval.Full()) + // Try to evaluate the metric + _, err := mr.EvalFloat64Condition(metric.Calc, params) if err != nil { cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue } + // If the metric is not in the parameter list for the global metrics, add it if _, ok := globalParams[metric.Name]; !ok { globalParams[metric.Name] = float64(1.0) } } + // Now we add the list of events to likwid cstr := C.CString(estr) gid := C.perfmon_addEventSet(cstr) if gid >= 0 { @@ -283,17 +338,21 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { for tid := range m.cpulist { m.results[i][tid] = make(map[string]interface{}) m.mresults[i][tid] = make(map[string]float64) - m.gmresults[tid] = make(map[string]float64) + if i == 0 { + m.gmresults[tid] = make(map[string]float64) + } } } for _, metric := range m.config.Metrics { - _, err := gval.Evaluate(metric.Calc, globalParams, gval.Full()) + // Try to evaluate the global metric + _, err := mr.EvalFloat64Condition(metric.Calc, globalParams) if err != nil { cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) continue } } + // If no event set could be added, shut down LikwidCollector if len(m.groups) == 0 { C.perfmon_finalize() C.topology_finalize() @@ -306,6 +365,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error { return nil } +// take a measurement for 'interval' seconds of event set index 'group' func (m *LikwidCollector) takeMeasurement(group int, interval time.Duration) error { var ret C.int gid := m.groups[group] @@ -336,101 +396,104 @@ func (m *LikwidCollector) takeMeasurement(group int, interval time.Duration) err return nil } -func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration) error { +// Get all measurement results for an event set, derive the metric values out of the measurement results and send it +func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration, output chan lp.CCMetric) error { var eidx C.int evset := m.config.Eventsets[group] gid := m.groups[group] - for tid := range m.cpulist { - for eidx = 0; int(eidx) < len(evset.Events); eidx++ { - ctr := C.perfmon_getCounterName(gid, eidx) - gctr := C.GoString(ctr) - res := C.perfmon_getLastResult(gid, eidx, C.int(tid)) - m.results[group][tid][gctr] = float64(res) - if m.results[group][tid][gctr] == 0 { - 
m.results[group][tid][gctr] = 1.0 + + // Go over events and get the results + for eidx = 0; int(eidx) < len(evset.Events); eidx++ { + ctr := C.perfmon_getCounterName(gid, eidx) + ev := C.perfmon_getEventName(gid, eidx) + gctr := C.GoString(ctr) + gev := C.GoString(ev) + // MetricScope for the counter (and if needed the event) + scope := getGranularity(gctr, gev) + // Get the map scope-id -> tids + // This way we read less counters like only the responsible hardware thread for a socket + scopemap := m.scopeRespTids[scope] + for _, tid := range scopemap { + if tid >= 0 { + m.results[group][tid]["time"] = interval.Seconds() + m.results[group][tid]["inverseClock"] = float64(1.0 / m.basefreq) + res := C.perfmon_getLastResult(gid, eidx, C.int(tid)) + m.results[group][tid][gctr] = float64(res) } } - m.results[group][tid]["time"] = interval.Seconds() - m.results[group][tid]["inverseClock"] = float64(1.0 / m.basefreq) - for _, metric := range evset.Metrics { - value, err := gval.Evaluate(metric.Calc, m.results[group][tid], gval.Full()) - if err != nil { - cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) - continue - } - m.mresults[group][tid][metric.Name] = value.(float64) - } } - return nil -} -func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration) error { - for _, metric := range m.config.Metrics { - for tid := range m.cpulist { - params := make(map[string]interface{}) - for j := range m.groups { - for mname, mres := range m.mresults[j][tid] { - params[mname] = mres + // Go over the event set metrics, derive the value out of the event:counter values and send it + for _, metric := range evset.Metrics { + // The metric scope is determined in the Init() function + // Get the map scope-id -> tids + scopemap := m.scopeRespTids[metric.Scope] + for domain, tid := range scopemap { + if tid >= 0 { + value, err := mr.EvalFloat64Condition(metric.Calc, m.results[group][tid]) + if err != nil { + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) + continue + } + m.mresults[group][tid][metric.Name] = value + // Now we have the result, send it with the proper tags + tags := map[string]string{"type": metric.Scope.String()} + if metric.Scope != "node" { + tags["type-id"] = fmt.Sprintf("%d", domain) + } + fields := map[string]interface{}{"value": value} + y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now()) + if err == nil { + output <- y } } - value, err := gval.Evaluate(metric.Calc, params, gval.Full()) - if err != nil { - cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) - continue + } + } + + return nil +} + +// Go over the global metrics, derive the value out of the event sets' metric values and send it +func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan lp.CCMetric) error { + for _, metric := range m.config.Metrics { + scopemap := m.scopeRespTids[metric.Scope] + for domain, tid := range scopemap { + if tid >= 0 { + // Here we generate parameter list + params := make(map[string]interface{}) + for j := range m.groups { + for mname, mres := range m.mresults[j][tid] { + params[mname] = mres + } + } + // Evaluate the metric + value, err := mr.EvalFloat64Condition(metric.Calc, params) + if err != nil { + cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error()) + continue + } + m.gmresults[tid][metric.Name] = value + // Now we have the result, send it with the proper tags + tags := 
map[string]string{"type": metric.Scope.String()} + if metric.Scope != "node" { + tags["type-id"] = fmt.Sprintf("%d", domain) + } + fields := map[string]interface{}{"value": value} + y, err := lp.New(metric.Name, tags, m.meta, fields, time.Now()) + if err == nil { + output <- y + } } - m.gmresults[tid][metric.Name] = value.(float64) } } return nil } -// func (m *LikwidCollector) calcResultMetrics(interval time.Duration) ([]lp.CCMetric, error) { -// var err error = nil -// metrics := make([]lp.CCMetric, 0) -// for i := range m.groups { -// evset := m.config.Eventsets[i] -// for _, metric := range evset.Metrics { -// log.Print(metric.Name, " ", metric.Scope, " ", metric.granulatity) -// if metric.Scope.Granularity() > metric.granulatity.Granularity() { -// log.Print("Different granularity wanted for ", metric.Name, ": ", metric.Scope, " vs ", metric.granulatity) -// var idlist []int -// idfunc := func(cpuid int) int { return cpuid } -// switch metric.Scope { -// case "socket": -// idlist = topo.SocketList() -// idfunc = topo.GetCpuSocket -// case "numa": -// idlist = topo.NumaNodeList() -// idfunc = topo.GetCpuNumaDomain -// case "core": -// idlist = topo.CoreList() -// idfunc = topo.GetCpuCore -// case "die": -// idlist = topo.DieList() -// idfunc = topo.GetCpuDie -// case "node": -// idlist = topo.CpuList() -// } -// for i := 0; i < num_results; i++ { - -// } -// } -// } -// } -// for _, metric := range m.config.Metrics { -// log.Print(metric.Name, " ", metric.Scope, " ", metric.granulatity) -// if metric.Scope.Granularity() > metric.granulatity.Granularity() { -// log.Print("Different granularity wanted for ", metric.Name, ": ", metric.Scope, " vs ", metric.granulatity) -// } -// } -// return metrics, err -// } - +// main read function taking multiple measurement rounds, each 'interval' seconds long func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { return } - defer m.CatchGvalPanic() for i, _ := range m.groups { // measure event set 'i' for 'interval' seconds @@ -439,77 +502,11 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) cclog.ComponentError(m.name, err.Error()) continue } - m.calcEventsetMetrics(i, interval) - } - - m.calcGlobalMetrics(interval) - - //metrics, err = m.calcResultMetrics(interval) - - for i := range m.groups { - evset := m.config.Eventsets[i] - for _, metric := range evset.Metrics { - - _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) - if metric.Publish && !skip { - if metric.Scope == "socket" { - for sid, tid := range m.sock2tid { - y, err := lp.New(metric.Name, - map[string]string{"type": "socket", - "type-id": fmt.Sprintf("%d", int(sid))}, - m.meta, - map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, - time.Now()) - if err == nil { - output <- y - } - } - } else if metric.Scope == "hwthread" { - for tid, cpu := range m.cpulist { - y, err := lp.New(metric.Name, - map[string]string{"type": "cpu", - "type-id": fmt.Sprintf("%d", int(cpu))}, - m.meta, - map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, - time.Now()) - if err == nil { - output <- y - } - } - } - } - } - } - for _, metric := range m.config.Metrics { - _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) - if metric.Publish && !skip { - if metric.Scope == "socket" { - for sid, tid := range m.sock2tid { - y, err := lp.New(metric.Name, - map[string]string{"type": "socket", - "type-id": fmt.Sprintf("%d", int(sid))}, - m.meta, - map[string]interface{}{"value": 
m.gmresults[tid][metric.Name]}, - time.Now()) - if err == nil { - output <- y - } - } - } else if metric.Scope == "hwthread" { - for tid, cpu := range m.cpulist { - y, err := lp.New(metric.Name, - map[string]string{"type": "cpu", - "type-id": fmt.Sprintf("%d", int(cpu))}, - m.meta, - map[string]interface{}{"value": m.gmresults[tid][metric.Name]}, - time.Now()) - if err == nil { - output <- y - } - } - } - } + // read measurements and derive event set metrics + m.calcEventsetMetrics(i, interval, output) } + // use the event set metrics to derive the global metrics + m.calcGlobalMetrics(interval, output) } func (m *LikwidCollector) Close() { From 2c13cecf133dc1f9c962cd9a24f25551d724584c Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Wed, 2 Feb 2022 14:52:19 +0100 Subject: [PATCH 11/12] Fix link in MetricRouter README --- internal/metricRouter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/metricRouter/README.md b/internal/metricRouter/README.md index 17f336c..d239c64 100644 --- a/internal/metricRouter/README.md +++ b/internal/metricRouter/README.md @@ -190,7 +190,7 @@ The above configuration, collects all metric values for metrics evaluating `if` If you are not interested in the input metrics `sub_metric_%d+` at all, you can add the same condition used here to the `drop_metrics_if` section to drop them. Use cases for `interval_aggregates`: -- Combine multiple metrics of the a collector to a new one like the [MemstatCollector](../../collectors/memstatMetric.go) does it for `mem_used`)): +- Combine multiple metrics of the a collector to a new one like the [MemstatCollector](../../collectors/memstatMetric.md) does it for `mem_used`)): ```json { "name" : "mem_used", From 1222f7a32fa96d5e8861cb6956c81ef579ffd847 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Wed, 2 Feb 2022 15:30:14 +0100 Subject: [PATCH 12/12] Configuration option to disable MetricCache completely --- internal/metricRouter/README.md | 9 +++++++ internal/metricRouter/metricRouter.go | 34 +++++++++++++++------------ 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/internal/metricRouter/README.md b/internal/metricRouter/README.md index d239c64..9cd0d6c 100644 --- a/internal/metricRouter/README.md +++ b/internal/metricRouter/README.md @@ -6,6 +6,7 @@ The CCMetric router sits in between the collectors and the sinks and can be used ```json { + "num_cache_intervals" : 1, "interval_timestamp" : true, "add_tags" : [ { @@ -58,6 +59,12 @@ There are three main options `add_tags`, `delete_tags` and `interval_timestamp`. The collectors' `Read()` functions are not called simultaneously and therefore the metrics gathered in an interval can have different timestamps. If you want to avoid that and have a common timestamp (the beginning of the interval), set this option to `true` and the MetricRouter sets the time. +# The `num_cache_intervals` option + +If the MetricRouter should buffer metrics of intervals in a MetricCache, this option specifies the number of past intervals that should be kept. If `num_cache_intervals = 0`, the cache is disabled. With `num_cache_intervals = 1`, only the metrics of the last interval are buffered. + +A `num_cache_intervals > 0` is required to use the `interval_aggregates` option. + # The `rename_metrics` option In the ClusterCockpit world we specified a set of standard metrics. Since some collectors determine the metric names based on files, execuables and libraries, they might change from system to system (or installation to installtion, OS to OS, ...). 
In order to get the common names, you can rename incoming metrics before sending them to the sink. If the metric name matches the `oldname`, it is changed to `newname` @@ -164,6 +171,8 @@ The first line is comparable with the example in `drop_metrics`, it drops all me # Aggregate metric values of the current interval with the `interval_aggregates` option +**Note:** `interval_aggregates` works only if `num_cache_intervals` > 0 + In some cases, you need to derive new metrics based on the metrics arriving during an interval. This can be done in the `interval_aggregates` section. The logic is similar to the other metric manipulation and filtering options. A cache stores all metrics that arrive during an interval. At the beginning of the *next* interval, the list of metrics is submitted to the MetricAggregator. It derives new metrics and submits them back to the MetricRouter, so they are sent in the next interval but have the timestamp of the previous interval beginning. ```json diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 8ec7e06..6d63e15 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -95,17 +95,15 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout cclog.ComponentError("MetricRouter", err.Error()) return err } - numIntervals := r.config.NumCacheIntervals - if numIntervals <= 0 { - numIntervals = 1 - } - r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, numIntervals) - if err != nil { - cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error()) - return err - } - for _, agg := range r.config.IntervalAgg { - r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta) + if r.config.NumCacheIntervals >= 0 { + r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals) + if err != nil { + cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error()) + return err + } + for _, agg := range r.config.IntervalAgg { + r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta) + } } r.config.dropMetrics = make(map[string]bool) for _, mname := range r.config.DropMetrics { @@ -244,7 +242,9 @@ func (r *metricRouter) Start() { } // Start Metric Cache - r.cache.Start() + if r.config.NumCacheIntervals > 0 { + r.cache.Start() + } r.wg.Add(1) go func() { @@ -266,7 +266,9 @@ func (r *metricRouter) Start() { } // even if the metric is dropped, it is stored in the cache for // aggregations - r.cache.Add(p) + if r.config.NumCacheIntervals > 0 { + r.cache.Add(p) + } case p := <-r.recv_input: // receive from receive manager @@ -316,8 +318,10 @@ func (r *metricRouter) Close() { // wait for close of channel r.timerdone <-r.timerdone } - r.cache.Close() - r.cachewg.Wait() + if r.config.NumCacheIntervals > 0 { + r.cache.Close() + r.cachewg.Wait() + } } // New creates a new initialized metric router
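A minimal example configuration for the `num_cache_intervals` option documented in this patch, combining it with the `interval_aggregates` rule shown earlier in the README (illustrative values only): keeping one past interval in the MetricCache is enough for the aggregation to run, while setting `num_cache_intervals` to `0` disables the cache and with it the interval aggregations.

```json
{
    "num_cache_intervals" : 1,
    "interval_timestamp" : true,
    "interval_aggregates" : [
        {
            "name" : "temp_cores_avg",
            "if" : "match('temp_core_%d+', metric.Name())",
            "function" : "avg(values)",
            "tags" : {
                "type" : "node"
            },
            "meta" : {
                "group": "IPMI",
                "unit": "degC",
                "source": "TempCollector"
            }
        }
    ]
}
```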