package ccmetric rewrite

This commit is contained in:
Holger Obermaier 2022-01-31 20:54:06 +01:00
parent 862630a218
commit eca963c58a
4 changed files with 86 additions and 158 deletions

View File

@ -15,61 +15,73 @@ import (
// See: https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/ // See: https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
type ccMetric struct { type ccMetric struct {
name string // Measurement name name string // Measurement name
tags []*lp.Tag // ordered list of of tags meta map[string]string // map of meta data tags
tags map[string]string // map of of tags
fields []*lp.Field // unordered list of of fields fields []*lp.Field // unordered list of of fields
tm time.Time // timestamp tm time.Time // timestamp
meta []*lp.Tag // odered list of meta data tags
} }
// ccmetric access functions // ccmetric access functions
type CCMetric interface { type CCMetric interface {
lp.MutableMetric // SetTime, AddTag, AddField lp.Metric // Time(), Name(), TagList(), FieldList()
SetTime(t time.Time)
MetaMap() map[string]string // Map of meta data tags
MetaList() []*lp.Tag // Ordered list of meta data
AddMeta(key, value string) // Add a meta data tag AddMeta(key, value string) // Add a meta data tag
MetaList() []*lp.Tag // Returns the meta data list
RemoveTag(key string) // Remove a tag addressed by its key
GetTag(key string) (string, bool) // Get a tag addressed by its key
GetMeta(key string) (string, bool) // Get a meta data tab addressed by its key GetMeta(key string) (string, bool) // Get a meta data tab addressed by its key
TagMap() map[string]string // Map of tags
AddTag(key, value string) // Add a tag
GetTag(key string) (string, bool) // Get a tag by its key
RemoveTag(key string) // Remove a tag by its key
GetField(key string) (interface{}, bool) // Get a field addressed by its key GetField(key string) (interface{}, bool) // Get a field addressed by its key
HasField(key string) bool // Check if a field key is present HasField(key string) bool // Check if a field key is present
RemoveField(key string) // Remove a field addressed by its key RemoveField(key string) // Remove a field addressed by its key
} }
// Meta returns the list of meta data tags as key-value mapping // MetaMap returns the meta data tags as key-value mapping
func (m *ccMetric) Meta() map[string]string { func (m *ccMetric) MetaMap() map[string]string {
meta := make(map[string]string, len(m.meta)) return m.meta
for _, m := range m.meta {
meta[m.Key] = m.Value
}
return meta
} }
// MetaList returns the list of meta data tags // MetaList returns the the list of meta data tags as sorted list of key value tags
func (m *ccMetric) MetaList() []*lp.Tag { func (m *ccMetric) MetaList() (ml []*lp.Tag) {
return m.meta
ml = make([]*lp.Tag, 0, len(m.meta))
for key, value := range m.meta {
ml = append(ml, &lp.Tag{Key: key, Value: value})
}
sort.Slice(ml, func(i, j int) bool { return ml[i].Key < ml[j].Key })
return
} }
// String implements the stringer interface for data type ccMetric // String implements the stringer interface for data type ccMetric
func (m *ccMetric) String() string { func (m *ccMetric) String() string {
return fmt.Sprintf("%s %v %v %v %d", m.name, m.Tags(), m.Meta(), m.Fields(), m.tm.UnixNano()) return fmt.Sprintf("%s %v %v %v %d", m.name, m.tags, m.meta, m.Fields(), m.tm.UnixNano())
} }
// Name returns the metric name // Name returns the measurement name
func (m *ccMetric) Name() string { func (m *ccMetric) Name() string {
return m.name return m.name
} }
// Tags returns the the list of tags as key-value-mapping // TagMap returns the the list of tags as key-value-mapping
func (m *ccMetric) Tags() map[string]string { func (m *ccMetric) TagMap() map[string]string {
tags := make(map[string]string, len(m.tags)) return m.tags
for _, tag := range m.tags {
tags[tag.Key] = tag.Value
}
return tags
} }
// TagList returns the list of tags // TagList returns the the list of tags as sorted list of key value tags
func (m *ccMetric) TagList() []*lp.Tag { func (m *ccMetric) TagList() (tl []*lp.Tag) {
return m.tags
tl = make([]*lp.Tag, 0, len(m.tags))
for key, value := range m.tags {
tl = append(tl, &lp.Tag{Key: key, Value: value})
}
sort.Slice(tl, func(i, j int) bool { return tl[i].Key < tl[j].Key })
return
} }
// Fields returns the list of fields as key-value-mapping // Fields returns the list of fields as key-value-mapping
@ -99,112 +111,50 @@ func (m *ccMetric) SetTime(t time.Time) {
// HasTag checks if a tag with key equal to <key> is present in the list of tags // HasTag checks if a tag with key equal to <key> is present in the list of tags
func (m *ccMetric) HasTag(key string) bool { func (m *ccMetric) HasTag(key string) bool {
for _, tag := range m.tags { _, ok := m.tags[key]
if tag.Key == key { return ok
return true
}
}
return false
} }
// GetTag returns the tag with tag's key equal to <key> // GetTag returns the tag with tag's key equal to <key>
func (m *ccMetric) GetTag(key string) (string, bool) { func (m *ccMetric) GetTag(key string) (string, bool) {
for _, tag := range m.tags { value, ok := m.tags[key]
if tag.Key == key { return value, ok
return tag.Value, true
}
}
return "", false
} }
// RemoveTag removes the tag with tag's key equal to <key> // RemoveTag removes the tag with tag's key equal to <key>
// and keeps the tag list ordered by the keys // and keeps the tag list ordered by the keys
func (m *ccMetric) RemoveTag(key string) { func (m *ccMetric) RemoveTag(key string) {
for i, tag := range m.tags { delete(m.tags, key)
if tag.Key == key {
copy(m.tags[i:], m.tags[i+1:])
m.tags[len(m.tags)-1] = nil
m.tags = m.tags[:len(m.tags)-1]
return
}
}
} }
// AddTag adds a tag (consisting of key and value) // AddTag adds a tag (consisting of key and value)
// and keeps the tag list ordered by the keys // and keeps the tag list ordered by the keys
func (m *ccMetric) AddTag(key, value string) { func (m *ccMetric) AddTag(key, value string) {
for i, tag := range m.tags { m.tags[key] = value
if key > tag.Key {
continue
}
if key == tag.Key {
tag.Value = value
return
}
m.tags = append(m.tags, nil)
copy(m.tags[i+1:], m.tags[i:])
m.tags[i] = &lp.Tag{Key: key, Value: value}
return
}
m.tags = append(m.tags, &lp.Tag{Key: key, Value: value})
} }
// HasTag checks if a meta data tag with meta data's key equal to <key> is present in the list of meta data tags // HasTag checks if a meta data tag with meta data's key equal to <key> is present in the list of meta data tags
func (m *ccMetric) HasMeta(key string) bool { func (m *ccMetric) HasMeta(key string) bool {
for _, tag := range m.meta { _, ok := m.meta[key]
if tag.Key == key { return ok
return true
}
}
return false
} }
// GetMeta returns the meta data tag with meta data's key equal to <key> // GetMeta returns the meta data tag with meta data's key equal to <key>
func (m *ccMetric) GetMeta(key string) (string, bool) { func (m *ccMetric) GetMeta(key string) (string, bool) {
for _, tag := range m.meta { value, ok := m.meta[key]
if tag.Key == key { return value, ok
return tag.Value, true
}
}
return "", false
} }
// RemoveMeta removes the meta data tag with tag's key equal to <key> // RemoveMeta removes the meta data tag with tag's key equal to <key>
// and keeps the meta data tag list ordered by the keys // and keeps the meta data tag list ordered by the keys
func (m *ccMetric) RemoveMeta(key string) { func (m *ccMetric) RemoveMeta(key string) {
for i, tag := range m.meta { delete(m.meta, key)
if tag.Key == key {
copy(m.meta[i:], m.meta[i+1:])
m.meta[len(m.meta)-1] = nil
m.meta = m.meta[:len(m.meta)-1]
return
}
}
} }
// AddMeta adds a meta data tag (consisting of key and value) // AddMeta adds a meta data tag (consisting of key and value)
// and keeps the meta data list ordered by the keys // and keeps the meta data list ordered by the keys
func (m *ccMetric) AddMeta(key, value string) { func (m *ccMetric) AddMeta(key, value string) {
for i, tag := range m.meta { m.meta[key] = value
if key > tag.Key {
continue
}
if key == tag.Key {
tag.Value = value
return
}
m.meta = append(m.meta, nil)
copy(m.meta[i+1:], m.meta[i:])
m.meta[i] = &lp.Tag{Key: key, Value: value}
return
}
m.meta = append(m.meta, &lp.Tag{Key: key, Value: value})
} }
// AddField adds a field (consisting of key and value) to the unordered list of fields // AddField adds a field (consisting of key and value) to the unordered list of fields
@ -261,30 +211,10 @@ func New(
) (CCMetric, error) { ) (CCMetric, error) {
m := &ccMetric{ m := &ccMetric{
name: name, name: name,
tags: nil, tags: tags,
fields: nil, fields: nil,
tm: tm, tm: tm,
meta: nil, meta: meta,
}
// Sorted list of tags
if len(tags) > 0 {
m.tags = make([]*lp.Tag, 0, len(tags))
for k, v := range tags {
m.tags = append(m.tags,
&lp.Tag{Key: k, Value: v})
}
sort.Slice(m.tags, func(i, j int) bool { return m.tags[i].Key < m.tags[j].Key })
}
// Sorted list of meta data tags
if len(meta) > 0 {
m.meta = make([]*lp.Tag, 0, len(meta))
for k, v := range meta {
m.meta = append(m.meta,
&lp.Tag{Key: k, Value: v})
}
sort.Slice(m.meta, func(i, j int) bool { return m.meta[i].Key < m.meta[j].Key })
} }
// Unsorted list of fields // Unsorted list of fields
@ -303,20 +233,20 @@ func New(
} }
// FromMetric copies the metric <other> // FromMetric copies the metric <other>
func FromMetric(other CCMetric) CCMetric { func FromMetric(other ccMetric) CCMetric {
m := &ccMetric{ m := &ccMetric{
name: other.Name(), name: other.Name(),
tags: make([]*lp.Tag, len(other.TagList())), tags: make(map[string]string),
fields: make([]*lp.Field, len(other.FieldList())), fields: make([]*lp.Field, len(other.FieldList())),
meta: make([]*lp.Tag, len(other.MetaList())), meta: make(map[string]string),
tm: other.Time(), tm: other.Time(),
} }
for i, tag := range other.TagList() { for key, value := range other.TagMap() {
m.tags[i] = &lp.Tag{Key: tag.Key, Value: tag.Value} m.tags[key] = value
} }
for i, s := range other.MetaList() { for key, value := range other.MetaMap() {
m.meta[i] = &lp.Tag{Key: s.Key, Value: s.Value} m.meta[key] = value
} }
for i, field := range other.FieldList() { for i, field := range other.FieldList() {
@ -329,17 +259,14 @@ func FromMetric(other CCMetric) CCMetric {
func FromInfluxMetric(other lp.Metric) CCMetric { func FromInfluxMetric(other lp.Metric) CCMetric {
m := &ccMetric{ m := &ccMetric{
name: other.Name(), name: other.Name(),
tags: make([]*lp.Tag, len(other.TagList())), tags: make(map[string]string),
fields: make([]*lp.Field, len(other.FieldList())), fields: make([]*lp.Field, len(other.FieldList())),
meta: make([]*lp.Tag, 0), meta: make(map[string]string),
tm: other.Time(), tm: other.Time(),
} }
for i, otherTag := range other.TagList() { for _, otherTag := range other.TagList() {
m.tags[i] = &lp.Tag{ m.tags[otherTag.Key] = otherTag.Value
Key: otherTag.Key,
Value: otherTag.Value,
}
} }
for i, otherField := range other.FieldList() { for i, otherField := range other.FieldList() {

View File

@ -141,11 +141,11 @@ func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, erro
// Add metric name, tags, meta data, fields and timestamp to the parameter list // Add metric name, tags, meta data, fields and timestamp to the parameter list
params := make(map[string]interface{}) params := make(map[string]interface{})
params["name"] = point.Name() params["name"] = point.Name()
for _, t := range point.TagList() { for key, value := range point.TagMap() {
params[t.Key] = t.Value params[key] = value
} }
for _, m := range point.MetaList() { for key, value := range point.MetaMap() {
params[m.Key] = m.Value params[key] = value
} }
for _, f := range point.FieldList() { for _, f := range point.FieldList() {
params[f.Key] = f.Value params[f.Key] = f.Value

View File

@ -30,16 +30,16 @@ func (s *GangliaSink) Write(point lp.CCMetric) error {
var err error = nil var err error = nil
var tagsstr []string var tagsstr []string
var argstr []string var argstr []string
for _, t := range point.TagList() { for key, value := range point.TagMap() {
switch t.Key { switch key {
case "cluster": case "cluster":
argstr = append(argstr, fmt.Sprintf("--cluster=%s", t.Value)) argstr = append(argstr, fmt.Sprintf("--cluster=%s", value))
case "unit": case "unit":
argstr = append(argstr, fmt.Sprintf("--units=%s", t.Value)) argstr = append(argstr, fmt.Sprintf("--units=%s", value))
case "group": case "group":
argstr = append(argstr, fmt.Sprintf("--group=%s", t.Value)) argstr = append(argstr, fmt.Sprintf("--group=%s", value))
default: default:
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", t.Key, t.Value)) tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
} }
} }
if len(tagsstr) > 0 { if len(tagsstr) > 0 {

View File

@ -5,10 +5,11 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt" "fmt"
"log"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
"log"
) )
type InfluxSink struct { type InfluxSink struct {
@ -61,12 +62,12 @@ func (s *InfluxSink) Init(config sinkConfig) error {
func (s *InfluxSink) Write(point lp.CCMetric) error { func (s *InfluxSink) Write(point lp.CCMetric) error {
tags := map[string]string{} tags := map[string]string{}
fields := map[string]interface{}{} fields := map[string]interface{}{}
for _, t := range point.TagList() { for key, value := range point.TagMap() {
tags[t.Key] = t.Value tags[key] = value
} }
if s.meta_as_tags { if s.meta_as_tags {
for _, m := range point.MetaList() { for key, value := range point.MetaMap() {
tags[m.Key] = m.Value tags[key] = value
} }
} }
for _, f := range point.FieldList() { for _, f := range point.FieldList() {