Use sink-specific configurations to have more flexibility. Adjust sample sink configuration files

This commit is contained in:
Thomas Roehl 2022-02-03 18:47:17 +01:00
parent 92d4a9c2b9
commit 2fd7f42ba1
9 changed files with 234 additions and 117 deletions

View File

@ -1,6 +1,6 @@
[ {
{ "testoutput" : {
"type" : "stdout", "type" : "stdout",
"meta_as_tags" : true "meta_as_tags" : true
} }
] }

View File

@ -1,6 +1,6 @@
[ {
{ "mystdout" : {
"type" : "stdout", "type" : "stdout",
"meta_as_tags" : true "meta_as_tags" : true
} }
] }

View File

@ -1,6 +1,8 @@
package sinks package sinks
import ( import (
"encoding/json"
"errors"
"fmt" "fmt"
"log" "log"
"strings" "strings"
@ -8,21 +10,50 @@ import (
// "time" // "time"
"os/exec" "os/exec"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
const GMETRIC_EXEC = `gmetric` const GMETRIC_EXEC = `gmetric`
type GangliaSink struct { type GangliaSinkConfig struct {
Sink defaultSinkConfig
gmetric_path string GmetricPath string `json:"gmetric_path"`
AddGangliaGroup bool `json:"add_ganglia_group"`
} }
func (s *GangliaSink) Init(config sinkConfig) error { type GangliaSink struct {
sink
gmetric_path string
config GangliaSinkConfig
}
func (s *GangliaSink) Init(config json.RawMessage) error {
var err error = nil
s.name = "GangliaSink"
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return err
}
}
s.gmetric_path = ""
if len(s.config.GmetricPath) > 0 {
p, err := exec.LookPath(s.config.GmetricPath)
if err == nil {
s.gmetric_path = p
}
}
if len(s.gmetric_path) == 0 {
p, err := exec.LookPath(string(GMETRIC_EXEC)) p, err := exec.LookPath(string(GMETRIC_EXEC))
if err == nil { if err == nil {
s.gmetric_path = p s.gmetric_path = p
} }
}
if len(s.gmetric_path) == 0 {
err = errors.New("cannot find executable 'gmetric'")
}
return err return err
} }
@ -37,11 +68,29 @@ func (s *GangliaSink) Write(point lp.CCMetric) error {
case "unit": case "unit":
argstr = append(argstr, fmt.Sprintf("--units=%s", value)) argstr = append(argstr, fmt.Sprintf("--units=%s", value))
case "group": case "group":
if s.config.AddGangliaGroup {
argstr = append(argstr, fmt.Sprintf("--group=%s", value)) argstr = append(argstr, fmt.Sprintf("--group=%s", value))
}
default: default:
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value)) tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
} }
} }
if s.config.MetaAsTags {
for key, value := range point.Meta() {
switch key {
case "cluster":
argstr = append(argstr, fmt.Sprintf("--cluster=%s", value))
case "unit":
argstr = append(argstr, fmt.Sprintf("--units=%s", value))
case "group":
if s.config.AddGangliaGroup {
argstr = append(argstr, fmt.Sprintf("--group=%s", value))
}
default:
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
}
}
}
if len(tagsstr) > 0 { if len(tagsstr) > 0 {
argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ","))) argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
} }

View File

@ -2,6 +2,7 @@ package sinks
import ( import (
"bytes" "bytes"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
@ -11,24 +12,38 @@ import (
influx "github.com/influxdata/line-protocol" influx "github.com/influxdata/line-protocol"
) )
type HttpSinkConfig struct {
defaultSinkConfig
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
JWT string `json:"jwt,omitempty"`
}
type HttpSink struct { type HttpSink struct {
sink sink
client *http.Client client *http.Client
url, jwt string url, jwt string
encoder *influx.Encoder encoder *influx.Encoder
buffer *bytes.Buffer buffer *bytes.Buffer
config HttpSinkConfig
} }
func (s *HttpSink) Init(config sinkConfig) error { func (s *HttpSink) Init(config json.RawMessage) error {
s.name = "HttpSink" s.name = "HttpSink"
if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
if len(s.config.Host) == 0 || len(s.config.Port) == 0 || len(s.config.Database) == 0 {
return errors.New("`host`, `port` and `database` config options required for TCP sink") return errors.New("`host`, `port` and `database` config options required for TCP sink")
} }
s.client = &http.Client{} s.client = &http.Client{}
s.url = fmt.Sprintf("http://%s:%s/%s", config.Host, config.Port, config.Database) s.url = fmt.Sprintf("http://%s:%s/%s", s.config.Host, s.config.Port, s.config.Database)
s.port = config.Port s.jwt = s.config.JWT
s.jwt = config.Password
s.buffer = &bytes.Buffer{} s.buffer = &bytes.Buffer{}
s.encoder = influx.NewEncoder(s.buffer) s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second) s.encoder.SetPrecision(time.Second)

View File

@ -3,6 +3,7 @@ package sinks
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"log" "log"
@ -12,50 +13,60 @@ import (
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
) )
type InfluxSinkConfig struct {
defaultSinkConfig
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"`
}
type InfluxSink struct { type InfluxSink struct {
sink sink
client influxdb2.Client client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking writeApi influxdb2Api.WriteAPIBlocking
retPolicy string config InfluxSinkConfig
} }
func (s *InfluxSink) connect() error { func (s *InfluxSink) connect() error {
var auth string var auth string
var uri string var uri string
if s.ssl { if s.config.SSL {
uri = fmt.Sprintf("https://%s:%s", s.host, s.port) uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port)
} else { } else {
uri = fmt.Sprintf("http://%s:%s", s.host, s.port) uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port)
} }
if len(s.user) == 0 { if len(s.config.User) == 0 {
auth = s.password auth = s.config.Password
} else { } else {
auth = fmt.Sprintf("%s:%s", s.user, s.password) auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
} }
log.Print("Using URI ", uri, " Org ", s.organization, " Bucket ", s.database) log.Print("Using URI ", uri, " Org ", s.config.Organization, " Bucket ", s.config.Database)
s.client = influxdb2.NewClientWithOptions(uri, auth, s.client = influxdb2.NewClientWithOptions(uri, auth,
influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: true})) influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: true}))
s.writeApi = s.client.WriteAPIBlocking(s.organization, s.database) s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
return nil return nil
} }
func (s *InfluxSink) Init(config sinkConfig) error { func (s *InfluxSink) Init(config json.RawMessage) error {
s.name = "InfluxSink" s.name = "InfluxSink"
if len(config.Host) == 0 || if len(config) > 0 {
len(config.Port) == 0 || err := json.Unmarshal(config, &s.config)
len(config.Database) == 0 || if err != nil {
len(config.Organization) == 0 || return err
len(config.Password) == 0 { }
return errors.New("Not all configuration variables set required by InfluxSink") }
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return errors.New("not all configuration variables set required by InfluxSink")
} }
s.host = config.Host
s.port = config.Port
s.database = config.Database
s.organization = config.Organization
s.user = config.User
s.password = config.Password
s.ssl = config.SSL
s.meta_as_tags = config.MetaAsTags
return s.connect() return s.connect()
} }
@ -65,7 +76,7 @@ func (s *InfluxSink) Write(point lp.CCMetric) error {
for key, value := range point.Tags() { for key, value := range point.Tags() {
tags[key] = value tags[key] = value
} }
if s.meta_as_tags { if s.config.MetaAsTags {
for key, value := range point.Meta() { for key, value := range point.Meta() {
tags[key] = value tags[key] = value
} }

View File

@ -1,36 +1,23 @@
package sinks package sinks
import ( import (
// "time" "encoding/json"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
type sinkConfig struct { type defaultSinkConfig struct {
Type string `json:"type"`
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
MetaAsTags bool `json:"meta_as_tags,omitempty"` MetaAsTags bool `json:"meta_as_tags,omitempty"`
Type string `json:"type"`
} }
type sink struct { type sink struct {
host string
port string
user string
password string
database string
organization string
ssl bool
meta_as_tags bool meta_as_tags bool
name string name string
} }
type Sink interface { type Sink interface {
Init(config sinkConfig) error Init(config json.RawMessage) error
Write(point lp.CCMetric) error Write(point lp.CCMetric) error
Flush() error Flush() error
Close() Close()

View File

@ -2,49 +2,71 @@ package sinks
import ( import (
"bytes" "bytes"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influx "github.com/influxdata/line-protocol" influx "github.com/influxdata/line-protocol"
nats "github.com/nats-io/nats.go" nats "github.com/nats-io/nats.go"
"log"
"time"
) )
type NatsSinkConfig struct {
defaultSinkConfig
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
}
type NatsSink struct { type NatsSink struct {
sink sink
client *nats.Conn client *nats.Conn
encoder *influx.Encoder encoder *influx.Encoder
buffer *bytes.Buffer buffer *bytes.Buffer
config NatsSinkConfig
} }
func (s *NatsSink) connect() error { func (s *NatsSink) connect() error {
uinfo := nats.UserInfo(s.user, s.password) var err error
uri := fmt.Sprintf("nats://%s:%s", s.host, s.port) var uinfo nats.Option = nil
log.Print("Using URI ", uri) var nc *nats.Conn
if len(s.config.User) > 0 && len(s.config.Password) > 0 {
uinfo = nats.UserInfo(s.config.User, s.config.Password)
}
uri := fmt.Sprintf("nats://%s:%s", s.config.Host, s.config.Port)
cclog.ComponentDebug(s.name, "Connect to", uri)
s.client = nil s.client = nil
nc, err := nats.Connect(uri, uinfo) if uinfo != nil {
nc, err = nats.Connect(uri, uinfo)
} else {
nc, err = nats.Connect(uri)
}
if err != nil { if err != nil {
log.Fatal(err) cclog.ComponentError(s.name, "Connect to", uri, "failed:", err.Error())
return err return err
} }
s.client = nc s.client = nc
return nil return nil
} }
func (s *NatsSink) Init(config sinkConfig) error { func (s *NatsSink) Init(config json.RawMessage) error {
s.name = "NatsSink" s.name = "NatsSink"
if len(config.Host) == 0 || if len(config) > 0 {
len(config.Port) == 0 || err := json.Unmarshal(config, &s.config)
len(config.Database) == 0 { if err != nil {
return errors.New("Not all configuration variables set required by NatsSink") cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 {
return errors.New("not all configuration variables set required by NatsSink")
} }
s.host = config.Host
s.port = config.Port
s.database = config.Database
s.organization = config.Organization
s.user = config.User
s.password = config.Password
// Setup Influx line protocol // Setup Influx line protocol
s.buffer = &bytes.Buffer{} s.buffer = &bytes.Buffer{}
s.buffer.Grow(1025) s.buffer.Grow(1025)
@ -59,7 +81,7 @@ func (s *NatsSink) Write(point lp.CCMetric) error {
if s.client != nil { if s.client != nil {
_, err := s.encoder.Encode(point) _, err := s.encoder.Encode(point)
if err != nil { if err != nil {
log.Print(err) cclog.ComponentError(s.name, "Write:", err.Error())
return err return err
} }
} }
@ -68,7 +90,8 @@ func (s *NatsSink) Write(point lp.CCMetric) error {
func (s *NatsSink) Flush() error { func (s *NatsSink) Flush() error {
if s.client != nil { if s.client != nil {
if err := s.client.Publish(s.database, s.buffer.Bytes()); err != nil { if err := s.client.Publish(s.config.Database, s.buffer.Bytes()); err != nil {
cclog.ComponentError(s.name, "Flush:", err.Error())
return err return err
} }
s.buffer.Reset() s.buffer.Reset()
@ -77,8 +100,8 @@ func (s *NatsSink) Flush() error {
} }
func (s *NatsSink) Close() { func (s *NatsSink) Close() {
log.Print("Closing Nats connection")
if s.client != nil { if s.client != nil {
cclog.ComponentDebug(s.name, "Close")
s.client.Close() s.client.Close()
} }
} }

View File

@ -2,6 +2,7 @@ package sinks
import ( import (
"encoding/json" "encoding/json"
"fmt"
"os" "os"
"sync" "sync"
@ -21,27 +22,25 @@ var AvailableSinks = map[string]Sink{
// Metric collector manager data structure // Metric collector manager data structure
type sinkManager struct { type sinkManager struct {
input chan lp.CCMetric // input channel input chan lp.CCMetric // input channel
outputs []Sink // List of sinks to use
done chan bool // channel to finish / stop metric sink manager done chan bool // channel to finish / stop metric sink manager
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
config []sinkConfig // json encoded config for sink manager sinks map[string]Sink // Mapping sink name to sink
} }
// Sink manager access functions // Sink manager access functions
type SinkManager interface { type SinkManager interface {
Init(wg *sync.WaitGroup, sinkConfigFile string) error Init(wg *sync.WaitGroup, sinkConfigFile string) error
AddInput(input chan lp.CCMetric) AddInput(input chan lp.CCMetric)
AddOutput(config json.RawMessage) error AddOutput(name string, config json.RawMessage) error
Start() Start()
Close() Close()
} }
func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
sm.input = nil sm.input = nil
sm.outputs = make([]Sink, 0)
sm.done = make(chan bool) sm.done = make(chan bool)
sm.wg = wg sm.wg = wg
sm.config = make([]sinkConfig, 0) sm.sinks = make(map[string]Sink, 0)
// Read sink config file // Read sink config file
if len(sinkConfigFile) > 0 { if len(sinkConfigFile) > 0 {
@ -52,15 +51,16 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
} }
defer configFile.Close() defer configFile.Close()
jsonParser := json.NewDecoder(configFile) jsonParser := json.NewDecoder(configFile)
var rawConfigs []json.RawMessage var rawConfigs map[string]json.RawMessage
err = jsonParser.Decode(&rawConfigs) err = jsonParser.Decode(&rawConfigs)
if err != nil { if err != nil {
cclog.ComponentError("SinkManager", err.Error()) cclog.ComponentError("SinkManager", err.Error())
return err return err
} }
for _, raw := range rawConfigs { for name, raw := range rawConfigs {
err = sm.AddOutput(raw) err = sm.AddOutput(name, raw)
if err != nil { if err != nil {
cclog.ComponentError("SinkManager", err.Error())
continue continue
} }
} }
@ -77,7 +77,7 @@ func (sm *sinkManager) Start() {
// Sink manager is done // Sink manager is done
done := func() { done := func() {
for _, s := range sm.outputs { for _, s := range sm.sinks {
s.Flush() s.Flush()
s.Close() s.Close()
} }
@ -95,14 +95,14 @@ func (sm *sinkManager) Start() {
case p := <-sm.input: case p := <-sm.input:
// Send received metric to all outputs // Send received metric to all outputs
cclog.ComponentDebug("SinkManager", "WRITE", p) cclog.ComponentDebug("SinkManager", "WRITE", p)
for _, s := range sm.outputs { for _, s := range sm.sinks {
s.Write(p) s.Write(p)
} }
// Flush all outputs // Flush all outputs
if batchcount == 0 { if batchcount == 0 {
cclog.ComponentDebug("SinkManager", "FLUSH") cclog.ComponentDebug("SinkManager", "FLUSH")
for _, s := range sm.outputs { for _, s := range sm.sinks {
s.Flush() s.Flush()
} }
batchcount = 20 batchcount = 20
@ -121,29 +121,27 @@ func (sm *sinkManager) AddInput(input chan lp.CCMetric) {
sm.input = input sm.input = input
} }
func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error {
var err error var err error
var config sinkConfig var sinkConfig defaultSinkConfig
if len(rawConfig) > 3 { if len(rawConfig) > 0 {
err = json.Unmarshal(rawConfig, &config) err := json.Unmarshal(rawConfig, &sinkConfig)
if err != nil { if err != nil {
cclog.ComponentError("SinkManager", "SKIP", config.Type, "JSON config error:", err.Error())
return err return err
} }
} }
if _, found := AvailableSinks[config.Type]; !found { if _, found := AvailableSinks[sinkConfig.Type]; !found {
cclog.ComponentError("SinkManager", "SKIP", config.Type, "unknown sink:", err.Error()) cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", err.Error())
return err return err
} }
s := AvailableSinks[config.Type] s := AvailableSinks[sinkConfig.Type]
err = s.Init(config) err = s.Init(rawConfig)
if err != nil { if err != nil {
cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error()) cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error())
return err return err
} }
sm.outputs = append(sm.outputs, s) sm.sinks[name] = s
sm.config = append(sm.config, config) cclog.ComponentDebug("SinkManager", "ADD SINK", s.Name(), "with name", fmt.Sprintf("'%s'", name))
cclog.ComponentDebug("SinkManager", "ADD SINK", s.Name())
return nil return nil
} }

View File

@ -1,21 +1,50 @@
package sinks package sinks
import ( import (
"encoding/json"
"fmt" "fmt"
"math" "math"
"os"
"strings" "strings"
// "time" // "time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
type StdoutSink struct { type StdoutSinkConfig struct {
sink defaultSinkConfig
Output string `json:"output_file,omitempty"`
} }
func (s *StdoutSink) Init(config sinkConfig) error { type StdoutSink struct {
sink
output *os.File
config StdoutSinkConfig
}
func (s *StdoutSink) Init(config json.RawMessage) error {
s.name = "StdoutSink" s.name = "StdoutSink"
s.meta_as_tags = config.MetaAsTags if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
s.output = os.Stdout
if len(s.config.Output) > 0 {
if strings.ToLower(s.config.Output) == "stdout" {
s.output = os.Stdout
} else if strings.ToLower(s.config.Output) == "stderr" {
s.output = os.Stderr
} else {
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
if err != nil {
return err
}
s.output = f
}
}
s.meta_as_tags = s.config.MetaAsTags
return nil return nil
} }
@ -63,7 +92,12 @@ func (s *StdoutSink) Write(point lp.CCMetric) error {
} }
func (s *StdoutSink) Flush() error { func (s *StdoutSink) Flush() error {
s.output.Sync()
return nil return nil
} }
func (s *StdoutSink) Close() {} func (s *StdoutSink) Close() {
if s.output != os.Stdout && s.output != os.Stderr {
s.output.Close()
}
}