From 2fd7f42ba1d74df699fb8f34ae47dbae1e9e083d Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Thu, 3 Feb 2022 18:47:17 +0100 Subject: [PATCH] Use sink-specific configurations to have more flexibility. Adjust sample sink configuration files --- .github/ci-sinks.json | 6 ++-- sinks.json | 6 ++-- sinks/gangliaSink.go | 65 +++++++++++++++++++++++++++++++++++++------ sinks/httpSink.go | 25 +++++++++++++---- sinks/influxSink.go | 65 +++++++++++++++++++++++++------------------ sinks/metricSink.go | 25 ++++------------- sinks/natsSink.go | 65 +++++++++++++++++++++++++++++-------------- sinks/sinkManager.go | 50 ++++++++++++++++----------------- sinks/stdoutSink.go | 44 +++++++++++++++++++++++++---- 9 files changed, 234 insertions(+), 117 deletions(-) diff --git a/.github/ci-sinks.json b/.github/ci-sinks.json index d304018..aa8ae80 100644 --- a/.github/ci-sinks.json +++ b/.github/ci-sinks.json @@ -1,6 +1,6 @@ -[ - { +{ + "testoutput" : { "type" : "stdout", "meta_as_tags" : true } -] +} diff --git a/sinks.json b/sinks.json index d304018..2fdae5a 100644 --- a/sinks.json +++ b/sinks.json @@ -1,6 +1,6 @@ -[ - { +{ + "mystdout" : { "type" : "stdout", "meta_as_tags" : true } -] +} diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index 3fd48e7..989e537 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -1,6 +1,8 @@ package sinks import ( + "encoding/json" + "errors" "fmt" "log" "strings" @@ -8,20 +10,49 @@ import ( // "time" "os/exec" + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) const GMETRIC_EXEC = `gmetric` -type GangliaSink struct { - Sink - gmetric_path string +type GangliaSinkConfig struct { + defaultSinkConfig + GmetricPath string `json:"gmetric_path"` + AddGangliaGroup bool `json:"add_ganglia_group"` } -func (s *GangliaSink) Init(config sinkConfig) error { - p, err := exec.LookPath(string(GMETRIC_EXEC)) - if err == nil { - s.gmetric_path = p +type GangliaSink struct { + sink + gmetric_path string + config GangliaSinkConfig +} + +func (s *GangliaSink) Init(config json.RawMessage) error { + var err error = nil + s.name = "GangliaSink" + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) + return err + } + } + s.gmetric_path = "" + if len(s.config.GmetricPath) > 0 { + p, err := exec.LookPath(s.config.GmetricPath) + if err == nil { + s.gmetric_path = p + } + } + if len(s.gmetric_path) == 0 { + p, err := exec.LookPath(string(GMETRIC_EXEC)) + if err == nil { + s.gmetric_path = p + } + } + if len(s.gmetric_path) == 0 { + err = errors.New("cannot find executable 'gmetric'") } return err } @@ -37,11 +68,29 @@ func (s *GangliaSink) Write(point lp.CCMetric) error { case "unit": argstr = append(argstr, fmt.Sprintf("--units=%s", value)) case "group": - argstr = append(argstr, fmt.Sprintf("--group=%s", value)) + if s.config.AddGangliaGroup { + argstr = append(argstr, fmt.Sprintf("--group=%s", value)) + } default: tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value)) } } + if s.config.MetaAsTags { + for key, value := range point.Meta() { + switch key { + case "cluster": + argstr = append(argstr, fmt.Sprintf("--cluster=%s", value)) + case "unit": + argstr = append(argstr, fmt.Sprintf("--units=%s", value)) + case "group": + if s.config.AddGangliaGroup { + argstr = append(argstr, fmt.Sprintf("--group=%s", value)) + } + default: + tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value)) + } + } + } if len(tagsstr) > 0 { argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ","))) } diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 25b0082..a21f8f2 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -2,6 +2,7 @@ package sinks import ( "bytes" + "encoding/json" "errors" "fmt" "net/http" @@ -11,24 +12,38 @@ import ( influx "github.com/influxdata/line-protocol" ) +type HttpSinkConfig struct { + defaultSinkConfig + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Database string `json:"database,omitempty"` + JWT string `json:"jwt,omitempty"` +} + type HttpSink struct { sink client *http.Client url, jwt string encoder *influx.Encoder buffer *bytes.Buffer + config HttpSinkConfig } -func (s *HttpSink) Init(config sinkConfig) error { +func (s *HttpSink) Init(config json.RawMessage) error { s.name = "HttpSink" - if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 { + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return err + } + } + if len(s.config.Host) == 0 || len(s.config.Port) == 0 || len(s.config.Database) == 0 { return errors.New("`host`, `port` and `database` config options required for TCP sink") } s.client = &http.Client{} - s.url = fmt.Sprintf("http://%s:%s/%s", config.Host, config.Port, config.Database) - s.port = config.Port - s.jwt = config.Password + s.url = fmt.Sprintf("http://%s:%s/%s", s.config.Host, s.config.Port, s.config.Database) + s.jwt = s.config.JWT s.buffer = &bytes.Buffer{} s.encoder = influx.NewEncoder(s.buffer) s.encoder.SetPrecision(time.Second) diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 7313490..bb35349 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -3,6 +3,7 @@ package sinks import ( "context" "crypto/tls" + "encoding/json" "errors" "fmt" "log" @@ -12,50 +13,60 @@ import ( influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" ) +type InfluxSinkConfig struct { + defaultSinkConfig + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Database string `json:"database,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + Organization string `json:"organization,omitempty"` + SSL bool `json:"ssl,omitempty"` + RetentionPol string `json:"retention_policy,omitempty"` +} + type InfluxSink struct { sink - client influxdb2.Client - writeApi influxdb2Api.WriteAPIBlocking - retPolicy string + client influxdb2.Client + writeApi influxdb2Api.WriteAPIBlocking + config InfluxSinkConfig } func (s *InfluxSink) connect() error { var auth string var uri string - if s.ssl { - uri = fmt.Sprintf("https://%s:%s", s.host, s.port) + if s.config.SSL { + uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port) } else { - uri = fmt.Sprintf("http://%s:%s", s.host, s.port) + uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port) } - if len(s.user) == 0 { - auth = s.password + if len(s.config.User) == 0 { + auth = s.config.Password } else { - auth = fmt.Sprintf("%s:%s", s.user, s.password) + auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password) } - log.Print("Using URI ", uri, " Org ", s.organization, " Bucket ", s.database) + log.Print("Using URI ", uri, " Org ", s.config.Organization, " Bucket ", s.config.Database) s.client = influxdb2.NewClientWithOptions(uri, auth, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: true})) - s.writeApi = s.client.WriteAPIBlocking(s.organization, s.database) + s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database) return nil } -func (s *InfluxSink) Init(config sinkConfig) error { +func (s *InfluxSink) Init(config json.RawMessage) error { s.name = "InfluxSink" - if len(config.Host) == 0 || - len(config.Port) == 0 || - len(config.Database) == 0 || - len(config.Organization) == 0 || - len(config.Password) == 0 { - return errors.New("Not all configuration variables set required by InfluxSink") + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return err + } + } + if len(s.config.Host) == 0 || + len(s.config.Port) == 0 || + len(s.config.Database) == 0 || + len(s.config.Organization) == 0 || + len(s.config.Password) == 0 { + return errors.New("not all configuration variables set required by InfluxSink") } - s.host = config.Host - s.port = config.Port - s.database = config.Database - s.organization = config.Organization - s.user = config.User - s.password = config.Password - s.ssl = config.SSL - s.meta_as_tags = config.MetaAsTags return s.connect() } @@ -65,7 +76,7 @@ func (s *InfluxSink) Write(point lp.CCMetric) error { for key, value := range point.Tags() { tags[key] = value } - if s.meta_as_tags { + if s.config.MetaAsTags { for key, value := range point.Meta() { tags[key] = value } diff --git a/sinks/metricSink.go b/sinks/metricSink.go index 25f66bb..d76f5f2 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -1,36 +1,23 @@ package sinks import ( - // "time" + "encoding/json" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) -type sinkConfig struct { - Type string `json:"type"` - Host string `json:"host,omitempty"` - Port string `json:"port,omitempty"` - Database string `json:"database,omitempty"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` - Organization string `json:"organization,omitempty"` - SSL bool `json:"ssl,omitempty"` - MetaAsTags bool `json:"meta_as_tags,omitempty"` +type defaultSinkConfig struct { + MetaAsTags bool `json:"meta_as_tags,omitempty"` + Type string `json:"type"` } type sink struct { - host string - port string - user string - password string - database string - organization string - ssl bool meta_as_tags bool name string } type Sink interface { - Init(config sinkConfig) error + Init(config json.RawMessage) error Write(point lp.CCMetric) error Flush() error Close() diff --git a/sinks/natsSink.go b/sinks/natsSink.go index f9cd7eb..37e8c2b 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -2,49 +2,71 @@ package sinks import ( "bytes" + "encoding/json" "errors" "fmt" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influx "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" - "log" - "time" ) +type NatsSinkConfig struct { + defaultSinkConfig + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Database string `json:"database,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` +} + type NatsSink struct { sink client *nats.Conn encoder *influx.Encoder buffer *bytes.Buffer + config NatsSinkConfig } func (s *NatsSink) connect() error { - uinfo := nats.UserInfo(s.user, s.password) - uri := fmt.Sprintf("nats://%s:%s", s.host, s.port) - log.Print("Using URI ", uri) + var err error + var uinfo nats.Option = nil + var nc *nats.Conn + if len(s.config.User) > 0 && len(s.config.Password) > 0 { + uinfo = nats.UserInfo(s.config.User, s.config.Password) + } + uri := fmt.Sprintf("nats://%s:%s", s.config.Host, s.config.Port) + cclog.ComponentDebug(s.name, "Connect to", uri) s.client = nil - nc, err := nats.Connect(uri, uinfo) + if uinfo != nil { + nc, err = nats.Connect(uri, uinfo) + } else { + nc, err = nats.Connect(uri) + } if err != nil { - log.Fatal(err) + cclog.ComponentError(s.name, "Connect to", uri, "failed:", err.Error()) return err } s.client = nc return nil } -func (s *NatsSink) Init(config sinkConfig) error { +func (s *NatsSink) Init(config json.RawMessage) error { s.name = "NatsSink" - if len(config.Host) == 0 || - len(config.Port) == 0 || - len(config.Database) == 0 { - return errors.New("Not all configuration variables set required by NatsSink") + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) + return err + } + } + if len(s.config.Host) == 0 || + len(s.config.Port) == 0 || + len(s.config.Database) == 0 { + return errors.New("not all configuration variables set required by NatsSink") } - s.host = config.Host - s.port = config.Port - s.database = config.Database - s.organization = config.Organization - s.user = config.User - s.password = config.Password // Setup Influx line protocol s.buffer = &bytes.Buffer{} s.buffer.Grow(1025) @@ -59,7 +81,7 @@ func (s *NatsSink) Write(point lp.CCMetric) error { if s.client != nil { _, err := s.encoder.Encode(point) if err != nil { - log.Print(err) + cclog.ComponentError(s.name, "Write:", err.Error()) return err } } @@ -68,7 +90,8 @@ func (s *NatsSink) Write(point lp.CCMetric) error { func (s *NatsSink) Flush() error { if s.client != nil { - if err := s.client.Publish(s.database, s.buffer.Bytes()); err != nil { + if err := s.client.Publish(s.config.Database, s.buffer.Bytes()); err != nil { + cclog.ComponentError(s.name, "Flush:", err.Error()) return err } s.buffer.Reset() @@ -77,8 +100,8 @@ func (s *NatsSink) Flush() error { } func (s *NatsSink) Close() { - log.Print("Closing Nats connection") if s.client != nil { + cclog.ComponentDebug(s.name, "Close") s.client.Close() } } diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 02421d3..21c392f 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -2,6 +2,7 @@ package sinks import ( "encoding/json" + "fmt" "os" "sync" @@ -20,28 +21,26 @@ var AvailableSinks = map[string]Sink{ // Metric collector manager data structure type sinkManager struct { - input chan lp.CCMetric // input channel - outputs []Sink // List of sinks to use - done chan bool // channel to finish / stop metric sink manager - wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector - config []sinkConfig // json encoded config for sink manager + input chan lp.CCMetric // input channel + done chan bool // channel to finish / stop metric sink manager + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + sinks map[string]Sink // Mapping sink name to sink } // Sink manager access functions type SinkManager interface { Init(wg *sync.WaitGroup, sinkConfigFile string) error AddInput(input chan lp.CCMetric) - AddOutput(config json.RawMessage) error + AddOutput(name string, config json.RawMessage) error Start() Close() } func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { sm.input = nil - sm.outputs = make([]Sink, 0) sm.done = make(chan bool) sm.wg = wg - sm.config = make([]sinkConfig, 0) + sm.sinks = make(map[string]Sink, 0) // Read sink config file if len(sinkConfigFile) > 0 { @@ -52,15 +51,16 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { } defer configFile.Close() jsonParser := json.NewDecoder(configFile) - var rawConfigs []json.RawMessage + var rawConfigs map[string]json.RawMessage err = jsonParser.Decode(&rawConfigs) if err != nil { cclog.ComponentError("SinkManager", err.Error()) return err } - for _, raw := range rawConfigs { - err = sm.AddOutput(raw) + for name, raw := range rawConfigs { + err = sm.AddOutput(name, raw) if err != nil { + cclog.ComponentError("SinkManager", err.Error()) continue } } @@ -77,7 +77,7 @@ func (sm *sinkManager) Start() { // Sink manager is done done := func() { - for _, s := range sm.outputs { + for _, s := range sm.sinks { s.Flush() s.Close() } @@ -95,14 +95,14 @@ func (sm *sinkManager) Start() { case p := <-sm.input: // Send received metric to all outputs cclog.ComponentDebug("SinkManager", "WRITE", p) - for _, s := range sm.outputs { + for _, s := range sm.sinks { s.Write(p) } // Flush all outputs if batchcount == 0 { cclog.ComponentDebug("SinkManager", "FLUSH") - for _, s := range sm.outputs { + for _, s := range sm.sinks { s.Flush() } batchcount = 20 @@ -121,29 +121,27 @@ func (sm *sinkManager) AddInput(input chan lp.CCMetric) { sm.input = input } -func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { +func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error { var err error - var config sinkConfig - if len(rawConfig) > 3 { - err = json.Unmarshal(rawConfig, &config) + var sinkConfig defaultSinkConfig + if len(rawConfig) > 0 { + err := json.Unmarshal(rawConfig, &sinkConfig) if err != nil { - cclog.ComponentError("SinkManager", "SKIP", config.Type, "JSON config error:", err.Error()) return err } } - if _, found := AvailableSinks[config.Type]; !found { - cclog.ComponentError("SinkManager", "SKIP", config.Type, "unknown sink:", err.Error()) + if _, found := AvailableSinks[sinkConfig.Type]; !found { + cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", err.Error()) return err } - s := AvailableSinks[config.Type] - err = s.Init(config) + s := AvailableSinks[sinkConfig.Type] + err = s.Init(rawConfig) if err != nil { cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error()) return err } - sm.outputs = append(sm.outputs, s) - sm.config = append(sm.config, config) - cclog.ComponentDebug("SinkManager", "ADD SINK", s.Name()) + sm.sinks[name] = s + cclog.ComponentDebug("SinkManager", "ADD SINK", s.Name(), "with name", fmt.Sprintf("'%s'", name)) return nil } diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index 215239f..2c9e710 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -1,21 +1,50 @@ package sinks import ( + "encoding/json" "fmt" "math" + "os" "strings" // "time" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) -type StdoutSink struct { - sink +type StdoutSinkConfig struct { + defaultSinkConfig + Output string `json:"output_file,omitempty"` } -func (s *StdoutSink) Init(config sinkConfig) error { +type StdoutSink struct { + sink + output *os.File + config StdoutSinkConfig +} + +func (s *StdoutSink) Init(config json.RawMessage) error { s.name = "StdoutSink" - s.meta_as_tags = config.MetaAsTags + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return err + } + } + s.output = os.Stdout + if len(s.config.Output) > 0 { + if strings.ToLower(s.config.Output) == "stdout" { + s.output = os.Stdout + } else if strings.ToLower(s.config.Output) == "stderr" { + s.output = os.Stderr + } else { + f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600)) + if err != nil { + return err + } + s.output = f + } + } + s.meta_as_tags = s.config.MetaAsTags return nil } @@ -63,7 +92,12 @@ func (s *StdoutSink) Write(point lp.CCMetric) error { } func (s *StdoutSink) Flush() error { + s.output.Sync() return nil } -func (s *StdoutSink) Close() {} +func (s *StdoutSink) Close() { + if s.output != os.Stdout && s.output != os.Stderr { + s.output.Close() + } +}