Generate influxDB point for data type ccMetric

This commit is contained in:
Holger Obermaier 2022-02-08 09:31:08 +01:00
parent af051b5e7e
commit e1a7379c2e
3 changed files with 57 additions and 60 deletions

View File

@ -26,28 +26,32 @@ type ccMetric struct {
// ccmetric access functions // ccmetric access functions
type CCMetric interface { type CCMetric interface {
lp.Metric // Time(), Name(), TagList(), FieldList() lp.Metric // Time(), Name(), TagList(), FieldList()
ToLineProtocol(metaAsTags bool) string // Generate influxDB line protocol for data type ccMetric
ToPoint(metaAsTags bool) *write.Point // Generate influxDB point for data type ccMetric
SetName(name string) SetName(name string) // Set metric name
SetTime(t time.Time) SetTime(t time.Time) // Set timestamp
Meta() map[string]string // Map of meta data tags
MetaList() []*lp.Tag // Ordered list of meta data
AddMeta(key, value string) // Add a meta data tag
HasMeta(key string) bool // Check a meta data tag
GetMeta(key string) (string, bool) // Get a meta data tab addressed by its key
RemoveMeta(key string) // Remove a meta data tag by its key
Tags() map[string]string // Map of tags Tags() map[string]string // Map of tags
TagList() []*lp.Tag // Ordered list of tags TagList() []*lp.Tag // Ordered list of tags
AddTag(key, value string) // Add a tag AddTag(key, value string) // Add a tag
GetTag(key string) (string, bool) // Get a tag by its key GetTag(key string) (value string, ok bool) // Get a tag by its key
HasTag(key string) bool // Check a tag HasTag(key string) (ok bool) // Check a tag
RemoveTag(key string) // Remove a tag by its key RemoveTag(key string) // Remove a tag by its key
GetField(key string) (interface{}, bool) // Get a field addressed by its key Meta() map[string]string // Map of meta data tags
HasField(key string) bool // Check if a field key is present MetaList() []*lp.Tag // Ordered list of meta data
RemoveField(key string) // Remove a field addressed by its key AddMeta(key, value string) // Add a meta data tag
GetMeta(key string) (value string, ok bool) // Get a meta data tab addressed by its key
HasMeta(key string) (ok bool) // Check a meta data tag
RemoveMeta(key string) // Remove a meta data tag by its key
Fields() map[string]interface{} // Map of fields Fields() map[string]interface{} // Map of fields
FieldList() []*lp.Field // Ordered list of fields
AddField(key string, value interface{}) // Add a field
GetField(key string) (value interface{}, ok bool) // Get a field addressed by its key
HasField(key string) (ok bool) // Check if a field key is present
RemoveField(key string) // Remove a field addressed by its key
} }
// Meta returns the meta data tags as key-value mapping // Meta returns the meta data tags as key-value mapping
@ -68,22 +72,34 @@ func (m *ccMetric) MetaList() []*lp.Tag {
// String implements the stringer interface for data type ccMetric // String implements the stringer interface for data type ccMetric
func (m *ccMetric) String() string { func (m *ccMetric) String() string {
return fmt.Sprintf("%s %v %v %v %d", m.name, m.tags, m.meta, m.fields, m.tm.UnixNano()) return fmt.Sprintf("Name: %s, Tags: %+v, Meta: %+v, fields: %+v, Timestamp: %d", m.name, m.tags, m.meta, m.fields, m.tm.UnixNano())
}
// ToLineProtocol generates influxDB line protocol for data type ccMetric
func (m *ccMetric) ToPoint(metaAsTags bool) (p *write.Point) {
if !metaAsTags {
p = influxdb2.NewPoint(m.name, m.tags, m.fields, m.tm)
} else {
tags := make(map[string]string, len(m.tags)+len(m.meta))
for key, value := range m.tags {
tags[key] = value
}
for key, value := range m.meta {
tags[key] = value
}
p = influxdb2.NewPoint(m.name, tags, m.fields, m.tm)
}
return
} }
// ToLineProtocol generates influxDB line protocol for data type ccMetric // ToLineProtocol generates influxDB line protocol for data type ccMetric
func (m *ccMetric) ToLineProtocol(metaAsTags bool) string { func (m *ccMetric) ToLineProtocol(metaAsTags bool) string {
tags := make(map[string]string)
for key, value := range m.tags { return write.PointToLineProtocol(
tags[key] = value m.ToPoint(metaAsTags),
} time.Nanosecond,
if metaAsTags { )
for key, value := range m.meta {
tags[key] = value
}
}
p := influxdb2.NewPoint(m.name, tags, m.fields, m.tm)
return write.PointToLineProtocol(p, time.Nanosecond)
} }
// Name returns the measurement name // Name returns the measurement name

View File

@ -2,6 +2,7 @@ package sinks
import ( import (
// "context" // "context"
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"errors" "errors"
@ -11,7 +12,6 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
) )
type InfluxAsyncSinkConfig struct { type InfluxAsyncSinkConfig struct {
@ -88,22 +88,9 @@ func (s *InfluxAsyncSink) Init(config json.RawMessage) error {
return err return err
} }
func (s *InfluxAsyncSink) Write(point lp.CCMetric) error { func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
var p *write.Point s.writeApi.WritePoint(
if s.config.MetaAsTags { m.ToPoint(s.config.MetaAsTags))
tags := map[string]string{}
for k, v := range point.Tags() {
tags[k] = v
}
for k, v := range point.Meta() {
tags[k] = v
}
p = influxdb2.NewPoint(point.Name(), tags, point.Fields(), point.Time())
} else {
p = influxdb2.NewPoint(point.Name(), point.Tags(), point.Fields(), point.Time())
}
s.writeApi.WritePoint(p)
return nil return nil
} }

View File

@ -76,18 +76,12 @@ func (s *InfluxSink) Init(config json.RawMessage) error {
return s.connect() return s.connect()
} }
func (s *InfluxSink) Write(point lp.CCMetric) error { func (s *InfluxSink) Write(m lp.CCMetric) error {
tags := make(map[string]string) err :=
for key, value := range point.Tags() { s.writeApi.WritePoint(
tags[key] = value context.Background(),
} m.ToPoint(s.config.MetaAsTags),
if s.config.MetaAsTags { )
for key, value := range point.Meta() {
tags[key] = value
}
}
p := influxdb2.NewPoint(point.Name(), tags, point.Fields(), point.Time())
err := s.writeApi.WritePoint(context.Background(), p)
return err return err
} }