Use channels, add a metric router, split up configuration and use extended version of Influx line protocol internally

This commit is contained in:
Thomas Roehl
2021-12-20 12:40:51 +01:00
parent 15cf16b46f
commit 44d8b0c979
34 changed files with 1695 additions and 538 deletions

View File

@@ -7,19 +7,21 @@ import (
"net/http"
"time"
lp "github.com/influxdata/line-protocol"
influx "github.com/influxdata/line-protocol"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
type HttpSink struct {
Sink
sink
client *http.Client
url, jwt string
encoder *lp.Encoder
encoder *influx.Encoder
buffer *bytes.Buffer
}
func (s *HttpSink) Init(config SinkConfig) error {
if len(config.Host) == 0 || len(config.Port) == 0 {
func (s *HttpSink) Init(config sinkConfig) error {
s.name = "HttpSink"
if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 {
return errors.New("`host`, `port` and `database` config options required for TCP sink")
}
@@ -28,13 +30,13 @@ func (s *HttpSink) Init(config SinkConfig) error {
s.port = config.Port
s.jwt = config.Password
s.buffer = &bytes.Buffer{}
s.encoder = lp.NewEncoder(s.buffer)
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
return nil
}
func (s *HttpSink) Write(point lp.MutableMetric) error {
func (s *HttpSink) Write(point lp.CCMetric) error {
_, err := s.encoder.Encode(point)
return err
}

View File

@@ -5,15 +5,14 @@ import (
"crypto/tls"
"errors"
"fmt"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
lp "github.com/influxdata/line-protocol"
"log"
)
type InfluxSink struct {
Sink
sink
client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking
retPolicy string
@@ -39,7 +38,8 @@ func (s *InfluxSink) connect() error {
return nil
}
func (s *InfluxSink) Init(config SinkConfig) error {
func (s *InfluxSink) Init(config sinkConfig) error {
s.name = "InfluxSink"
if len(config.Host) == 0 ||
len(config.Port) == 0 ||
len(config.Database) == 0 ||
@@ -54,15 +54,21 @@ func (s *InfluxSink) Init(config SinkConfig) error {
s.user = config.User
s.password = config.Password
s.ssl = config.SSL
s.meta_as_tags = config.MetaAsTags
return s.connect()
}
func (s *InfluxSink) Write(point lp.MutableMetric) error {
func (s *InfluxSink) Write(point lp.CCMetric) error {
tags := map[string]string{}
fields := map[string]interface{}{}
for _, t := range point.TagList() {
tags[t.Key] = t.Value
}
if s.meta_as_tags {
for _, m := range point.MetaList() {
tags[m.Key] = m.Value
}
}
for _, f := range point.FieldList() {
fields[f.Key] = f.Value
}

View File

@@ -2,21 +2,22 @@ package sinks
import (
// "time"
lp "github.com/influxdata/line-protocol"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
type SinkConfig struct {
Host string `json:"host"`
Port string `json:"port"`
Database string `json:"database"`
User string `json:"user"`
Password string `json:"password"`
Organization string `json:"organization"`
Type string `json:"type"`
SSL bool `json:"ssl"`
type sinkConfig struct {
Type string `json:"type"`
Host string `json:"host", omitempty`
Port string `json:"port", omitempty`
Database string `json:"database, omitempty"`
User string `json:"user, omitempty"`
Password string `json:"password", omitempty`
Organization string `json:"organization", omitempty`
SSL bool `json:"ssl", omitempty`
MetaAsTags bool `json:"meta_as_tags", omitempty`
}
type Sink struct {
type sink struct {
host string
port string
user string
@@ -24,11 +25,18 @@ type Sink struct {
database string
organization string
ssl bool
meta_as_tags bool
name string
}
type SinkFuncs interface {
Init(config SinkConfig) error
Write(point lp.MutableMetric) error
type Sink interface {
Init(config sinkConfig) error
Write(point lp.CCMetric) error
Flush() error
Close()
Name() string
}
func (s *sink) Name() string {
return s.name
}

View File

@@ -4,16 +4,17 @@ import (
"bytes"
"errors"
"fmt"
lp "github.com/influxdata/line-protocol"
influx "github.com/influxdata/line-protocol"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
nats "github.com/nats-io/nats.go"
"log"
"time"
)
type NatsSink struct {
Sink
sink
client *nats.Conn
encoder *lp.Encoder
encoder *influx.Encoder
buffer *bytes.Buffer
}
@@ -31,7 +32,8 @@ func (s *NatsSink) connect() error {
return nil
}
func (s *NatsSink) Init(config SinkConfig) error {
func (s *NatsSink) Init(config sinkConfig) error {
s.name = "NatsSink"
if len(config.Host) == 0 ||
len(config.Port) == 0 ||
len(config.Database) == 0 {
@@ -46,14 +48,14 @@ func (s *NatsSink) Init(config SinkConfig) error {
// Setup Influx line protocol
s.buffer = &bytes.Buffer{}
s.buffer.Grow(1025)
s.encoder = lp.NewEncoder(s.buffer)
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
s.encoder.SetMaxLineBytes(1024)
// Setup infos for connection
return s.connect()
}
func (s *NatsSink) Write(point lp.MutableMetric) error {
func (s *NatsSink) Write(point lp.CCMetric) error {
if s.client != nil {
// var tags map[string]string
// var fields map[string]interface{}

151
sinks/sinkManager.go Normal file
View File

@@ -0,0 +1,151 @@
package sinks
import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"sync"
"log"
"os"
"encoding/json"
)
type SinkEntity struct {
config json.RawMessage
output Sink
}
var AvailableSinks = map[string]Sink{
"influxdb": &InfluxSink{},
"stdout": &StdoutSink{},
"nats": &NatsSink{},
"http": &HttpSink{},
}
type sinkManager struct {
input chan lp.CCMetric
outputs []Sink
done chan bool
wg *sync.WaitGroup
config []sinkConfig
}
type SinkManager interface {
Init(wg *sync.WaitGroup, sinkConfigFile string) error
AddInput(input chan lp.CCMetric)
AddOutput(config json.RawMessage) error
Start()
Close()
}
func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
sm.input = nil
sm.outputs = make([]Sink, 0)
sm.done = make(chan bool)
sm.wg = wg
sm.config = make([]sinkConfig, 0)
if len(sinkConfigFile) > 0 {
configFile, err := os.Open(sinkConfigFile)
if err != nil {
log.Print("[SinkManager] ", err.Error())
return err
}
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
var rawConfigs []json.RawMessage
err = jsonParser.Decode(&rawConfigs)
if err != nil {
log.Print("[SinkManager] ", err.Error())
return err
}
for _, raw := range rawConfigs {
err = sm.AddOutput(raw)
if err != nil {
continue
}
}
}
return nil
}
func (sm *sinkManager) Start() {
sm.wg.Add(1)
batchcount := 20
go func() {
for {
SinkManagerLoop:
select {
case <- sm.done:
for _, s := range sm.outputs {
s.Close()
}
log.Print("[SinkManager] DONE\n")
sm.wg.Done()
break SinkManagerLoop
case p := <- sm.input:
log.Print("[SinkManager] WRITE ", p)
for _, s := range sm.outputs {
s.Write(p)
}
if (batchcount == 0) {
log.Print("[SinkManager] FLUSH")
for _, s := range sm.outputs {
s.Flush()
}
batchcount = 20
}
batchcount--
default:
}
}
log.Print("[SinkManager] EXIT\n")
}()
log.Print("[SinkManager] STARTED\n")
}
func (sm *sinkManager) AddInput(input chan lp.CCMetric) {
sm.input = input
}
func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error {
var err error
var config sinkConfig
if len(rawConfig) > 3 {
err = json.Unmarshal(rawConfig, &config)
if err != nil {
log.Print("[SinkManager] SKIP ", config.Type, " JSON config error: ", err.Error())
return err
}
}
if _, found := AvailableSinks[config.Type]; !found {
log.Print("[SinkManager] SKIP ", config.Type, " unknown sink: ", err.Error())
return err
}
s := AvailableSinks[config.Type]
err = s.Init(config)
if err != nil {
log.Print("[SinkManager] SKIP ", s.Name(), " initialization failed: ", err.Error())
return err
}
sm.outputs = append(sm.outputs, s)
sm.config = append(sm.config, config)
return nil
}
func (sm *sinkManager) Close() {
sm.done <- true
log.Print("[SinkManager] CLOSE")
return
}
func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {
sm := &sinkManager{}
err := sm.Init(wg, sinkConfigFile)
if err != nil {
return nil, err
}
return sm, err
}

View File

@@ -6,23 +6,30 @@ import (
"strings"
// "time"
lp "github.com/influxdata/line-protocol"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
type StdoutSink struct {
Sink
sink
}
func (s *StdoutSink) Init(config SinkConfig) error {
func (s *StdoutSink) Init(config sinkConfig) error {
s.name = "StdoutSink"
s.meta_as_tags = config.MetaAsTags
return nil
}
func (s *StdoutSink) Write(point lp.MutableMetric) error {
func (s *StdoutSink) Write(point lp.CCMetric) error {
var tagsstr []string
var fieldstr []string
for _, t := range point.TagList() {
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", t.Key, t.Value))
}
if s.meta_as_tags {
for _, m := range point.MetaList() {
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", m.Key, m.Value))
}
}
for _, f := range point.FieldList() {
switch f.Value.(type) {
case float64: