Sink specific configuration maps (#25)

* Use sink-specific configurations to have more flexibility. Adjust sample sink configuration files

* Add documentation

* Add links to individual sink readmes

* Fix link in README

* HTTPS for HttpSink

* If no CPU die id available, use the socket id instead
This commit is contained in:
Thomas Gruber 2022-02-04 18:12:24 +01:00 committed by GitHub
parent f719f1915c
commit fdb58b0be2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 431 additions and 185 deletions

View File

@ -1,6 +1,6 @@
[ {
{ "testoutput" : {
"type" : "stdout", "type" : "stdout",
"meta_as_tags" : true "meta_as_tags" : true
} }
] }

View File

@ -168,7 +168,7 @@ func CpuData() []CpuEntry {
buffer, err := ioutil.ReadFile(path) buffer, err := ioutil.ReadFile(path)
if err != nil { if err != nil {
log.Print(err) log.Print(err)
cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error()) //cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error())
return -1 return -1
} }
sbuffer := strings.Replace(string(buffer), "\n", "", -1) sbuffer := strings.Replace(string(buffer), "\n", "", -1)
@ -254,6 +254,9 @@ func CpuData() []CpuEntry {
// Lookup CPU die id // Lookup CPU die id
centry.Die = getDie(base) centry.Die = getDie(base)
if centry.Die < 0 {
centry.Die = centry.Socket
}
// Lookup SMT thread id // Lookup SMT thread id
centry.SMT = getSMT(centry.Cpuid, base) centry.SMT = getSMT(centry.Cpuid, base)

View File

@ -1,6 +1,6 @@
[ {
{ "mystdout" : {
"type" : "stdout", "type" : "stdout",
"meta_as_tags" : true "meta_as_tags" : true
} }
] }

View File

@ -2,17 +2,24 @@
This folder contains the SinkManager and sink implementations for the cc-metric-collector. This folder contains the SinkManager and sink implementations for the cc-metric-collector.
# Available sinks:
- [`stdout`](./stdoutSink.md): Print all metrics to `stdout`, `stderr` or a file
- [`http`](./httpSink.md): Send metrics to an HTTP server as POST requests
- [`influxdb`](./influxSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database
- [`nats`](./natsSink.md): Publish metrics to the [NATS](https://nats.io/) network overlay system
- [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/)
# Configuration # Configuration
The configuration file for the sinks is a list of configurations. The `type` field in each specifies which sink to initialize. The configuration file for the sinks is a list of configurations. The `type` field in each specifies which sink to initialize.
```json ```json
[ [
{ "mystdout" : {
"type" : "stdout", "type" : "stdout",
"meta_as_tags" : false "meta_as_tags" : false
}, },
{ "metricstore" : {
"type" : "http", "type" : "http",
"host" : "localhost", "host" : "localhost",
"port" : "4123", "port" : "4123",
@ -22,74 +29,12 @@ The configuration file for the sinks is a list of configurations. The `type` fie
] ]
``` ```
This example initializes two sinks, the `stdout` sink printing all metrics to the STDOUT and the `http` sink with the given `host`, `port`, `database` and `password`.
If `meta_as_tags` is set, all meta information attached to CCMetric are printed out as tags.
## Type `stdout`
```json
{
"type" : "stdout",
"meta_as_tags" : <true|false>
}
```
The `stdout` sink dumps all metrics to the STDOUT.
## Type `http`
```json
{
"type" : "http",
"host" : "<hostname>",
"port" : "<portnumber>",
"database" : "<database name>",
"password" : "<jwt token>",
"meta_as_tags" : <true|false>
}
```
The sink uses POST requests to send metrics to `http://<host>:<port>/<database>` using the JWT token as a JWT in the 'Authorization' header.
## Type `nats`
```json
{
"type" : "nats",
"host" : "<hostname>",
"port" : "<portnumber>",
"user" : "<username>",
"password" : "<password>",
"database" : "<database name>"
"meta_as_tags" : <true|false>
}
```
This sink publishes the CCMetric in a NATS environment using `host`, `port`, `user` and `password` for connecting. The metrics are published using the topic `database`.
## Type `influxdb`
```json
{
"type" : "influxdb",
"host" : "<hostname>",
"port" : "<portnumber>",
"user" : "<username>",
"password" : "<password or API key>",
"database" : "<database name>"
"organization": "<InfluxDB v2 organization>",
"ssl" : <true|false>,
"meta_as_tags" : <true|false>
}
```
This sink submits the CCMetrics to an InfluxDB time-series database. It uses `host`, `port` and `ssl` for connecting. For authentification, it uses either `user:password` if `user` is set and only `password` as API key. The `organization` and `database` are used for writing to the correct database.
# Contributing own sinks # Contributing own sinks
A sink contains three functions and is derived from the type `Sink`: A sink contains four functions and is derived from the type `sink`:
* `Init(config SinkConfig) error` * `Init(config json.RawMessage) error`
* `Write(point CCMetric) error` * `Write(point CCMetric) error`
* `Flush() error` * `Flush() error`
* `Close()` * `Close()`
@ -97,3 +42,52 @@ A sink contains three functions and is derived from the type `Sink`:
The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`. The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`.
Finally, the sink needs to be registered in the `sinkManager.go`. There is a list of sinks called `AvailableSinks` which is a map (`sink_type_string` -> `pointer to sink interface`). Add a new entry with a descriptive name and the new sink. Finally, the sink needs to be registered in the `sinkManager.go`. There is a list of sinks called `AvailableSinks` which is a map (`sink_type_string` -> `pointer to sink interface`). Add a new entry with a descriptive name and the new sink.
## Sample sink
```go
package sinks
import (
"encoding/json"
"log"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
type SampleSinkConfig struct {
defaultSinkConfig // defines JSON tags for 'name' and 'meta_as_tags'
}
type SampleSink struct {
sink // declarate 'name' and 'meta_as_tags'
config StdoutSinkConfig // entry point to the SampleSinkConfig
}
// Initialize the sink by giving it a name and reading in the config JSON
func (s *SampleSink) Init(config json.RawMessage) error {
s.name = "SampleSink" // Always specify a name here
// Read in the config JSON
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
return nil
}
// Code to submit a single CCMetric to the sink
func (s *SampleSink) Write(point lp.CCMetric) error {
log.Print(point)
return nil
}
// If the sink uses batched sends internally, you can tell to flush its buffers
func (s *SampleSink) Flush() error {
return nil
}
// Close sink: close network connection, close files, close libraries, ...
func (s *SampleSink) Close() {}
```

View File

@ -1,6 +1,8 @@
package sinks package sinks
import ( import (
"encoding/json"
"errors"
"fmt" "fmt"
"log" "log"
"strings" "strings"
@ -8,21 +10,50 @@ import (
// "time" // "time"
"os/exec" "os/exec"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
const GMETRIC_EXEC = `gmetric` const GMETRIC_EXEC = `gmetric`
type GangliaSink struct { type GangliaSinkConfig struct {
Sink defaultSinkConfig
gmetric_path string GmetricPath string `json:"gmetric_path"`
AddGangliaGroup bool `json:"add_ganglia_group"`
} }
func (s *GangliaSink) Init(config sinkConfig) error { type GangliaSink struct {
sink
gmetric_path string
config GangliaSinkConfig
}
func (s *GangliaSink) Init(config json.RawMessage) error {
var err error = nil
s.name = "GangliaSink"
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return err
}
}
s.gmetric_path = ""
if len(s.config.GmetricPath) > 0 {
p, err := exec.LookPath(s.config.GmetricPath)
if err == nil {
s.gmetric_path = p
}
}
if len(s.gmetric_path) == 0 {
p, err := exec.LookPath(string(GMETRIC_EXEC)) p, err := exec.LookPath(string(GMETRIC_EXEC))
if err == nil { if err == nil {
s.gmetric_path = p s.gmetric_path = p
} }
}
if len(s.gmetric_path) == 0 {
err = errors.New("cannot find executable 'gmetric'")
}
return err return err
} }
@ -37,11 +68,29 @@ func (s *GangliaSink) Write(point lp.CCMetric) error {
case "unit": case "unit":
argstr = append(argstr, fmt.Sprintf("--units=%s", value)) argstr = append(argstr, fmt.Sprintf("--units=%s", value))
case "group": case "group":
if s.config.AddGangliaGroup {
argstr = append(argstr, fmt.Sprintf("--group=%s", value)) argstr = append(argstr, fmt.Sprintf("--group=%s", value))
}
default: default:
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value)) tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
} }
} }
if s.config.MetaAsTags {
for key, value := range point.Meta() {
switch key {
case "cluster":
argstr = append(argstr, fmt.Sprintf("--cluster=%s", value))
case "unit":
argstr = append(argstr, fmt.Sprintf("--units=%s", value))
case "group":
if s.config.AddGangliaGroup {
argstr = append(argstr, fmt.Sprintf("--group=%s", value))
}
default:
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
}
}
}
if len(tagsstr) > 0 { if len(tagsstr) > 0 {
argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ","))) argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
} }

21
sinks/gangliaSink.md Normal file
View File

@ -0,0 +1,21 @@
## `ganglia` sink
The `ganglia` sink uses the `gmetric` tool of the [Ganglia Monitoring System](http://ganglia.info/) to submit the metrics
### Configuration structure
```json
{
"<name>": {
"type": "ganglia",
"meta_as_tags" : true,
"gmetric_path" : "/path/to/gmetric",
"add_ganglia_group" : true
}
}
```
- `type`: makes the sink an `ganglia` sink
- `meta_as_tags`: print all meta information as tags in the output (optional)
- `gmetric_path`: Path to `gmetric` executable (optional). If not given, the sink searches in `$PATH` for `gmetric`.
- `add_ganglia_group`: Add `--group=X` based on meta information to the `gmetric` call. Some old versions of `gmetric` do not support the `--group` option.

View File

@ -2,6 +2,7 @@ package sinks
import ( import (
"bytes" "bytes"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
@ -11,28 +12,44 @@ import (
influx "github.com/influxdata/line-protocol" influx "github.com/influxdata/line-protocol"
) )
type HttpSinkConfig struct {
defaultSinkConfig
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
JWT string `json:"jwt,omitempty"`
SSL bool `json:"ssl,omitempty"`
}
type HttpSink struct { type HttpSink struct {
sink sink
client *http.Client client *http.Client
url, jwt string url, jwt string
encoder *influx.Encoder encoder *influx.Encoder
buffer *bytes.Buffer buffer *bytes.Buffer
config HttpSinkConfig
} }
func (s *HttpSink) Init(config sinkConfig) error { func (s *HttpSink) Init(config json.RawMessage) error {
s.name = "HttpSink" s.name = "HttpSink"
if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 { s.config.SSL = false
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
if len(s.config.Host) == 0 || len(s.config.Port) == 0 || len(s.config.Database) == 0 {
return errors.New("`host`, `port` and `database` config options required for TCP sink") return errors.New("`host`, `port` and `database` config options required for TCP sink")
} }
s.client = &http.Client{} s.client = &http.Client{}
proto := "http" proto := "http"
if config.SSL { if s.config.SSL {
proto = "https" proto = "https"
} }
s.url = fmt.Sprintf("%s://%s:%s/%s", proto, config.Host, config.Port, config.Database) s.url = fmt.Sprintf("%s://%s:%s/%s", proto, s.config.Host, s.config.Port, s.config.Database)
s.port = config.Port s.jwt = s.config.JWT
s.jwt = config.Password
s.buffer = &bytes.Buffer{} s.buffer = &bytes.Buffer{}
s.encoder = influx.NewEncoder(s.buffer) s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second) s.encoder.SetPrecision(time.Second)

27
sinks/httpSink.md Normal file
View File

@ -0,0 +1,27 @@
## `http` sink
The `http` sink uses POST requests to a HTTP server to submit the metrics in the InfluxDB line-protocol format. It uses JSON web tokens for authentification. The sink creates batches of metrics before sending, to reduce the HTTP traffic.
### Configuration structure
```json
{
"<name>": {
"type": "http",
"meta_as_tags" : true,
"database" : "mymetrics",
"host": "dbhost.example.com",
"port": "4222",
"jwt" : "0x0000q231",
"ssl" : false
}
}
```
- `type`: makes the sink an `http` sink
- `meta_as_tags`: print all meta information as tags in the output (optional)
- `database`: All metrics are written to this bucket
- `host`: Hostname of the InfluxDB database server
- `port`: Portnumber (as string) of the InfluxDB database server
- `jwt`: JSON web tokens for authentification
- `ssl`: Activate SSL encryption

View File

@ -3,6 +3,7 @@ package sinks
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"log" "log"
@ -12,50 +13,60 @@ import (
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
) )
type InfluxSinkConfig struct {
defaultSinkConfig
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"`
}
type InfluxSink struct { type InfluxSink struct {
sink sink
client influxdb2.Client client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking writeApi influxdb2Api.WriteAPIBlocking
retPolicy string config InfluxSinkConfig
} }
func (s *InfluxSink) connect() error { func (s *InfluxSink) connect() error {
var auth string var auth string
var uri string var uri string
if s.ssl { if s.config.SSL {
uri = fmt.Sprintf("https://%s:%s", s.host, s.port) uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port)
} else { } else {
uri = fmt.Sprintf("http://%s:%s", s.host, s.port) uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port)
} }
if len(s.user) == 0 { if len(s.config.User) == 0 {
auth = s.password auth = s.config.Password
} else { } else {
auth = fmt.Sprintf("%s:%s", s.user, s.password) auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
} }
log.Print("Using URI ", uri, " Org ", s.organization, " Bucket ", s.database) log.Print("Using URI ", uri, " Org ", s.config.Organization, " Bucket ", s.config.Database)
s.client = influxdb2.NewClientWithOptions(uri, auth, s.client = influxdb2.NewClientWithOptions(uri, auth,
influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: true})) influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: true}))
s.writeApi = s.client.WriteAPIBlocking(s.organization, s.database) s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
return nil return nil
} }
func (s *InfluxSink) Init(config sinkConfig) error { func (s *InfluxSink) Init(config json.RawMessage) error {
s.name = "InfluxSink" s.name = "InfluxSink"
if len(config.Host) == 0 || if len(config) > 0 {
len(config.Port) == 0 || err := json.Unmarshal(config, &s.config)
len(config.Database) == 0 || if err != nil {
len(config.Organization) == 0 || return err
len(config.Password) == 0 { }
return errors.New("Not all configuration variables set required by InfluxSink") }
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return errors.New("not all configuration variables set required by InfluxSink")
} }
s.host = config.Host
s.port = config.Port
s.database = config.Database
s.organization = config.Organization
s.user = config.User
s.password = config.Password
s.ssl = config.SSL
s.meta_as_tags = config.MetaAsTags
return s.connect() return s.connect()
} }
@ -65,7 +76,7 @@ func (s *InfluxSink) Write(point lp.CCMetric) error {
for key, value := range point.Tags() { for key, value := range point.Tags() {
tags[key] = value tags[key] = value
} }
if s.meta_as_tags { if s.config.MetaAsTags {
for key, value := range point.Meta() { for key, value := range point.Meta() {
tags[key] = value tags[key] = value
} }

32
sinks/influxSink.md Normal file
View File

@ -0,0 +1,32 @@
## `influxdb` sink
The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2) to write the metrics to an InfluxDB database. It provides only support for V2 write endpoints (InfluxDB 1.8.0 or later).
### Configuration structure
```json
{
"<name>": {
"type": "influxdb",
"meta_as_tags" : true,
"database" : "mymetrics",
"host": "dbhost.example.com",
"port": "4222",
"user": "exampleuser",
"password" : "examplepw",
"organization": "myorg",
"ssl": true,
}
}
```
- `type`: makes the sink an `influxdb` sink
- `meta_as_tags`: print all meta information as tags in the output (optional)
- `database`: All metrics are written to this bucket
- `host`: Hostname of the InfluxDB database server
- `port`: Portnumber (as string) of the InfluxDB database server
- `user`: Username for basic authentification
- `password`: Password for basic authentification
- `organization`: Organization in the InfluxDB
- `ssl`: Use SSL connection

View File

@ -1,36 +1,23 @@
package sinks package sinks
import ( import (
// "time" "encoding/json"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
type sinkConfig struct { type defaultSinkConfig struct {
Type string `json:"type"`
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
MetaAsTags bool `json:"meta_as_tags,omitempty"` MetaAsTags bool `json:"meta_as_tags,omitempty"`
Type string `json:"type"`
} }
type sink struct { type sink struct {
host string
port string
user string
password string
database string
organization string
ssl bool
meta_as_tags bool meta_as_tags bool
name string name string
} }
type Sink interface { type Sink interface {
Init(config sinkConfig) error Init(config json.RawMessage) error
Write(point lp.CCMetric) error Write(point lp.CCMetric) error
Flush() error Flush() error
Close() Close()

View File

@ -2,49 +2,71 @@ package sinks
import ( import (
"bytes" "bytes"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influx "github.com/influxdata/line-protocol" influx "github.com/influxdata/line-protocol"
nats "github.com/nats-io/nats.go" nats "github.com/nats-io/nats.go"
"log"
"time"
) )
type NatsSinkConfig struct {
defaultSinkConfig
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
}
type NatsSink struct { type NatsSink struct {
sink sink
client *nats.Conn client *nats.Conn
encoder *influx.Encoder encoder *influx.Encoder
buffer *bytes.Buffer buffer *bytes.Buffer
config NatsSinkConfig
} }
func (s *NatsSink) connect() error { func (s *NatsSink) connect() error {
uinfo := nats.UserInfo(s.user, s.password) var err error
uri := fmt.Sprintf("nats://%s:%s", s.host, s.port) var uinfo nats.Option = nil
log.Print("Using URI ", uri) var nc *nats.Conn
if len(s.config.User) > 0 && len(s.config.Password) > 0 {
uinfo = nats.UserInfo(s.config.User, s.config.Password)
}
uri := fmt.Sprintf("nats://%s:%s", s.config.Host, s.config.Port)
cclog.ComponentDebug(s.name, "Connect to", uri)
s.client = nil s.client = nil
nc, err := nats.Connect(uri, uinfo) if uinfo != nil {
nc, err = nats.Connect(uri, uinfo)
} else {
nc, err = nats.Connect(uri)
}
if err != nil { if err != nil {
log.Fatal(err) cclog.ComponentError(s.name, "Connect to", uri, "failed:", err.Error())
return err return err
} }
s.client = nc s.client = nc
return nil return nil
} }
func (s *NatsSink) Init(config sinkConfig) error { func (s *NatsSink) Init(config json.RawMessage) error {
s.name = "NatsSink" s.name = "NatsSink"
if len(config.Host) == 0 || if len(config) > 0 {
len(config.Port) == 0 || err := json.Unmarshal(config, &s.config)
len(config.Database) == 0 { if err != nil {
return errors.New("Not all configuration variables set required by NatsSink") cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 {
return errors.New("not all configuration variables set required by NatsSink")
} }
s.host = config.Host
s.port = config.Port
s.database = config.Database
s.organization = config.Organization
s.user = config.User
s.password = config.Password
// Setup Influx line protocol // Setup Influx line protocol
s.buffer = &bytes.Buffer{} s.buffer = &bytes.Buffer{}
s.buffer.Grow(1025) s.buffer.Grow(1025)
@ -59,7 +81,7 @@ func (s *NatsSink) Write(point lp.CCMetric) error {
if s.client != nil { if s.client != nil {
_, err := s.encoder.Encode(point) _, err := s.encoder.Encode(point)
if err != nil { if err != nil {
log.Print(err) cclog.ComponentError(s.name, "Write:", err.Error())
return err return err
} }
} }
@ -68,7 +90,8 @@ func (s *NatsSink) Write(point lp.CCMetric) error {
func (s *NatsSink) Flush() error { func (s *NatsSink) Flush() error {
if s.client != nil { if s.client != nil {
if err := s.client.Publish(s.database, s.buffer.Bytes()); err != nil { if err := s.client.Publish(s.config.Database, s.buffer.Bytes()); err != nil {
cclog.ComponentError(s.name, "Flush:", err.Error())
return err return err
} }
s.buffer.Reset() s.buffer.Reset()
@ -77,8 +100,8 @@ func (s *NatsSink) Flush() error {
} }
func (s *NatsSink) Close() { func (s *NatsSink) Close() {
log.Print("Closing Nats connection")
if s.client != nil { if s.client != nil {
cclog.ComponentDebug(s.name, "Close")
s.client.Close() s.client.Close()
} }
} }

28
sinks/natsSink.md Normal file
View File

@ -0,0 +1,28 @@
## `nats` sink
The `nats` sink publishes all metrics into a NATS network. The publishing key is the database name provided in the configuration file
### Configuration structure
```json
{
"<name>": {
"type": "nats",
"meta_as_tags" : true,
"database" : "mymetrics",
"host": "dbhost.example.com",
"port": "4222",
"user": "exampleuser",
"password" : "examplepw"
}
}
```
- `type`: makes the sink an `nats` sink
- `meta_as_tags`: print all meta information as tags in the output (optional)
- `database`: All metrics are published with this subject
- `host`: Hostname of the NATS server
- `port`: Portnumber (as string) of the NATS server
- `user`: Username for basic authentification
- `password`: Password for basic authentification

View File

@ -2,6 +2,7 @@ package sinks
import ( import (
"encoding/json" "encoding/json"
"fmt"
"os" "os"
"sync" "sync"
@ -21,27 +22,25 @@ var AvailableSinks = map[string]Sink{
// Metric collector manager data structure // Metric collector manager data structure
type sinkManager struct { type sinkManager struct {
input chan lp.CCMetric // input channel input chan lp.CCMetric // input channel
outputs []Sink // List of sinks to use
done chan bool // channel to finish / stop metric sink manager done chan bool // channel to finish / stop metric sink manager
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
config []sinkConfig // json encoded config for sink manager sinks map[string]Sink // Mapping sink name to sink
} }
// Sink manager access functions // Sink manager access functions
type SinkManager interface { type SinkManager interface {
Init(wg *sync.WaitGroup, sinkConfigFile string) error Init(wg *sync.WaitGroup, sinkConfigFile string) error
AddInput(input chan lp.CCMetric) AddInput(input chan lp.CCMetric)
AddOutput(config json.RawMessage) error AddOutput(name string, config json.RawMessage) error
Start() Start()
Close() Close()
} }
func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
sm.input = nil sm.input = nil
sm.outputs = make([]Sink, 0)
sm.done = make(chan bool) sm.done = make(chan bool)
sm.wg = wg sm.wg = wg
sm.config = make([]sinkConfig, 0) sm.sinks = make(map[string]Sink, 0)
// Read sink config file // Read sink config file
if len(sinkConfigFile) > 0 { if len(sinkConfigFile) > 0 {
@ -52,15 +51,16 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
} }
defer configFile.Close() defer configFile.Close()
jsonParser := json.NewDecoder(configFile) jsonParser := json.NewDecoder(configFile)
var rawConfigs []json.RawMessage var rawConfigs map[string]json.RawMessage
err = jsonParser.Decode(&rawConfigs) err = jsonParser.Decode(&rawConfigs)
if err != nil { if err != nil {
cclog.ComponentError("SinkManager", err.Error()) cclog.ComponentError("SinkManager", err.Error())
return err return err
} }
for _, raw := range rawConfigs { for name, raw := range rawConfigs {
err = sm.AddOutput(raw) err = sm.AddOutput(name, raw)
if err != nil { if err != nil {
cclog.ComponentError("SinkManager", err.Error())
continue continue
} }
} }
@ -77,7 +77,7 @@ func (sm *sinkManager) Start() {
// Sink manager is done // Sink manager is done
done := func() { done := func() {
for _, s := range sm.outputs { for _, s := range sm.sinks {
s.Flush() s.Flush()
s.Close() s.Close()
} }
@ -95,14 +95,14 @@ func (sm *sinkManager) Start() {
case p := <-sm.input: case p := <-sm.input:
// Send received metric to all outputs // Send received metric to all outputs
cclog.ComponentDebug("SinkManager", "WRITE", p) cclog.ComponentDebug("SinkManager", "WRITE", p)
for _, s := range sm.outputs { for _, s := range sm.sinks {
s.Write(p) s.Write(p)
} }
// Flush all outputs // Flush all outputs
if batchcount == 0 { if batchcount == 0 {
cclog.ComponentDebug("SinkManager", "FLUSH") cclog.ComponentDebug("SinkManager", "FLUSH")
for _, s := range sm.outputs { for _, s := range sm.sinks {
s.Flush() s.Flush()
} }
batchcount = 20 batchcount = 20
@ -121,29 +121,27 @@ func (sm *sinkManager) AddInput(input chan lp.CCMetric) {
sm.input = input sm.input = input
} }
func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error {
var err error var err error
var config sinkConfig var sinkConfig defaultSinkConfig
if len(rawConfig) > 3 { if len(rawConfig) > 0 {
err = json.Unmarshal(rawConfig, &config) err := json.Unmarshal(rawConfig, &sinkConfig)
if err != nil { if err != nil {
cclog.ComponentError("SinkManager", "SKIP", config.Type, "JSON config error:", err.Error())
return err return err
} }
} }
if _, found := AvailableSinks[config.Type]; !found { if _, found := AvailableSinks[sinkConfig.Type]; !found {
cclog.ComponentError("SinkManager", "SKIP", config.Type, "unknown sink:", err.Error()) cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", err.Error())
return err return err
} }
s := AvailableSinks[config.Type] s := AvailableSinks[sinkConfig.Type]
err = s.Init(config) err = s.Init(rawConfig)
if err != nil { if err != nil {
cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error()) cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error())
return err return err
} }
sm.outputs = append(sm.outputs, s) sm.sinks[name] = s
sm.config = append(sm.config, config) cclog.ComponentDebug("SinkManager", "ADD SINK", s.Name(), "with name", fmt.Sprintf("'%s'", name))
cclog.ComponentDebug("SinkManager", "ADD SINK", s.Name())
return nil return nil
} }

View File

@ -1,21 +1,50 @@
package sinks package sinks
import ( import (
"encoding/json"
"fmt" "fmt"
"math" "math"
"os"
"strings" "strings"
// "time" // "time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
type StdoutSink struct { type StdoutSinkConfig struct {
sink defaultSinkConfig
Output string `json:"output_file,omitempty"`
} }
func (s *StdoutSink) Init(config sinkConfig) error { type StdoutSink struct {
sink
output *os.File
config StdoutSinkConfig
}
func (s *StdoutSink) Init(config json.RawMessage) error {
s.name = "StdoutSink" s.name = "StdoutSink"
s.meta_as_tags = config.MetaAsTags if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
s.output = os.Stdout
if len(s.config.Output) > 0 {
if strings.ToLower(s.config.Output) == "stdout" {
s.output = os.Stdout
} else if strings.ToLower(s.config.Output) == "stderr" {
s.output = os.Stderr
} else {
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
if err != nil {
return err
}
s.output = f
}
}
s.meta_as_tags = s.config.MetaAsTags
return nil return nil
} }
@ -63,7 +92,12 @@ func (s *StdoutSink) Write(point lp.CCMetric) error {
} }
func (s *StdoutSink) Flush() error { func (s *StdoutSink) Flush() error {
s.output.Sync()
return nil return nil
} }
func (s *StdoutSink) Close() {} func (s *StdoutSink) Close() {
if s.output != os.Stdout && s.output != os.Stderr {
s.output.Close()
}
}

22
sinks/stdoutSink.md Normal file
View File

@ -0,0 +1,22 @@
## `stdout` sink
The `stdout` sink is the most simple sink provided by cc-metric-collector. It writes all metrics in InfluxDB line-procol format to the configurable output file or the common special files `stdout` and `stderr`.
### Configuration structure
```json
{
"<name>": {
"type": "stdout",
"meta_as_tags" : true,
"output_file" : "mylogfile.log"
}
}
```
- `type`: makes the sink an `stdout` sink
- `meta_as_tags`: print all meta information as tags in the output (optional)
- `output_file`: Write all data to the selected file (optional). There are two 'special' files: `stdout` and `stderr`. If this option is not provided, the default value is `stdout`