Merge branch 'develop' into sqlite3_sink

This commit is contained in:
Thomas Roehl
2022-01-31 05:59:25 +01:00
76 changed files with 5517 additions and 1388 deletions

View File

@@ -1,24 +1,99 @@
This folder contains the sinks for the cc-metric-collector.
# CCMetric sinks
# `metricSink.go`
The base class/configuration is located in `metricSink.go`.
This folder contains the SinkManager and sink implementations for the cc-metric-collector.
# Configuration
The configuration file for the sinks is a list of configurations. The `type` field in each specifies which sink to initialize.
```json
[
{
"type" : "stdout",
"meta_as_tags" : false
},
{
"type" : "http",
"host" : "localhost",
"port" : "4123",
"database" : "ccmetric",
"password" : "<jwt token>"
}
]
```
This example initializes two sinks, the `stdout` sink printing all metrics to the STDOUT and the `http` sink with the given `host`, `port`, `database` and `password`.
If `meta_as_tags` is set, all meta information attached to CCMetric are printed out as tags.
## Type `stdout`
```json
{
"type" : "stdout",
"meta_as_tags" : <true|false>
}
```
The `stdout` sink dumps all metrics to the STDOUT.
## Type `http`
```json
{
"type" : "http",
"host" : "<hostname>",
"port" : "<portnumber>",
"database" : "<database name>",
"password" : "<jwt token>",
"meta_as_tags" : <true|false>
}
```
The sink uses POST requests to send metrics to `http://<host>:<port>/<database>` using the JWT token as a JWT in the 'Authorization' header.
## Type `nats`
```json
{
"type" : "nats",
"host" : "<hostname>",
"port" : "<portnumber>",
"user" : "<username>",
"password" : "<password>",
"database" : "<database name>"
"meta_as_tags" : <true|false>
}
```
This sink publishes the CCMetric in a NATS environment using `host`, `port`, `user` and `password` for connecting. The metrics are published using the topic `database`.
## Type `influxdb`
```json
{
"type" : "influxdb",
"host" : "<hostname>",
"port" : "<portnumber>",
"user" : "<username>",
"password" : "<password or API key>",
"database" : "<database name>"
"organization": "<InfluxDB v2 organization>",
"ssl" : <true|false>,
"meta_as_tags" : <true|false>
}
```
This sink submits the CCMetrics to an InfluxDB time-series database. It uses `host`, `port` and `ssl` for connecting. For authentification, it uses either `user:password` if `user` is set and only `password` as API key. The `organization` and `database` are used for writing to the correct database.
# Sinks
* `stdoutSink.go`: Writes all metrics to `stdout` in InfluxDB line protocol. The sink does not use https://github.com/influxdata/line-protocol to reduce the executed code for debugging
* `influxSink.go`: Writes all metrics to an InfluxDB database instance using a blocking writer. It uses https://github.com/influxdata/influxdb-client-go . Configuration for the server, port, ssl, password, database name and organisation are in the global configuration file. The 'password' is used for the token and the 'database' for the bucket. It uses the v2 API of Influx.
* `natsSink.go`: Sends all metrics to an NATS server using the InfluxDB line protocol as encoding. It uses https://github.com/nats-io/nats.go . Configuration for the server, port, user, password and database name are in the global configuration file. The database name is used as subject for the NATS messages.
* `sqliteSink.go`: Writes all metrics to a Sqlite3 database. It uses https://github.com/mattn/go-sqlite3
* `httpSink.go`: Sends all metrics to an HTTP endpoint `http://<host>:<port>/<database>` using a POST request. The body of the request will consist of lines in the InfluxDB line protocol. In case password is specified, that password is used as a JWT in the 'Authorization' header.
# Installation
Nothing to do, all sinks are pure Go code
# Contributing own sinks
A sink contains three functions and is derived from the type `Sink` (in `metricSink.go`):
A sink contains three functions and is derived from the type `Sink`:
* `Init(config SinkConfig) error`
* `Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error`
* `Write(point CCMetric) error`
* `Flush() error`
* `Close()`
The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function takes a measurement, tags, fields and a timestamp and writes/sends the data. The `Close()` function should tear down anything created in `Init()`.
The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`.
Finally, the sink needs to be registered in the `metric-collector.go`. There is a list of sinks called `Sinks` which is a map (string -> pointer to sink). Add a new entry with a descriptive name and the new sink.
Finally, the sink needs to be registered in the `sinkManager.go`. There is a list of sinks called `AvailableSinks` which is a map (`sink_type_string` -> `pointer to sink interface`). Add a new entry with a descriptive name and the new sink.

82
sinks/gangliaSink.go Normal file
View File

@@ -0,0 +1,82 @@
package sinks
import (
"fmt"
"log"
"strings"
// "time"
"os/exec"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
const GMETRIC_EXEC = `gmetric`
type GangliaSink struct {
Sink
gmetric_path string
}
func (s *GangliaSink) Init(config sinkConfig) error {
p, err := exec.LookPath(string(GMETRIC_EXEC))
if err == nil {
s.gmetric_path = p
}
return err
}
func (s *GangliaSink) Write(point lp.CCMetric) error {
var err error = nil
var tagsstr []string
var argstr []string
for _, t := range point.TagList() {
switch t.Key {
case "cluster":
argstr = append(argstr, fmt.Sprintf("--cluster=%s", t.Value))
case "unit":
argstr = append(argstr, fmt.Sprintf("--units=%s", t.Value))
case "group":
argstr = append(argstr, fmt.Sprintf("--group=%s", t.Value))
default:
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", t.Key, t.Value))
}
}
if len(tagsstr) > 0 {
argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
}
argstr = append(argstr, fmt.Sprintf("--name=%s", point.Name()))
for _, f := range point.FieldList() {
if f.Key == "value" {
switch f.Value.(type) {
case float64:
argstr = append(argstr, fmt.Sprintf("--value=%v", f.Value.(float64)))
argstr = append(argstr, "--type=double")
case float32:
argstr = append(argstr, fmt.Sprintf("--value=%v", f.Value.(float32)))
argstr = append(argstr, "--type=float")
case int:
argstr = append(argstr, fmt.Sprintf("--value=%d", f.Value.(int)))
argstr = append(argstr, "--type=int32")
case int64:
argstr = append(argstr, fmt.Sprintf("--value=%d", f.Value.(int64)))
argstr = append(argstr, "--type=int32")
case string:
argstr = append(argstr, fmt.Sprintf("--value=%q", f.Value.(string)))
argstr = append(argstr, "--type=string")
}
}
}
log.Print(s.gmetric_path, " ", strings.Join(argstr, " "))
// command := exec.Command(string(GMETRIC_EXEC), strings.Join(argstr, " "))
// command.Wait()
// _, err := command.Output()
return err
}
func (s *GangliaSink) Flush() error {
return nil
}
func (s *GangliaSink) Close() {
}

View File

@@ -7,19 +7,21 @@ import (
"net/http"
"time"
lp "github.com/influxdata/line-protocol"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influx "github.com/influxdata/line-protocol"
)
type HttpSink struct {
Sink
sink
client *http.Client
url, jwt string
encoder *lp.Encoder
encoder *influx.Encoder
buffer *bytes.Buffer
}
func (s *HttpSink) Init(config SinkConfig) error {
if len(config.Host) == 0 || len(config.Port) == 0 {
func (s *HttpSink) Init(config sinkConfig) error {
s.name = "HttpSink"
if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 {
return errors.New("`host`, `port` and `database` config options required for TCP sink")
}
@@ -28,13 +30,13 @@ func (s *HttpSink) Init(config SinkConfig) error {
s.port = config.Port
s.jwt = config.Password
s.buffer = &bytes.Buffer{}
s.encoder = lp.NewEncoder(s.buffer)
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
return nil
}
func (s *HttpSink) Write(point lp.MutableMetric) error {
func (s *HttpSink) Write(point lp.CCMetric) error {
_, err := s.encoder.Encode(point)
return err
}

View File

@@ -5,15 +5,14 @@ import (
"crypto/tls"
"errors"
"fmt"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
lp "github.com/influxdata/line-protocol"
"log"
)
type InfluxSink struct {
Sink
sink
client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking
retPolicy string
@@ -39,7 +38,8 @@ func (s *InfluxSink) connect() error {
return nil
}
func (s *InfluxSink) Init(config SinkConfig) error {
func (s *InfluxSink) Init(config sinkConfig) error {
s.name = "InfluxSink"
if len(config.Host) == 0 ||
len(config.Port) == 0 ||
len(config.Database) == 0 ||
@@ -54,15 +54,21 @@ func (s *InfluxSink) Init(config SinkConfig) error {
s.user = config.User
s.password = config.Password
s.ssl = config.SSL
s.meta_as_tags = config.MetaAsTags
return s.connect()
}
func (s *InfluxSink) Write(point lp.MutableMetric) error {
func (s *InfluxSink) Write(point lp.CCMetric) error {
tags := map[string]string{}
fields := map[string]interface{}{}
for _, t := range point.TagList() {
tags[t.Key] = t.Value
}
if s.meta_as_tags {
for _, m := range point.MetaList() {
tags[m.Key] = m.Value
}
}
for _, f := range point.FieldList() {
fields[f.Key] = f.Value
}

View File

@@ -2,21 +2,22 @@ package sinks
import (
// "time"
lp "github.com/influxdata/line-protocol"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
type SinkConfig struct {
Host string `json:"host"`
Port string `json:"port"`
Database string `json:"database"`
User string `json:"user"`
Password string `json:"password"`
Organization string `json:"organization"`
type sinkConfig struct {
Type string `json:"type"`
SSL bool `json:"ssl"`
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
MetaAsTags bool `json:"meta_as_tags,omitempty"`
}
type Sink struct {
type sink struct {
host string
port string
user string
@@ -24,11 +25,18 @@ type Sink struct {
database string
organization string
ssl bool
meta_as_tags bool
name string
}
type SinkFuncs interface {
Init(config SinkConfig) error
Write(point lp.MutableMetric) error
type Sink interface {
Init(config sinkConfig) error
Write(point lp.CCMetric) error
Flush() error
Close()
Name() string
}
func (s *sink) Name() string {
return s.name
}

View File

@@ -4,16 +4,17 @@ import (
"bytes"
"errors"
"fmt"
lp "github.com/influxdata/line-protocol"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influx "github.com/influxdata/line-protocol"
nats "github.com/nats-io/nats.go"
"log"
"time"
)
type NatsSink struct {
Sink
sink
client *nats.Conn
encoder *lp.Encoder
encoder *influx.Encoder
buffer *bytes.Buffer
}
@@ -31,7 +32,8 @@ func (s *NatsSink) connect() error {
return nil
}
func (s *NatsSink) Init(config SinkConfig) error {
func (s *NatsSink) Init(config sinkConfig) error {
s.name = "NatsSink"
if len(config.Host) == 0 ||
len(config.Port) == 0 ||
len(config.Database) == 0 {
@@ -46,40 +48,31 @@ func (s *NatsSink) Init(config SinkConfig) error {
// Setup Influx line protocol
s.buffer = &bytes.Buffer{}
s.buffer.Grow(1025)
s.encoder = lp.NewEncoder(s.buffer)
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
s.encoder.SetMaxLineBytes(1024)
// Setup infos for connection
return s.connect()
}
func (s *NatsSink) Write(point lp.MutableMetric) error {
func (s *NatsSink) Write(point lp.CCMetric) error {
if s.client != nil {
// var tags map[string]string
// var fields map[string]interface{}
// for _, t := range point.TagList() {
// tags[t.Key] = t.Value
// }
// for _, f := range point.FieldList() {
// fields[f.Key] = f.Value
// }
// m, err := protocol.New(point.Name(), tags, fields, point.Time())
// if err != nil {
// log.Print(err)
// return err
// }
_, err := s.encoder.Encode(point)
if err != nil {
log.Print(err)
return err
}
s.client.Publish(s.database, s.buffer.Bytes())
s.buffer.Reset()
}
return nil
}
func (s *NatsSink) Flush() error {
if s.client != nil {
if err := s.client.Publish(s.database, s.buffer.Bytes()); err != nil {
return err
}
s.buffer.Reset()
}
return nil
}

166
sinks/sinkManager.go Normal file
View File

@@ -0,0 +1,166 @@
package sinks
import (
"encoding/json"
"os"
"sync"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
// Map of all available sinks
var AvailableSinks = map[string]Sink{
"influxdb": new(InfluxSink),
"stdout": new(StdoutSink),
"nats": new(NatsSink),
"http": new(HttpSink),
"ganglia": new(GangliaSink),
}
// Metric collector manager data structure
type sinkManager struct {
input chan lp.CCMetric // input channel
outputs []Sink // List of sinks to use
done chan bool // channel to finish / stop metric sink manager
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
config []sinkConfig // json encoded config for sink manager
}
// Sink manager access functions
type SinkManager interface {
Init(wg *sync.WaitGroup, sinkConfigFile string) error
AddInput(input chan lp.CCMetric)
AddOutput(config json.RawMessage) error
Start()
Close()
}
func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
sm.input = nil
sm.outputs = make([]Sink, 0)
sm.done = make(chan bool)
sm.wg = wg
sm.config = make([]sinkConfig, 0)
// Read sink config file
if len(sinkConfigFile) > 0 {
configFile, err := os.Open(sinkConfigFile)
if err != nil {
cclog.ComponentError("SinkManager", err.Error())
return err
}
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
var rawConfigs []json.RawMessage
err = jsonParser.Decode(&rawConfigs)
if err != nil {
cclog.ComponentError("SinkManager", err.Error())
return err
}
for _, raw := range rawConfigs {
err = sm.AddOutput(raw)
if err != nil {
continue
}
}
}
return nil
}
func (sm *sinkManager) Start() {
batchcount := 20
sm.wg.Add(1)
go func() {
defer sm.wg.Done()
// Sink manager is done
done := func() {
for _, s := range sm.outputs {
s.Flush()
s.Close()
}
close(sm.done)
cclog.ComponentDebug("SinkManager", "DONE")
}
for {
select {
case <-sm.done:
done()
return
case p := <-sm.input:
// Send received metric to all outputs
cclog.ComponentDebug("SinkManager", "WRITE", p)
for _, s := range sm.outputs {
s.Write(p)
}
// Flush all outputs
if batchcount == 0 {
cclog.ComponentDebug("SinkManager", "FLUSH")
for _, s := range sm.outputs {
s.Flush()
}
batchcount = 20
}
batchcount--
}
}
}()
// Sink manager is started
cclog.ComponentDebug("SinkManager", "STARTED")
}
// AddInput adds the input channel to the sink manager
func (sm *sinkManager) AddInput(input chan lp.CCMetric) {
sm.input = input
}
func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error {
var err error
var config sinkConfig
if len(rawConfig) > 3 {
err = json.Unmarshal(rawConfig, &config)
if err != nil {
cclog.ComponentError("SinkManager", "SKIP", config.Type, "JSON config error:", err.Error())
return err
}
}
if _, found := AvailableSinks[config.Type]; !found {
cclog.ComponentError("SinkManager", "SKIP", config.Type, "unknown sink:", err.Error())
return err
}
s := AvailableSinks[config.Type]
err = s.Init(config)
if err != nil {
cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error())
return err
}
sm.outputs = append(sm.outputs, s)
sm.config = append(sm.config, config)
cclog.ComponentDebug("SinkManager", "ADD SINK", s.Name())
return nil
}
// Close finishes / stops the sink manager
func (sm *sinkManager) Close() {
cclog.ComponentDebug("SinkManager", "CLOSE")
sm.done <- true
// wait for close of channel sm.done
<-sm.done
}
// New creates a new initialized sink manager
func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {
sm := &sinkManager{}
err := sm.Init(wg, sinkConfigFile)
if err != nil {
return nil, err
}
return sm, err
}

View File

@@ -6,23 +6,30 @@ import (
"strings"
// "time"
lp "github.com/influxdata/line-protocol"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
type StdoutSink struct {
Sink
sink
}
func (s *StdoutSink) Init(config SinkConfig) error {
func (s *StdoutSink) Init(config sinkConfig) error {
s.name = "StdoutSink"
s.meta_as_tags = config.MetaAsTags
return nil
}
func (s *StdoutSink) Write(point lp.MutableMetric) error {
func (s *StdoutSink) Write(point lp.CCMetric) error {
var tagsstr []string
var fieldstr []string
for _, t := range point.TagList() {
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", t.Key, t.Value))
}
if s.meta_as_tags {
for _, m := range point.MetaList() {
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", m.Key, m.Value))
}
}
for _, f := range point.FieldList() {
switch f.Value.(type) {
case float64:
@@ -59,6 +66,4 @@ func (s *StdoutSink) Flush() error {
return nil
}
func (s *StdoutSink) Close() {
return
}
func (s *StdoutSink) Close() {}