cc-metric-collector/sinks/influxSink.go
Thomas Gruber 3f76947f54
Merge latest developments into main (#67)
* Update configuration.md

Add an additional receiver to have better alignment of components

* Change default GpfsCollector command to `mmpmon` (#53)

* Set default cmd to 'mmpmon'

* Reuse looked up path

* Cast const to string

* Just download LIKWID to get the headers (#54)

* Just download LIKWID to get the headers

* Remove perl-Data-Dumper from BuildRequires, only required by LIKWID build

* Add HttpReceiver as counterpart to the HttpSink (#49)

* Use GBytes as unit for large memory numbers

* Make maxForward configurable, save old name in meta in rename metrics and make the hostname tag key configurable

* Single release action (#55)

Building all RPMs and releasing in a single workflow

* Makefile target to build binary-only Debian packages (#61)

* Add 'install' and 'DEB' make targets to build binary-only Debian packages

* Add control file for DEB builds

* Use a single line for bash loop in make clean

* Add config options for retry intervals of InfluxDB clients (#59)

* Refactoring of LikwidCollector and metric units (#62)

* Reduce complexity of LikwidCollector and allow metric units

* Add unit to LikwidCollector docu and fix some typos

* Make library path configurable

* Use old metric name in Ganglia if rename has happened in the router (#60)

* Use old metric name if rename has happened in the router

* Also check for Ganglia renames for the oldname

* Derived metrics (#57)

* Add time-based derivatived (e.g. bandwidth) to some collectors

* Add documentation

* Add comments

* Fix: Only compute rates with a valid previous state

* Only compute rates with a valid previous state

* Define const values for net/dev fields

* Set default config values

* Add comments

* Refactor: Consolidate data structures

* Refactor: Consolidate data structures

* Refactor: Avoid struct deep copy

* Refactor: Avoid redundant tag maps

* Refactor: Use int64 type for absolut values

Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>

* Simplified iota usage

* Move unit tag to meta data tags

* Derived metrics (#65)

* Add time-based derivatived (e.g. bandwidth) to some collectors

* Add documentation

* Add comments

* Fix: Only compute rates with a valid previous state

* Only compute rates with a valid previous state

* Define const values for net/dev fields

* Set default config values

* Add comments

* Refactor: Consolidate data structures

* Refactor: Consolidate data structures

* Refactor: Avoid struct deep copy

* Refactor: Avoid redundant tag maps

* Refactor: Use int64 type for absolut values

* Update LustreCollector

Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>

* Meta to tags list and map for sinks (#63)

* Change ccMetric->Influx functions

* Use a meta_as_tags string list in config but create a lookup map afterwards

* Add meta as tag logic to sampleSink

* Fix staticcheck warnings (#66)

Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
2022-03-15 16:41:11 +01:00

145 lines
4.3 KiB
Go

package sinks
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
)
type InfluxSinkConfig struct {
defaultSinkConfig
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"`
InfluxRetryInterval string `json:"retry_interval"`
InfluxExponentialBase uint `json:"retry_exponential_base"`
InfluxMaxRetries uint `json:"max_retries"`
InfluxMaxRetryTime string `json:"max_retry_time"`
//InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it
}
type InfluxSink struct {
sink
client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking
config InfluxSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
//influxMaxRetryDelay uint
}
func (s *InfluxSink) connect() error {
var auth string
var uri string
if s.config.SSL {
uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port)
} else {
uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port)
}
if len(s.config.User) == 0 {
auth = s.config.Password
} else {
auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
}
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
clientOptions := influxdb2.DefaultOptions()
clientOptions.SetTLSConfig(
&tls.Config{
InsecureSkipVerify: true,
},
)
clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
ok, err := s.client.Ping(context.Background())
if err != nil {
return err
}
if !ok {
return fmt.Errorf("connection to %s not healthy", uri)
}
return nil
}
func (s *InfluxSink) Write(m lp.CCMetric) error {
err :=
s.writeApi.WritePoint(
context.Background(),
m.ToPoint(s.meta_as_tags),
)
return err
}
func (s *InfluxSink) Flush() error {
return nil
}
func (s *InfluxSink) Close() {
cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
s.client.Close()
}
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
s := new(InfluxSink)
s.name = fmt.Sprintf("InfluxSink(%s)", name)
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
s.influxRetryInterval = uint(time.Duration(1) * time.Second)
s.config.InfluxRetryInterval = "1s"
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
s.config.InfluxMaxRetryTime = "168h"
s.config.InfluxMaxRetries = 20
s.config.InfluxExponentialBase = 2
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return nil, errors.New("not all configuration variables set required by InfluxSink")
}
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
toUint := func(duration string, def uint) uint {
t, err := time.ParseDuration(duration)
if err == nil {
return uint(t.Milliseconds())
}
return def
}
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err)
}
return s, nil
}