diff --git a/.github/ci-sinks.json b/.github/ci-sinks.json index d304018..aa8ae80 100644 --- a/.github/ci-sinks.json +++ b/.github/ci-sinks.json @@ -1,6 +1,6 @@ -[ - { +{ + "testoutput" : { "type" : "stdout", "meta_as_tags" : true } -] +} diff --git a/internal/ccTopology/ccTopology.go b/internal/ccTopology/ccTopology.go index 030b2f7..6d8bfae 100644 --- a/internal/ccTopology/ccTopology.go +++ b/internal/ccTopology/ccTopology.go @@ -168,7 +168,7 @@ func CpuData() []CpuEntry { buffer, err := ioutil.ReadFile(path) if err != nil { log.Print(err) - cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error()) + //cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error()) return -1 } sbuffer := strings.Replace(string(buffer), "\n", "", -1) @@ -254,6 +254,9 @@ func CpuData() []CpuEntry { // Lookup CPU die id centry.Die = getDie(base) + if centry.Die < 0 { + centry.Die = centry.Socket + } // Lookup SMT thread id centry.SMT = getSMT(centry.Cpuid, base) diff --git a/sinks.json b/sinks.json index d304018..2fdae5a 100644 --- a/sinks.json +++ b/sinks.json @@ -1,6 +1,6 @@ -[ - { +{ + "mystdout" : { "type" : "stdout", "meta_as_tags" : true } -] +} diff --git a/sinks/README.md b/sinks/README.md index 8fac8e5..1690df9 100644 --- a/sinks/README.md +++ b/sinks/README.md @@ -2,17 +2,24 @@ This folder contains the SinkManager and sink implementations for the cc-metric-collector. +# Available sinks: +- [`stdout`](./stdoutSink.md): Print all metrics to `stdout`, `stderr` or a file +- [`http`](./httpSink.md): Send metrics to an HTTP server as POST requests +- [`influxdb`](./influxSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database +- [`nats`](./natsSink.md): Publish metrics to the [NATS](https://nats.io/) network overlay system +- [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) + # Configuration The configuration file for the sinks is a list of configurations. The `type` field in each specifies which sink to initialize. ```json [ - { + "mystdout" : { "type" : "stdout", "meta_as_tags" : false }, - { + "metricstore" : { "type" : "http", "host" : "localhost", "port" : "4123", @@ -22,74 +29,12 @@ The configuration file for the sinks is a list of configurations. The `type` fie ] ``` -This example initializes two sinks, the `stdout` sink printing all metrics to the STDOUT and the `http` sink with the given `host`, `port`, `database` and `password`. - -If `meta_as_tags` is set, all meta information attached to CCMetric are printed out as tags. - -## Type `stdout` - -```json -{ - "type" : "stdout", - "meta_as_tags" : -} -``` - -The `stdout` sink dumps all metrics to the STDOUT. - -## Type `http` - -```json -{ - "type" : "http", - "host" : "", - "port" : "", - "database" : "", - "password" : "", - "meta_as_tags" : -} -``` -The sink uses POST requests to send metrics to `http://:/` using the JWT token as a JWT in the 'Authorization' header. - -## Type `nats` - -```json -{ - "type" : "nats", - "host" : "", - "port" : "", - "user" : "", - "password" : "", - "database" : "" - "meta_as_tags" : -} -``` - -This sink publishes the CCMetric in a NATS environment using `host`, `port`, `user` and `password` for connecting. The metrics are published using the topic `database`. - -## Type `influxdb` - -```json -{ - "type" : "influxdb", - "host" : "", - "port" : "", - "user" : "", - "password" : "", - "database" : "" - "organization": "", - "ssl" : , - "meta_as_tags" : -} -``` - -This sink submits the CCMetrics to an InfluxDB time-series database. It uses `host`, `port` and `ssl` for connecting. For authentification, it uses either `user:password` if `user` is set and only `password` as API key. The `organization` and `database` are used for writing to the correct database. # Contributing own sinks -A sink contains three functions and is derived from the type `Sink`: -* `Init(config SinkConfig) error` +A sink contains four functions and is derived from the type `sink`: +* `Init(config json.RawMessage) error` * `Write(point CCMetric) error` * `Flush() error` * `Close()` @@ -97,3 +42,52 @@ A sink contains three functions and is derived from the type `Sink`: The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`. Finally, the sink needs to be registered in the `sinkManager.go`. There is a list of sinks called `AvailableSinks` which is a map (`sink_type_string` -> `pointer to sink interface`). Add a new entry with a descriptive name and the new sink. + +## Sample sink + +```go +package sinks + +import ( + "encoding/json" + "log" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" +) + +type SampleSinkConfig struct { + defaultSinkConfig // defines JSON tags for 'name' and 'meta_as_tags' +} + +type SampleSink struct { + sink // declarate 'name' and 'meta_as_tags' + config StdoutSinkConfig // entry point to the SampleSinkConfig +} + +// Initialize the sink by giving it a name and reading in the config JSON +func (s *SampleSink) Init(config json.RawMessage) error { + s.name = "SampleSink" // Always specify a name here + // Read in the config JSON + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return err + } + } + return nil +} + +// Code to submit a single CCMetric to the sink +func (s *SampleSink) Write(point lp.CCMetric) error { + log.Print(point) + return nil +} + +// If the sink uses batched sends internally, you can tell to flush its buffers +func (s *SampleSink) Flush() error { + return nil +} + + +// Close sink: close network connection, close files, close libraries, ... +func (s *SampleSink) Close() {} +``` \ No newline at end of file diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index 3fd48e7..989e537 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -1,6 +1,8 @@ package sinks import ( + "encoding/json" + "errors" "fmt" "log" "strings" @@ -8,20 +10,49 @@ import ( // "time" "os/exec" + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) const GMETRIC_EXEC = `gmetric` -type GangliaSink struct { - Sink - gmetric_path string +type GangliaSinkConfig struct { + defaultSinkConfig + GmetricPath string `json:"gmetric_path"` + AddGangliaGroup bool `json:"add_ganglia_group"` } -func (s *GangliaSink) Init(config sinkConfig) error { - p, err := exec.LookPath(string(GMETRIC_EXEC)) - if err == nil { - s.gmetric_path = p +type GangliaSink struct { + sink + gmetric_path string + config GangliaSinkConfig +} + +func (s *GangliaSink) Init(config json.RawMessage) error { + var err error = nil + s.name = "GangliaSink" + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) + return err + } + } + s.gmetric_path = "" + if len(s.config.GmetricPath) > 0 { + p, err := exec.LookPath(s.config.GmetricPath) + if err == nil { + s.gmetric_path = p + } + } + if len(s.gmetric_path) == 0 { + p, err := exec.LookPath(string(GMETRIC_EXEC)) + if err == nil { + s.gmetric_path = p + } + } + if len(s.gmetric_path) == 0 { + err = errors.New("cannot find executable 'gmetric'") } return err } @@ -37,11 +68,29 @@ func (s *GangliaSink) Write(point lp.CCMetric) error { case "unit": argstr = append(argstr, fmt.Sprintf("--units=%s", value)) case "group": - argstr = append(argstr, fmt.Sprintf("--group=%s", value)) + if s.config.AddGangliaGroup { + argstr = append(argstr, fmt.Sprintf("--group=%s", value)) + } default: tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value)) } } + if s.config.MetaAsTags { + for key, value := range point.Meta() { + switch key { + case "cluster": + argstr = append(argstr, fmt.Sprintf("--cluster=%s", value)) + case "unit": + argstr = append(argstr, fmt.Sprintf("--units=%s", value)) + case "group": + if s.config.AddGangliaGroup { + argstr = append(argstr, fmt.Sprintf("--group=%s", value)) + } + default: + tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value)) + } + } + } if len(tagsstr) > 0 { argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ","))) } diff --git a/sinks/gangliaSink.md b/sinks/gangliaSink.md new file mode 100644 index 0000000..9b77ac9 --- /dev/null +++ b/sinks/gangliaSink.md @@ -0,0 +1,21 @@ +## `ganglia` sink + +The `ganglia` sink uses the `gmetric` tool of the [Ganglia Monitoring System](http://ganglia.info/) to submit the metrics + +### Configuration structure + +```json +{ + "": { + "type": "ganglia", + "meta_as_tags" : true, + "gmetric_path" : "/path/to/gmetric", + "add_ganglia_group" : true + } +} +``` + +- `type`: makes the sink an `ganglia` sink +- `meta_as_tags`: print all meta information as tags in the output (optional) +- `gmetric_path`: Path to `gmetric` executable (optional). If not given, the sink searches in `$PATH` for `gmetric`. +- `add_ganglia_group`: Add `--group=X` based on meta information to the `gmetric` call. Some old versions of `gmetric` do not support the `--group` option. \ No newline at end of file diff --git a/sinks/httpSink.go b/sinks/httpSink.go index a703f82..3080faa 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -2,6 +2,7 @@ package sinks import ( "bytes" + "encoding/json" "errors" "fmt" "net/http" @@ -11,28 +12,44 @@ import ( influx "github.com/influxdata/line-protocol" ) +type HttpSinkConfig struct { + defaultSinkConfig + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Database string `json:"database,omitempty"` + JWT string `json:"jwt,omitempty"` + SSL bool `json:"ssl,omitempty"` +} + type HttpSink struct { sink client *http.Client url, jwt string encoder *influx.Encoder buffer *bytes.Buffer + config HttpSinkConfig } -func (s *HttpSink) Init(config sinkConfig) error { +func (s *HttpSink) Init(config json.RawMessage) error { s.name = "HttpSink" - if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 { + s.config.SSL = false + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return err + } + } + if len(s.config.Host) == 0 || len(s.config.Port) == 0 || len(s.config.Database) == 0 { return errors.New("`host`, `port` and `database` config options required for TCP sink") } s.client = &http.Client{} proto := "http" - if config.SSL { + if s.config.SSL { proto = "https" } - s.url = fmt.Sprintf("%s://%s:%s/%s", proto, config.Host, config.Port, config.Database) - s.port = config.Port - s.jwt = config.Password + s.url = fmt.Sprintf("%s://%s:%s/%s", proto, s.config.Host, s.config.Port, s.config.Database) + s.jwt = s.config.JWT s.buffer = &bytes.Buffer{} s.encoder = influx.NewEncoder(s.buffer) s.encoder.SetPrecision(time.Second) diff --git a/sinks/httpSink.md b/sinks/httpSink.md new file mode 100644 index 0000000..5440a82 --- /dev/null +++ b/sinks/httpSink.md @@ -0,0 +1,27 @@ +## `http` sink + +The `http` sink uses POST requests to a HTTP server to submit the metrics in the InfluxDB line-protocol format. It uses JSON web tokens for authentification. The sink creates batches of metrics before sending, to reduce the HTTP traffic. + +### Configuration structure + +```json +{ + "": { + "type": "http", + "meta_as_tags" : true, + "database" : "mymetrics", + "host": "dbhost.example.com", + "port": "4222", + "jwt" : "0x0000q231", + "ssl" : false + } +} +``` + +- `type`: makes the sink an `http` sink +- `meta_as_tags`: print all meta information as tags in the output (optional) +- `database`: All metrics are written to this bucket +- `host`: Hostname of the InfluxDB database server +- `port`: Portnumber (as string) of the InfluxDB database server +- `jwt`: JSON web tokens for authentification +- `ssl`: Activate SSL encryption \ No newline at end of file diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 7313490..bb35349 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -3,6 +3,7 @@ package sinks import ( "context" "crypto/tls" + "encoding/json" "errors" "fmt" "log" @@ -12,50 +13,60 @@ import ( influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" ) +type InfluxSinkConfig struct { + defaultSinkConfig + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Database string `json:"database,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + Organization string `json:"organization,omitempty"` + SSL bool `json:"ssl,omitempty"` + RetentionPol string `json:"retention_policy,omitempty"` +} + type InfluxSink struct { sink - client influxdb2.Client - writeApi influxdb2Api.WriteAPIBlocking - retPolicy string + client influxdb2.Client + writeApi influxdb2Api.WriteAPIBlocking + config InfluxSinkConfig } func (s *InfluxSink) connect() error { var auth string var uri string - if s.ssl { - uri = fmt.Sprintf("https://%s:%s", s.host, s.port) + if s.config.SSL { + uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port) } else { - uri = fmt.Sprintf("http://%s:%s", s.host, s.port) + uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port) } - if len(s.user) == 0 { - auth = s.password + if len(s.config.User) == 0 { + auth = s.config.Password } else { - auth = fmt.Sprintf("%s:%s", s.user, s.password) + auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password) } - log.Print("Using URI ", uri, " Org ", s.organization, " Bucket ", s.database) + log.Print("Using URI ", uri, " Org ", s.config.Organization, " Bucket ", s.config.Database) s.client = influxdb2.NewClientWithOptions(uri, auth, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: true})) - s.writeApi = s.client.WriteAPIBlocking(s.organization, s.database) + s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database) return nil } -func (s *InfluxSink) Init(config sinkConfig) error { +func (s *InfluxSink) Init(config json.RawMessage) error { s.name = "InfluxSink" - if len(config.Host) == 0 || - len(config.Port) == 0 || - len(config.Database) == 0 || - len(config.Organization) == 0 || - len(config.Password) == 0 { - return errors.New("Not all configuration variables set required by InfluxSink") + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return err + } + } + if len(s.config.Host) == 0 || + len(s.config.Port) == 0 || + len(s.config.Database) == 0 || + len(s.config.Organization) == 0 || + len(s.config.Password) == 0 { + return errors.New("not all configuration variables set required by InfluxSink") } - s.host = config.Host - s.port = config.Port - s.database = config.Database - s.organization = config.Organization - s.user = config.User - s.password = config.Password - s.ssl = config.SSL - s.meta_as_tags = config.MetaAsTags return s.connect() } @@ -65,7 +76,7 @@ func (s *InfluxSink) Write(point lp.CCMetric) error { for key, value := range point.Tags() { tags[key] = value } - if s.meta_as_tags { + if s.config.MetaAsTags { for key, value := range point.Meta() { tags[key] = value } diff --git a/sinks/influxSink.md b/sinks/influxSink.md new file mode 100644 index 0000000..2624034 --- /dev/null +++ b/sinks/influxSink.md @@ -0,0 +1,32 @@ +## `influxdb` sink + +The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2) to write the metrics to an InfluxDB database. It provides only support for V2 write endpoints (InfluxDB 1.8.0 or later). + + +### Configuration structure + +```json +{ + "": { + "type": "influxdb", + "meta_as_tags" : true, + "database" : "mymetrics", + "host": "dbhost.example.com", + "port": "4222", + "user": "exampleuser", + "password" : "examplepw", + "organization": "myorg", + "ssl": true, + } +} +``` + +- `type`: makes the sink an `influxdb` sink +- `meta_as_tags`: print all meta information as tags in the output (optional) +- `database`: All metrics are written to this bucket +- `host`: Hostname of the InfluxDB database server +- `port`: Portnumber (as string) of the InfluxDB database server +- `user`: Username for basic authentification +- `password`: Password for basic authentification +- `organization`: Organization in the InfluxDB +- `ssl`: Use SSL connection \ No newline at end of file diff --git a/sinks/metricSink.go b/sinks/metricSink.go index 25f66bb..d76f5f2 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -1,36 +1,23 @@ package sinks import ( - // "time" + "encoding/json" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) -type sinkConfig struct { - Type string `json:"type"` - Host string `json:"host,omitempty"` - Port string `json:"port,omitempty"` - Database string `json:"database,omitempty"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` - Organization string `json:"organization,omitempty"` - SSL bool `json:"ssl,omitempty"` - MetaAsTags bool `json:"meta_as_tags,omitempty"` +type defaultSinkConfig struct { + MetaAsTags bool `json:"meta_as_tags,omitempty"` + Type string `json:"type"` } type sink struct { - host string - port string - user string - password string - database string - organization string - ssl bool meta_as_tags bool name string } type Sink interface { - Init(config sinkConfig) error + Init(config json.RawMessage) error Write(point lp.CCMetric) error Flush() error Close() diff --git a/sinks/natsSink.go b/sinks/natsSink.go index f9cd7eb..37e8c2b 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -2,49 +2,71 @@ package sinks import ( "bytes" + "encoding/json" "errors" "fmt" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influx "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" - "log" - "time" ) +type NatsSinkConfig struct { + defaultSinkConfig + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Database string `json:"database,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` +} + type NatsSink struct { sink client *nats.Conn encoder *influx.Encoder buffer *bytes.Buffer + config NatsSinkConfig } func (s *NatsSink) connect() error { - uinfo := nats.UserInfo(s.user, s.password) - uri := fmt.Sprintf("nats://%s:%s", s.host, s.port) - log.Print("Using URI ", uri) + var err error + var uinfo nats.Option = nil + var nc *nats.Conn + if len(s.config.User) > 0 && len(s.config.Password) > 0 { + uinfo = nats.UserInfo(s.config.User, s.config.Password) + } + uri := fmt.Sprintf("nats://%s:%s", s.config.Host, s.config.Port) + cclog.ComponentDebug(s.name, "Connect to", uri) s.client = nil - nc, err := nats.Connect(uri, uinfo) + if uinfo != nil { + nc, err = nats.Connect(uri, uinfo) + } else { + nc, err = nats.Connect(uri) + } if err != nil { - log.Fatal(err) + cclog.ComponentError(s.name, "Connect to", uri, "failed:", err.Error()) return err } s.client = nc return nil } -func (s *NatsSink) Init(config sinkConfig) error { +func (s *NatsSink) Init(config json.RawMessage) error { s.name = "NatsSink" - if len(config.Host) == 0 || - len(config.Port) == 0 || - len(config.Database) == 0 { - return errors.New("Not all configuration variables set required by NatsSink") + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) + return err + } + } + if len(s.config.Host) == 0 || + len(s.config.Port) == 0 || + len(s.config.Database) == 0 { + return errors.New("not all configuration variables set required by NatsSink") } - s.host = config.Host - s.port = config.Port - s.database = config.Database - s.organization = config.Organization - s.user = config.User - s.password = config.Password // Setup Influx line protocol s.buffer = &bytes.Buffer{} s.buffer.Grow(1025) @@ -59,7 +81,7 @@ func (s *NatsSink) Write(point lp.CCMetric) error { if s.client != nil { _, err := s.encoder.Encode(point) if err != nil { - log.Print(err) + cclog.ComponentError(s.name, "Write:", err.Error()) return err } } @@ -68,7 +90,8 @@ func (s *NatsSink) Write(point lp.CCMetric) error { func (s *NatsSink) Flush() error { if s.client != nil { - if err := s.client.Publish(s.database, s.buffer.Bytes()); err != nil { + if err := s.client.Publish(s.config.Database, s.buffer.Bytes()); err != nil { + cclog.ComponentError(s.name, "Flush:", err.Error()) return err } s.buffer.Reset() @@ -77,8 +100,8 @@ func (s *NatsSink) Flush() error { } func (s *NatsSink) Close() { - log.Print("Closing Nats connection") if s.client != nil { + cclog.ComponentDebug(s.name, "Close") s.client.Close() } } diff --git a/sinks/natsSink.md b/sinks/natsSink.md new file mode 100644 index 0000000..7a53f27 --- /dev/null +++ b/sinks/natsSink.md @@ -0,0 +1,28 @@ +## `nats` sink + +The `nats` sink publishes all metrics into a NATS network. The publishing key is the database name provided in the configuration file + + +### Configuration structure + +```json +{ + "": { + "type": "nats", + "meta_as_tags" : true, + "database" : "mymetrics", + "host": "dbhost.example.com", + "port": "4222", + "user": "exampleuser", + "password" : "examplepw" + } +} +``` + +- `type`: makes the sink an `nats` sink +- `meta_as_tags`: print all meta information as tags in the output (optional) +- `database`: All metrics are published with this subject +- `host`: Hostname of the NATS server +- `port`: Portnumber (as string) of the NATS server +- `user`: Username for basic authentification +- `password`: Password for basic authentification \ No newline at end of file diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 02421d3..21c392f 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -2,6 +2,7 @@ package sinks import ( "encoding/json" + "fmt" "os" "sync" @@ -20,28 +21,26 @@ var AvailableSinks = map[string]Sink{ // Metric collector manager data structure type sinkManager struct { - input chan lp.CCMetric // input channel - outputs []Sink // List of sinks to use - done chan bool // channel to finish / stop metric sink manager - wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector - config []sinkConfig // json encoded config for sink manager + input chan lp.CCMetric // input channel + done chan bool // channel to finish / stop metric sink manager + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + sinks map[string]Sink // Mapping sink name to sink } // Sink manager access functions type SinkManager interface { Init(wg *sync.WaitGroup, sinkConfigFile string) error AddInput(input chan lp.CCMetric) - AddOutput(config json.RawMessage) error + AddOutput(name string, config json.RawMessage) error Start() Close() } func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { sm.input = nil - sm.outputs = make([]Sink, 0) sm.done = make(chan bool) sm.wg = wg - sm.config = make([]sinkConfig, 0) + sm.sinks = make(map[string]Sink, 0) // Read sink config file if len(sinkConfigFile) > 0 { @@ -52,15 +51,16 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { } defer configFile.Close() jsonParser := json.NewDecoder(configFile) - var rawConfigs []json.RawMessage + var rawConfigs map[string]json.RawMessage err = jsonParser.Decode(&rawConfigs) if err != nil { cclog.ComponentError("SinkManager", err.Error()) return err } - for _, raw := range rawConfigs { - err = sm.AddOutput(raw) + for name, raw := range rawConfigs { + err = sm.AddOutput(name, raw) if err != nil { + cclog.ComponentError("SinkManager", err.Error()) continue } } @@ -77,7 +77,7 @@ func (sm *sinkManager) Start() { // Sink manager is done done := func() { - for _, s := range sm.outputs { + for _, s := range sm.sinks { s.Flush() s.Close() } @@ -95,14 +95,14 @@ func (sm *sinkManager) Start() { case p := <-sm.input: // Send received metric to all outputs cclog.ComponentDebug("SinkManager", "WRITE", p) - for _, s := range sm.outputs { + for _, s := range sm.sinks { s.Write(p) } // Flush all outputs if batchcount == 0 { cclog.ComponentDebug("SinkManager", "FLUSH") - for _, s := range sm.outputs { + for _, s := range sm.sinks { s.Flush() } batchcount = 20 @@ -121,29 +121,27 @@ func (sm *sinkManager) AddInput(input chan lp.CCMetric) { sm.input = input } -func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { +func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error { var err error - var config sinkConfig - if len(rawConfig) > 3 { - err = json.Unmarshal(rawConfig, &config) + var sinkConfig defaultSinkConfig + if len(rawConfig) > 0 { + err := json.Unmarshal(rawConfig, &sinkConfig) if err != nil { - cclog.ComponentError("SinkManager", "SKIP", config.Type, "JSON config error:", err.Error()) return err } } - if _, found := AvailableSinks[config.Type]; !found { - cclog.ComponentError("SinkManager", "SKIP", config.Type, "unknown sink:", err.Error()) + if _, found := AvailableSinks[sinkConfig.Type]; !found { + cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", err.Error()) return err } - s := AvailableSinks[config.Type] - err = s.Init(config) + s := AvailableSinks[sinkConfig.Type] + err = s.Init(rawConfig) if err != nil { cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error()) return err } - sm.outputs = append(sm.outputs, s) - sm.config = append(sm.config, config) - cclog.ComponentDebug("SinkManager", "ADD SINK", s.Name()) + sm.sinks[name] = s + cclog.ComponentDebug("SinkManager", "ADD SINK", s.Name(), "with name", fmt.Sprintf("'%s'", name)) return nil } diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index 215239f..2c9e710 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -1,21 +1,50 @@ package sinks import ( + "encoding/json" "fmt" "math" + "os" "strings" // "time" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) -type StdoutSink struct { - sink +type StdoutSinkConfig struct { + defaultSinkConfig + Output string `json:"output_file,omitempty"` } -func (s *StdoutSink) Init(config sinkConfig) error { +type StdoutSink struct { + sink + output *os.File + config StdoutSinkConfig +} + +func (s *StdoutSink) Init(config json.RawMessage) error { s.name = "StdoutSink" - s.meta_as_tags = config.MetaAsTags + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return err + } + } + s.output = os.Stdout + if len(s.config.Output) > 0 { + if strings.ToLower(s.config.Output) == "stdout" { + s.output = os.Stdout + } else if strings.ToLower(s.config.Output) == "stderr" { + s.output = os.Stderr + } else { + f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600)) + if err != nil { + return err + } + s.output = f + } + } + s.meta_as_tags = s.config.MetaAsTags return nil } @@ -63,7 +92,12 @@ func (s *StdoutSink) Write(point lp.CCMetric) error { } func (s *StdoutSink) Flush() error { + s.output.Sync() return nil } -func (s *StdoutSink) Close() {} +func (s *StdoutSink) Close() { + if s.output != os.Stdout && s.output != os.Stderr { + s.output.Close() + } +} diff --git a/sinks/stdoutSink.md b/sinks/stdoutSink.md new file mode 100644 index 0000000..317ca3f --- /dev/null +++ b/sinks/stdoutSink.md @@ -0,0 +1,22 @@ +## `stdout` sink + +The `stdout` sink is the most simple sink provided by cc-metric-collector. It writes all metrics in InfluxDB line-procol format to the configurable output file or the common special files `stdout` and `stderr`. + + +### Configuration structure + +```json +{ + "": { + "type": "stdout", + "meta_as_tags" : true, + "output_file" : "mylogfile.log" + } +} +``` + +- `type`: makes the sink an `stdout` sink +- `meta_as_tags`: print all meta information as tags in the output (optional) +- `output_file`: Write all data to the selected file (optional). There are two 'special' files: `stdout` and `stderr`. If this option is not provided, the default value is `stdout` + +