diff --git a/internal/ccMetric/ccMetric.go b/internal/ccMetric/ccMetric.go index ee2cce5..ad2b2b1 100644 --- a/internal/ccMetric/ccMetric.go +++ b/internal/ccMetric/ccMetric.go @@ -25,29 +25,33 @@ type ccMetric struct { // ccmetric access functions type CCMetric interface { - lp.Metric // Time(), Name(), TagList(), FieldList() + lp.Metric // Time(), Name(), TagList(), FieldList() + ToLineProtocol(metaAsTags bool) string // Generate influxDB line protocol for data type ccMetric + ToPoint(metaAsTags bool) *write.Point // Generate influxDB point for data type ccMetric - SetName(name string) - SetTime(t time.Time) + SetName(name string) // Set metric name + SetTime(t time.Time) // Set timestamp - Meta() map[string]string // Map of meta data tags - MetaList() []*lp.Tag // Ordered list of meta data - AddMeta(key, value string) // Add a meta data tag - HasMeta(key string) bool // Check a meta data tag - GetMeta(key string) (string, bool) // Get a meta data tab addressed by its key - RemoveMeta(key string) // Remove a meta data tag by its key + Tags() map[string]string // Map of tags + TagList() []*lp.Tag // Ordered list of tags + AddTag(key, value string) // Add a tag + GetTag(key string) (value string, ok bool) // Get a tag by its key + HasTag(key string) (ok bool) // Check a tag + RemoveTag(key string) // Remove a tag by its key - Tags() map[string]string // Map of tags - TagList() []*lp.Tag // Ordered list of tags - AddTag(key, value string) // Add a tag - GetTag(key string) (string, bool) // Get a tag by its key - HasTag(key string) bool // Check a tag - RemoveTag(key string) // Remove a tag by its key + Meta() map[string]string // Map of meta data tags + MetaList() []*lp.Tag // Ordered list of meta data + AddMeta(key, value string) // Add a meta data tag + GetMeta(key string) (value string, ok bool) // Get a meta data tab addressed by its key + HasMeta(key string) (ok bool) // Check a meta data tag + RemoveMeta(key string) // Remove a meta data tag by its key - GetField(key string) (interface{}, bool) // Get a field addressed by its key - HasField(key string) bool // Check if a field key is present - RemoveField(key string) // Remove a field addressed by its key - Fields() map[string]interface{} // Map of fields + Fields() map[string]interface{} // Map of fields + FieldList() []*lp.Field // Ordered list of fields + AddField(key string, value interface{}) // Add a field + GetField(key string) (value interface{}, ok bool) // Get a field addressed by its key + HasField(key string) (ok bool) // Check if a field key is present + RemoveField(key string) // Remove a field addressed by its key } // Meta returns the meta data tags as key-value mapping @@ -68,22 +72,34 @@ func (m *ccMetric) MetaList() []*lp.Tag { // String implements the stringer interface for data type ccMetric func (m *ccMetric) String() string { - return fmt.Sprintf("%s %v %v %v %d", m.name, m.tags, m.meta, m.fields, m.tm.UnixNano()) + return fmt.Sprintf("Name: %s, Tags: %+v, Meta: %+v, fields: %+v, Timestamp: %d", m.name, m.tags, m.meta, m.fields, m.tm.UnixNano()) +} + +// ToLineProtocol generates influxDB line protocol for data type ccMetric +func (m *ccMetric) ToPoint(metaAsTags bool) (p *write.Point) { + + if !metaAsTags { + p = influxdb2.NewPoint(m.name, m.tags, m.fields, m.tm) + } else { + tags := make(map[string]string, len(m.tags)+len(m.meta)) + for key, value := range m.tags { + tags[key] = value + } + for key, value := range m.meta { + tags[key] = value + } + p = influxdb2.NewPoint(m.name, tags, m.fields, m.tm) + } + return } // ToLineProtocol generates influxDB line protocol for data type ccMetric func (m *ccMetric) ToLineProtocol(metaAsTags bool) string { - tags := make(map[string]string) - for key, value := range m.tags { - tags[key] = value - } - if metaAsTags { - for key, value := range m.meta { - tags[key] = value - } - } - p := influxdb2.NewPoint(m.name, tags, m.fields, m.tm) - return write.PointToLineProtocol(p, time.Nanosecond) + + return write.PointToLineProtocol( + m.ToPoint(metaAsTags), + time.Nanosecond, + ) } // Name returns the measurement name diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index 0763a4b..3315456 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -2,6 +2,7 @@ package sinks import ( // "context" + "crypto/tls" "encoding/json" "errors" @@ -11,7 +12,6 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/influxdata/influxdb-client-go/v2/api/write" ) type InfluxAsyncSinkConfig struct { @@ -88,22 +88,9 @@ func (s *InfluxAsyncSink) Init(config json.RawMessage) error { return err } -func (s *InfluxAsyncSink) Write(point lp.CCMetric) error { - var p *write.Point - if s.config.MetaAsTags { - tags := map[string]string{} - for k, v := range point.Tags() { - tags[k] = v - } - for k, v := range point.Meta() { - tags[k] = v - } - p = influxdb2.NewPoint(point.Name(), tags, point.Fields(), point.Time()) - } else { - p = influxdb2.NewPoint(point.Name(), point.Tags(), point.Fields(), point.Time()) - } - - s.writeApi.WritePoint(p) +func (s *InfluxAsyncSink) Write(m lp.CCMetric) error { + s.writeApi.WritePoint( + m.ToPoint(s.config.MetaAsTags)) return nil } diff --git a/sinks/influxSink.go b/sinks/influxSink.go index fcfc32f..156f6eb 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -76,18 +76,12 @@ func (s *InfluxSink) Init(config json.RawMessage) error { return s.connect() } -func (s *InfluxSink) Write(point lp.CCMetric) error { - tags := make(map[string]string) - for key, value := range point.Tags() { - tags[key] = value - } - if s.config.MetaAsTags { - for key, value := range point.Meta() { - tags[key] = value - } - } - p := influxdb2.NewPoint(point.Name(), tags, point.Fields(), point.Time()) - err := s.writeApi.WritePoint(context.Background(), p) +func (s *InfluxSink) Write(m lp.CCMetric) error { + err := + s.writeApi.WritePoint( + context.Background(), + m.ToPoint(s.config.MetaAsTags), + ) return err }