Merge current development version into main (#48)

* DiskstatCollector: cast part_max_used metric to int

* Add uint types to GangliaSink and LibgangliaSink

* Use new sink instances to allow multiple of same sink type

* Update sink README and SampleSink

* Use new receiver instances to allow multiple of same receiver type

* Fix metric scope in likwid configuration script

* Mention likwid config script in LikwidCollector README

* Refactor: Embed Init() into New() function

* Refactor: Embed Init() into New() function

* Fix: MetricReceiver uses uninitialized values, when initialization fails

* Use Ganglia configuration (#44)

* Copy all metric configurations from original Ganglia code

* Use metric configurations from Ganglia for some metrics

* Format value string also for known metrics

* Numa-aware memstat collector (#45)

* Add samples for collectors, sinks and receivers

* Ping InfluxDB server after connecting to recognize faulty connections

* Add sink for Prometheus monitoring system (#46)

* Add sink for Prometheus monitoring system

* Add prometheus sink to README

* Add scraper for Prometheus clients (#47)

Co-authored-by: Holger Obermaier <holgerob@gmx.de>
Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
This commit is contained in:
Thomas Gruber
2022-02-25 14:49:49 +01:00
committed by GitHub
parent b4cc6d54ea
commit d98076c792
27 changed files with 1509 additions and 517 deletions

View File

@@ -6,9 +6,11 @@ This folder contains the SinkManager and sink implementations for the cc-metric-
- [`stdout`](./stdoutSink.md): Print all metrics to `stdout`, `stderr` or a file
- [`http`](./httpSink.md): Send metrics to an HTTP server as POST requests
- [`influxdb`](./influxSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database
- [`influxasync`](./influxAsyncSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database with non-blocking write API
- [`nats`](./natsSink.md): Publish metrics to the [NATS](https://nats.io/) network overlay system
- [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) using the `gmetric` CLI tool
- [`libganglia`](./libgangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) directly using `libganglia.so`
- [`prometeus`](./prometheusSink.md): Publish metrics for the [Prometheus Monitoring System](https://prometheus.io/)
# Configuration
@@ -34,11 +36,12 @@ The configuration file for the sinks is a list of configurations. The `type` fie
# Contributing own sinks
A sink contains four functions and is derived from the type `sink`:
* `Init(config json.RawMessage) error`
A sink contains five functions and is derived from the type `sink`:
* `Init(name string, config json.RawMessage) error`
* `Write(point CCMetric) error`
* `Flush() error`
* `Close()`
* `New<Typename>(name string, config json.RawMessage) (Sink, error)` (calls the `Init()` function)
The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`.
@@ -65,8 +68,8 @@ type SampleSink struct {
}
// Initialize the sink by giving it a name and reading in the config JSON
func (s *SampleSink) Init(config json.RawMessage) error {
s.name = "SampleSink" // Always specify a name here
func (s *SampleSink) Init(name string, config json.RawMessage) error {
s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here
// Read in the config JSON
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
@@ -91,4 +94,13 @@ func (s *SampleSink) Flush() error {
// Close sink: close network connection, close files, close libraries, ...
func (s *SampleSink) Close() {}
// New function to create a new instance of the sink
func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
s := new(SampleSink)
err := s.Init(name, config)
return s, err
}
```

View File

@@ -1,6 +1,7 @@
package sinks
import (
"fmt"
"strings"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
@@ -23,11 +24,8 @@ func GangliaMetricName(point lp.CCMetric) string {
return name
}
func GangliaMetricRename(point lp.CCMetric) string {
name := point.Name()
if name == "mem_total" || name == "swap_total" {
return name
} else if name == "net_bytes_in" {
func GangliaMetricRename(name string) string {
if name == "net_bytes_in" {
return "bytes_in"
} else if name == "net_bytes_out" {
return "bytes_out"
@@ -48,3 +46,213 @@ func GangliaSlopeType(point lp.CCMetric) uint {
}
return 3
}
const DEFAULT_GANGLIA_METRIC_TMAX = 300
const DEFAULT_GANGLIA_METRIC_SLOPE = "both"
type GangliaMetric struct {
Name string
Type string
Slope string
Tmax int
Unit string
}
type GangliaMetricGroup struct {
Name string
Metrics []GangliaMetric
}
var CommonGangliaMetrics = []GangliaMetricGroup{
{
Name: "memory",
Metrics: []GangliaMetric{
{"mem_total", "float", "zero", 1200, "KB"},
{"swap_total", "float", "zero", 1200, "KB"},
{"mem_free", "float", "both", 180, "KB"},
{"mem_shared", "float", "both", 180, "KB"},
{"mem_buffers", "float", "both", 180, "KB"},
{"mem_cached", "float", "both", 180, "KB"},
{"swap_free", "float", "both", 180, "KB"},
{"mem_sreclaimable", "float", "both", 180, "KB"},
{"mem_slab", "float", "both", 180, "KB"},
},
},
{
Name: "cpu",
Metrics: []GangliaMetric{
{"cpu_num", "uint32", "zero", 1200, "CPUs"},
{"cpu_speed", "uint32", "zero", 1200, "MHz"},
{"cpu_user", "float", "both", 90, "%"},
{"cpu_nice", "float", "both", 90, "%"},
{"cpu_system", "float", "both", 90, "%"},
{"cpu_idle", "float", "both", 3800, "%"},
{"cpu_aidle", "float", "both", 90, "%"},
{"cpu_wio", "float", "both", 90, "%"},
{"cpu_intr", "float", "both", 90, "%"},
{"cpu_sintr", "float", "both", 90, "%"},
{"cpu_steal", "float", "both", 90, "%"},
{"cpu_guest", "float", "both", 90, "%"},
{"cpu_gnice", "float", "both", 90, "%"},
},
},
{
Name: "load",
Metrics: []GangliaMetric{
{"load_one", "float", "both", 70, ""},
{"load_five", "float", "both", 325, ""},
{"load_fifteen", "float", "both", 950, ""},
},
},
{
Name: "disk",
Metrics: []GangliaMetric{
{"disk_total", "double", "both", 1200, "GB"},
{"disk_free", "double", "both", 180, "GB"},
{"part_max_used", "float", "both", 180, "%"},
},
},
{
Name: "network",
Metrics: []GangliaMetric{
{"bytes_out", "float", "both", 300, "bytes/sec"},
{"bytes_in", "float", "both", 300, "bytes/sec"},
{"pkts_in", "float", "both", 300, "packets/sec"},
{"pkts_out", "float", "both", 300, "packets/sec"},
},
},
{
Name: "process",
Metrics: []GangliaMetric{
{"proc_run", "uint32", "both", 950, ""},
{"proc_total", "uint32", "both", 950, ""},
},
},
{
Name: "system",
Metrics: []GangliaMetric{
{"boottime", "uint32", "zero", 1200, "s"},
{"sys_clock", "uint32", "zero", 1200, "s"},
{"machine_type", "string", "zero", 1200, ""},
{"os_name", "string", "zero", 1200, ""},
{"os_release", "string", "zero", 1200, ""},
{"mtu", "uint32", "both", 1200, ""},
},
},
}
type GangliaMetricConfig struct {
Type string
Slope string
Tmax int
Unit string
Group string
Value string
}
func GetCommonGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
mname := GangliaMetricRename(point.Name())
for _, group := range CommonGangliaMetrics {
for _, metric := range group.Metrics {
if metric.Name == mname {
valueStr := ""
value, ok := point.GetField("value")
if ok {
switch real := value.(type) {
case float64:
valueStr = fmt.Sprintf("%f", real)
case float32:
valueStr = fmt.Sprintf("%f", real)
case int64:
valueStr = fmt.Sprintf("%d", real)
case int32:
valueStr = fmt.Sprintf("%d", real)
case int:
valueStr = fmt.Sprintf("%d", real)
case uint64:
valueStr = fmt.Sprintf("%d", real)
case uint32:
valueStr = fmt.Sprintf("%d", real)
case uint:
valueStr = fmt.Sprintf("%d", real)
case string:
valueStr = real
default:
}
}
return GangliaMetricConfig{
Group: group.Name,
Type: metric.Type,
Slope: metric.Slope,
Tmax: metric.Tmax,
Unit: metric.Unit,
Value: valueStr,
}
}
}
}
return GangliaMetricConfig{
Group: "",
Type: "",
Slope: "",
Tmax: 0,
Unit: "",
Value: "",
}
}
func GetGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
group := ""
if g, ok := point.GetMeta("group"); ok {
group = g
}
unit := ""
if u, ok := point.GetMeta("unit"); ok {
unit = u
}
valueType := "double"
valueStr := ""
value, ok := point.GetField("value")
if ok {
switch real := value.(type) {
case float64:
valueStr = fmt.Sprintf("%f", real)
valueType = "double"
case float32:
valueStr = fmt.Sprintf("%f", real)
valueType = "float"
case int64:
valueStr = fmt.Sprintf("%d", real)
valueType = "int32"
case int32:
valueStr = fmt.Sprintf("%d", real)
valueType = "int32"
case int:
valueStr = fmt.Sprintf("%d", real)
valueType = "int32"
case uint64:
valueStr = fmt.Sprintf("%d", real)
valueType = "uint32"
case uint32:
valueStr = fmt.Sprintf("%d", real)
valueType = "uint32"
case uint:
valueStr = fmt.Sprintf("%d", real)
valueType = "uint32"
case string:
valueStr = real
valueType = "string"
default:
valueType = "invalid"
}
}
return GangliaMetricConfig{
Group: group,
Type: valueType,
Slope: DEFAULT_GANGLIA_METRIC_SLOPE,
Tmax: DEFAULT_GANGLIA_METRIC_TMAX,
Unit: unit,
Value: valueStr,
}
}

View File

@@ -24,6 +24,7 @@ type GangliaSinkConfig struct {
AddTagsAsDesc bool `json:"add_tags_as_desc,omitempty"`
ClusterName string `json:"cluster_name,omitempty"`
AddTypeToName bool `json:"add_type_to_name,omitempty"`
AddUnits bool `json:"add_units,omitempty"`
}
type GangliaSink struct {
@@ -33,16 +34,73 @@ type GangliaSink struct {
config GangliaSinkConfig
}
func (s *GangliaSink) Init(config json.RawMessage) error {
func (s *GangliaSink) Write(point lp.CCMetric) error {
var err error = nil
s.name = "GangliaSink"
//var tagsstr []string
var argstr []string
// Get metric name
metricname := GangliaMetricRename(point.Name())
// Get metric config (type, value, ... in suitable format)
conf := GetCommonGangliaConfig(point)
if len(conf.Type) == 0 {
conf = GetGangliaConfig(point)
}
if len(conf.Type) == 0 {
return fmt.Errorf("metric %s has no 'value' field", metricname)
}
if s.config.AddGangliaGroup {
argstr = append(argstr, fmt.Sprintf("--group=%s", conf.Group))
}
if s.config.AddUnits && len(conf.Unit) > 0 {
argstr = append(argstr, fmt.Sprintf("--units=%s", conf.Unit))
}
if len(s.config.ClusterName) > 0 {
argstr = append(argstr, fmt.Sprintf("--cluster=%s", s.config.ClusterName))
}
// if s.config.AddTagsAsDesc && len(tagsstr) > 0 {
// argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
// }
if len(s.gmetric_config) > 0 {
argstr = append(argstr, fmt.Sprintf("--conf=%s", s.gmetric_config))
}
if s.config.AddTypeToName {
argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point)))
} else {
argstr = append(argstr, fmt.Sprintf("--name=%s", metricname))
}
argstr = append(argstr, fmt.Sprintf("--slope=%s", conf.Slope))
argstr = append(argstr, fmt.Sprintf("--value=%s", conf.Value))
argstr = append(argstr, fmt.Sprintf("--type=%s", conf.Type))
argstr = append(argstr, fmt.Sprintf("--tmax=%d", conf.Tmax))
cclog.ComponentDebug(s.name, s.gmetric_path, strings.Join(argstr, " "))
command := exec.Command(s.gmetric_path, argstr...)
command.Wait()
_, err = command.Output()
return err
}
func (s *GangliaSink) Flush() error {
return nil
}
func (s *GangliaSink) Close() {
}
func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
s := new(GangliaSink)
s.name = fmt.Sprintf("GangliaSink(%s)", name)
s.config.AddTagsAsDesc = false
s.config.AddGangliaGroup = false
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return err
return nil, err
}
}
s.gmetric_path = ""
@@ -60,98 +118,10 @@ func (s *GangliaSink) Init(config json.RawMessage) error {
}
}
if len(s.gmetric_path) == 0 {
err = errors.New("cannot find executable 'gmetric'")
return nil, errors.New("cannot find executable 'gmetric'")
}
if len(s.config.GmetricConfig) > 0 {
s.gmetric_config = s.config.GmetricConfig
}
return err
}
func (s *GangliaSink) Write(point lp.CCMetric) error {
var err error = nil
var tagsstr []string
var argstr []string
if s.config.AddGangliaGroup {
if point.HasTag("group") {
g, _ := point.GetTag("group")
argstr = append(argstr, fmt.Sprintf("--group=%s", g))
} else if point.HasMeta("group") {
g, _ := point.GetMeta("group")
argstr = append(argstr, fmt.Sprintf("--group=%s", g))
}
}
for key, value := range point.Tags() {
switch key {
case "unit":
argstr = append(argstr, fmt.Sprintf("--units=%s", value))
default:
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
}
}
if s.config.MetaAsTags {
for key, value := range point.Meta() {
switch key {
case "unit":
argstr = append(argstr, fmt.Sprintf("--units=%s", value))
default:
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
}
}
}
if len(s.config.ClusterName) > 0 {
argstr = append(argstr, fmt.Sprintf("--cluster=%s", s.config.ClusterName))
}
if s.config.AddTagsAsDesc && len(tagsstr) > 0 {
argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
}
if len(s.gmetric_config) > 0 {
argstr = append(argstr, fmt.Sprintf("--conf=%s", s.gmetric_config))
}
name := GangliaMetricRename(point)
if s.config.AddTypeToName {
argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point)))
} else {
argstr = append(argstr, fmt.Sprintf("--name=%s", name))
}
slope := GangliaSlopeType(point)
slopeStr := "both"
if slope == 0 {
slopeStr = "zero"
}
argstr = append(argstr, fmt.Sprintf("--slope=%s", slopeStr))
for k, v := range point.Fields() {
if k == "value" {
switch value := v.(type) {
case float64:
argstr = append(argstr,
fmt.Sprintf("--value=%v", value), "--type=double")
case float32:
argstr = append(argstr,
fmt.Sprintf("--value=%v", value), "--type=float")
case int:
argstr = append(argstr,
fmt.Sprintf("--value=%d", value), "--type=int32")
case int64:
argstr = append(argstr,
fmt.Sprintf("--value=%d", value), "--type=int32")
case string:
argstr = append(argstr,
fmt.Sprintf("--value=%q", value), "--type=string")
}
}
}
command := exec.Command(s.gmetric_path, argstr...)
command.Wait()
_, err = command.Output()
return err
}
func (s *GangliaSink) Flush() error {
return nil
}
func (s *GangliaSink) Close() {
return s, nil
}

View File

@@ -38,57 +38,6 @@ type HttpSink struct {
flushDelay time.Duration
}
func (s *HttpSink) Init(config json.RawMessage) error {
// Set default values
s.name = "HttpSink"
s.config.MaxIdleConns = 10
s.config.IdleConnTimeout = "5s"
s.config.Timeout = "5s"
s.config.FlushDelay = "1s"
// Read config
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
if len(s.config.URL) == 0 {
return errors.New("`url` config option is required for HTTP sink")
}
if s.config.MaxIdleConns > 0 {
s.maxIdleConns = s.config.MaxIdleConns
}
if len(s.config.IdleConnTimeout) > 0 {
t, err := time.ParseDuration(s.config.IdleConnTimeout)
if err == nil {
s.idleConnTimeout = t
}
}
if len(s.config.Timeout) > 0 {
t, err := time.ParseDuration(s.config.Timeout)
if err == nil {
s.timeout = t
}
}
if len(s.config.FlushDelay) > 0 {
t, err := time.ParseDuration(s.config.FlushDelay)
if err == nil {
s.flushDelay = t
}
}
tr := &http.Transport{
MaxIdleConns: s.maxIdleConns,
IdleConnTimeout: s.idleConnTimeout,
}
s.client = &http.Client{Transport: tr, Timeout: s.timeout}
s.buffer = &bytes.Buffer{}
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
return nil
}
func (s *HttpSink) Write(m lp.CCMetric) error {
if s.buffer.Len() == 0 && s.flushDelay != 0 {
// This is the first write since the last flush, start the flushTimer!
@@ -169,3 +118,54 @@ func (s *HttpSink) Close() {
}
s.client.CloseIdleConnections()
}
func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
s := new(HttpSink)
// Set default values
s.name = fmt.Sprintf("HttpSink(%s)", name)
s.config.MaxIdleConns = 10
s.config.IdleConnTimeout = "5s"
s.config.Timeout = "5s"
s.config.FlushDelay = "1s"
// Read config
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
if len(s.config.URL) == 0 {
return nil, errors.New("`url` config option is required for HTTP sink")
}
if s.config.MaxIdleConns > 0 {
s.maxIdleConns = s.config.MaxIdleConns
}
if len(s.config.IdleConnTimeout) > 0 {
t, err := time.ParseDuration(s.config.IdleConnTimeout)
if err == nil {
s.idleConnTimeout = t
}
}
if len(s.config.Timeout) > 0 {
t, err := time.ParseDuration(s.config.Timeout)
if err == nil {
s.timeout = t
}
}
if len(s.config.FlushDelay) > 0 {
t, err := time.ParseDuration(s.config.FlushDelay)
if err == nil {
s.flushDelay = t
}
}
tr := &http.Transport{
MaxIdleConns: s.maxIdleConns,
IdleConnTimeout: s.idleConnTimeout,
}
s.client = &http.Client{Transport: tr, Timeout: s.timeout}
s.buffer = &bytes.Buffer{}
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
return s, nil
}

View File

@@ -1,6 +1,7 @@
package sinks
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
@@ -30,11 +31,10 @@ type InfluxAsyncSinkConfig struct {
type InfluxAsyncSink struct {
sink
client influxdb2.Client
writeApi influxdb2Api.WriteAPI
retPolicy string
errors <-chan error
config InfluxAsyncSinkConfig
client influxdb2.Client
writeApi influxdb2Api.WriteAPI
errors <-chan error
config InfluxAsyncSinkConfig
}
func (s *InfluxAsyncSink) connect() error {
@@ -65,42 +65,16 @@ func (s *InfluxAsyncSink) connect() error {
)
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
ok, err := s.client.Ping(context.Background())
if err != nil {
return err
}
if !ok {
return fmt.Errorf("connection to %s not healthy", uri)
}
return nil
}
func (s *InfluxAsyncSink) Init(config json.RawMessage) error {
s.name = "InfluxSink"
// Set default for maximum number of points sent to server in single request.
s.config.BatchSize = 100
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return errors.New("not all configuration variables set required by InfluxAsyncSink")
}
// Connect to InfluxDB server
err := s.connect()
// Start background: Read from error channel
s.errors = s.writeApi.Errors()
go func() {
for err := range s.errors {
cclog.ComponentError(s.name, err.Error())
}
}()
return err
}
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
s.writeApi.WritePoint(
m.ToPoint(s.config.MetaAsTags),
@@ -118,3 +92,40 @@ func (s *InfluxAsyncSink) Close() {
s.writeApi.Flush()
s.client.Close()
}
func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
s := new(InfluxAsyncSink)
s.name = fmt.Sprintf("InfluxSink(%s)", name)
// Set default for maximum number of points sent to server in single request.
s.config.BatchSize = 100
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink")
}
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err)
}
// Start background: Read from error channel
s.errors = s.writeApi.Errors()
go func() {
for err := range s.errors {
cclog.ComponentError(s.name, err.Error())
}
}()
return s, nil
}

View File

@@ -54,29 +54,16 @@ func (s *InfluxSink) connect() error {
)
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
ok, err := s.client.Ping(context.Background())
if err != nil {
return err
}
if !ok {
return fmt.Errorf("connection to %s not healthy", uri)
}
return nil
}
func (s *InfluxSink) Init(config json.RawMessage) error {
s.name = "InfluxSink"
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return errors.New("not all configuration variables set required by InfluxSink")
}
// Connect to InfluxDB server
return s.connect()
}
func (s *InfluxSink) Write(m lp.CCMetric) error {
err :=
s.writeApi.WritePoint(
@@ -94,3 +81,27 @@ func (s *InfluxSink) Close() {
cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
s.client.Close()
}
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
s := new(InfluxSink)
s.name = fmt.Sprintf("InfluxSink(%s)", name)
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return nil, errors.New("not all configuration variables set required by InfluxSink")
}
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err)
}
return s, nil
}

View File

@@ -82,21 +82,21 @@ const (
GMOND_CONFIG_FILE = `/etc/ganglia/gmond.conf`
)
type LibgangliaSinkSpecialMetric struct {
MetricName string `json:"metric_name,omitempty"`
NewName string `json:"new_name,omitempty"`
Slope string `json:"slope,omitempty"`
}
// type LibgangliaSinkSpecialMetric struct {
// MetricName string `json:"metric_name,omitempty"`
// NewName string `json:"new_name,omitempty"`
// Slope string `json:"slope,omitempty"`
// }
type LibgangliaSinkConfig struct {
defaultSinkConfig
GangliaLib string `json:"libganglia_path,omitempty"`
GmondConfig string `json:"gmond_config,omitempty"`
AddGangliaGroup bool `json:"add_ganglia_group,omitempty"`
AddTypeToName bool `json:"add_type_to_name,omitempty"`
AddUnits bool `json:"add_units,omitempty"`
ClusterName string `json:"cluster_name,omitempty"`
SpecialMetrics map[string]LibgangliaSinkSpecialMetric `json:"rename_metrics,omitempty"` // Map to rename metric name from key to value
GangliaLib string `json:"libganglia_path,omitempty"`
GmondConfig string `json:"gmond_config,omitempty"`
AddGangliaGroup bool `json:"add_ganglia_group,omitempty"`
AddTypeToName bool `json:"add_type_to_name,omitempty"`
AddUnits bool `json:"add_units,omitempty"`
ClusterName string `json:"cluster_name,omitempty"`
//SpecialMetrics map[string]LibgangliaSinkSpecialMetric `json:"rename_metrics,omitempty"` // Map to rename metric name from key to value
//AddTagsAsDesc bool `json:"add_tags_as_desc,omitempty"`
}
@@ -109,65 +109,6 @@ type LibgangliaSink struct {
cstrCache map[string]*C.char
}
func (s *LibgangliaSink) Init(config json.RawMessage) error {
var err error = nil
s.name = "LibgangliaSink"
//s.config.AddTagsAsDesc = false
s.config.AddGangliaGroup = false
s.config.AddTypeToName = false
s.config.AddUnits = true
s.config.GmondConfig = string(GMOND_CONFIG_FILE)
s.config.GangliaLib = string(GANGLIA_LIB_NAME)
if len(config) > 0 {
err = json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config:", err.Error())
return err
}
}
lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
if lib == nil {
return fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
}
err = lib.Open()
if err != nil {
return fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
}
// Set up cache for the C strings
s.cstrCache = make(map[string]*C.char)
// s.cstrCache["globals"] = C.CString("globals")
// s.cstrCache["override_hostname"] = C.CString("override_hostname")
// s.cstrCache["override_ip"] = C.CString("override_ip")
// Add some constant strings
s.cstrCache["GROUP"] = C.CString("GROUP")
s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
s.cstrCache[""] = C.CString("")
// Add cluster name for lookup in Write()
if len(s.config.ClusterName) > 0 {
s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
}
// Add supported types for later lookup in Write()
s.cstrCache["double"] = C.CString("double")
s.cstrCache["int32"] = C.CString("int32")
s.cstrCache["string"] = C.CString("string")
// Create Ganglia pool
s.global_context = C.Ganglia_pool_create(nil)
// Load Ganglia configuration
s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
return nil
}
func (s *LibgangliaSink) Write(point lp.CCMetric) error {
var err error = nil
var c_name *C.char
@@ -184,72 +125,48 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
}
// Get metric name
metricname := GangliaMetricRename(point)
if s.config.AddTypeToName {
c_name = lookup(GangliaMetricName(point))
} else {
c_name = lookup(metricname)
}
metricname := GangliaMetricRename(point.Name())
// Get the value C string and lookup the type string in the cache
value, ok := point.GetField("value")
if !ok {
conf := GetCommonGangliaConfig(point)
if len(conf.Type) == 0 {
conf = GetGangliaConfig(point)
}
if len(conf.Type) == 0 {
return fmt.Errorf("metric %s has no 'value' field", metricname)
}
switch real := value.(type) {
case float64:
c_value = C.CString(fmt.Sprintf("%f", real))
c_type = lookup("double")
case float32:
c_value = C.CString(fmt.Sprintf("%f", real))
c_type = lookup("float")
case int64:
c_value = C.CString(fmt.Sprintf("%d", real))
c_type = lookup("int32")
case int32:
c_value = C.CString(fmt.Sprintf("%d", real))
c_type = lookup("int32")
case int:
c_value = C.CString(fmt.Sprintf("%d", real))
c_type = lookup("int32")
case string:
c_value = C.CString(real)
c_type = lookup("string")
default:
return fmt.Errorf("metric %s has invalid 'value' type for %s", point.Name(), s.name)
if s.config.AddTypeToName {
metricname = GangliaMetricName(point)
}
c_value = C.CString(conf.Value)
c_type = lookup(conf.Type)
c_name = lookup(metricname)
// Add unit
unit := ""
if s.config.AddUnits {
if tagunit, tagok := point.GetTag("unit"); tagok {
c_unit = lookup(tagunit)
} else if metaunit, metaok := point.GetMeta("unit"); metaok {
c_unit = lookup(metaunit)
} else {
c_unit = lookup("")
}
} else {
c_unit = lookup("")
unit = conf.Unit
}
c_unit = lookup(unit)
// Determine the slope of the metric. Ganglia's own collector mostly use
// 'both' but the mem and swap total uses 'zero'.
slope := GangliaSlopeType(point)
slope_type := C.GANGLIA_SLOPE_BOTH
switch slope {
case 0:
switch conf.Slope {
case "zero":
slope_type = C.GANGLIA_SLOPE_ZERO
case "both":
slope_type = C.GANGLIA_SLOPE_BOTH
}
// Create a new Ganglia metric
gmetric := C.Ganglia_metric_create(s.global_context)
// Set name, value, type and unit in the Ganglia metric
// Since we don't have this information from the collectors,
// we assume that the metric value can go up and down (slope),
// and there is no maximum for 'dmax' and 'tmax'.
// Ganglia's collectors set 'tmax' but not 'dmax'
// The default slope_type is both directions, so up and down. Some metrics want 'zero' slope, probably constant.
// The 'tmax' value is by default 300.
rval := C.int(0)
rval = C.Ganglia_metric_set(gmetric, c_name, c_value, c_type, c_unit, C.uint(slope_type), 0, 0)
rval = C.Ganglia_metric_set(gmetric, c_name, c_value, c_type, c_unit, C.uint(slope_type), C.uint(conf.Tmax), 0)
switch rval {
case 1:
C.free(unsafe.Pointer(c_value))
@@ -259,10 +176,10 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
return errors.New("one of your parameters has an invalid character '\"'")
case 3:
C.free(unsafe.Pointer(c_value))
return fmt.Errorf("the type parameter \"%s\" is not a valid type", C.GoString(c_type))
return fmt.Errorf("the type parameter \"%s\" is not a valid type", conf.Type)
case 4:
C.free(unsafe.Pointer(c_value))
return fmt.Errorf("the value parameter \"%s\" does not represent a number", C.GoString(c_value))
return fmt.Errorf("the value parameter \"%s\" does not represent a number", conf.Value)
default:
}
@@ -271,8 +188,8 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
C.Ganglia_metadata_add(gmetric, lookup("CLUSTER"), lookup(s.config.ClusterName))
}
// Set the group metadata in the Ganglia metric if configured
if group, ok := point.GetMeta("group"); ok && s.config.AddGangliaGroup {
c_group := lookup(group)
if s.config.AddGangliaGroup {
c_group := lookup(conf.Group)
C.Ganglia_metadata_add(gmetric, lookup("GROUP"), c_group)
}
@@ -307,3 +224,63 @@ func (s *LibgangliaSink) Close() {
C.free(unsafe.Pointer(cstr))
}
}
func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
s := new(LibgangliaSink)
var err error = nil
s.name = fmt.Sprintf("LibgangliaSink(%s)", name)
//s.config.AddTagsAsDesc = false
s.config.AddGangliaGroup = false
s.config.AddTypeToName = false
s.config.AddUnits = true
s.config.GmondConfig = string(GMOND_CONFIG_FILE)
s.config.GangliaLib = string(GANGLIA_LIB_NAME)
if len(config) > 0 {
err = json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config:", err.Error())
return nil, err
}
}
lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
if lib == nil {
return nil, fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
}
err = lib.Open()
if err != nil {
return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
}
// Set up cache for the C strings
s.cstrCache = make(map[string]*C.char)
// s.cstrCache["globals"] = C.CString("globals")
// s.cstrCache["override_hostname"] = C.CString("override_hostname")
// s.cstrCache["override_ip"] = C.CString("override_ip")
// Add some constant strings
s.cstrCache["GROUP"] = C.CString("GROUP")
s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
s.cstrCache[""] = C.CString("")
// Add cluster name for lookup in Write()
if len(s.config.ClusterName) > 0 {
s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
}
// Add supported types for later lookup in Write()
s.cstrCache["double"] = C.CString("double")
s.cstrCache["int32"] = C.CString("int32")
s.cstrCache["string"] = C.CString("string")
// Create Ganglia pool
s.global_context = C.Ganglia_pool_create(nil)
// Load Ganglia configuration
s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
return s, nil
}

View File

@@ -1,8 +1,6 @@
package sinks
import (
"encoding/json"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
@@ -17,7 +15,6 @@ type sink struct {
}
type Sink interface {
Init(config json.RawMessage) error
Write(point lp.CCMetric) error
Flush() error
Close()

View File

@@ -53,30 +53,6 @@ func (s *NatsSink) connect() error {
return nil
}
func (s *NatsSink) Init(config json.RawMessage) error {
s.name = "NatsSink"
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 {
return errors.New("not all configuration variables set required by NatsSink")
}
// Setup Influx line protocol
s.buffer = &bytes.Buffer{}
s.buffer.Grow(1025)
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
s.encoder.SetMaxLineBytes(1024)
// Setup infos for connection
return s.connect()
}
func (s *NatsSink) Write(m lp.CCMetric) error {
if s.client != nil {
_, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags))
@@ -105,3 +81,31 @@ func (s *NatsSink) Close() {
s.client.Close()
}
}
func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
s := new(NatsSink)
s.name = fmt.Sprintf("NatsSink(%s)", name)
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return nil, err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 {
return nil, errors.New("not all configuration variables set required by NatsSink")
}
// Setup Influx line protocol
s.buffer = &bytes.Buffer{}
s.buffer.Grow(1025)
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
s.encoder.SetMaxLineBytes(1024)
// Setup infos for connection
if err := s.connect(); err != nil {
return nil, fmt.Errorf("Unable to connect: %v", err)
}
return s, nil
}

199
sinks/prometheusSink.go Normal file
View File

@@ -0,0 +1,199 @@
package sinks
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type PrometheusSinkConfig struct {
defaultSinkConfig
Host string `json:"host,omitempty"`
Port string `json:"port"`
Path string `json:"path,omitempty"`
GroupAsNameSpace bool `json:"group_as_namespace,omitempty"`
// User string `json:"user,omitempty"`
// Password string `json:"password,omitempty"`
// FlushDelay string `json:"flush_delay,omitempty"`
}
type PrometheusSink struct {
sink
config PrometheusSinkConfig
labelMetrics map[string]*prometheus.GaugeVec
nodeMetrics map[string]prometheus.Gauge
promWg sync.WaitGroup
promServer *http.Server
}
func intToFloat64(input interface{}) (float64, error) {
switch value := input.(type) {
case float64:
return value, nil
case float32:
return float64(value), nil
case int:
return float64(value), nil
case int32:
return float64(value), nil
case int64:
return float64(value), nil
}
return 0, errors.New("cannot cast value to float64")
}
func getLabelValue(metric lp.CCMetric) []string {
labelValues := []string{}
if tid, tidok := metric.GetTag("type-id"); tidok && metric.HasTag("type") {
labelValues = append(labelValues, tid)
}
if d, ok := metric.GetTag("device"); ok {
labelValues = append(labelValues, d)
} else if d, ok := metric.GetMeta("device"); ok {
labelValues = append(labelValues, d)
}
return labelValues
}
func getLabelNames(metric lp.CCMetric) []string {
labelNames := []string{}
if t, tok := metric.GetTag("type"); tok && metric.HasTag("type-id") {
labelNames = append(labelNames, t)
}
if _, ok := metric.GetTag("device"); ok {
labelNames = append(labelNames, "device")
} else if _, ok := metric.GetMeta("device"); ok {
labelNames = append(labelNames, "device")
}
return labelNames
}
func (s *PrometheusSink) newMetric(metric lp.CCMetric) error {
var value float64 = 0
name := metric.Name()
opts := prometheus.GaugeOpts{
Name: name,
}
labels := getLabelNames(metric)
labelValues := getLabelValue(metric)
if len(labels) > 0 && len(labels) != len(labelValues) {
return fmt.Errorf("cannot detect metric labels for metric %s", name)
}
if metricValue, ok := metric.GetField("value"); ok {
if floatValue, err := intToFloat64(metricValue); err == nil {
value = floatValue
} else {
return fmt.Errorf("metric %s with value '%v' cannot be casted to float64", name, metricValue)
}
}
if s.config.GroupAsNameSpace && metric.HasMeta("group") {
g, _ := metric.GetMeta("group")
opts.Namespace = strings.ToLower(g)
}
if len(labels) > 0 {
new := prometheus.NewGaugeVec(opts, labels)
new.WithLabelValues(labelValues...).Set(value)
s.labelMetrics[name] = new
prometheus.Register(new)
} else {
new := prometheus.NewGauge(opts)
new.Set(value)
s.nodeMetrics[name] = new
prometheus.Register(new)
}
return nil
}
func (s *PrometheusSink) updateMetric(metric lp.CCMetric) error {
var value float64 = 0.0
name := metric.Name()
labelValues := getLabelValue(metric)
if metricValue, ok := metric.GetField("value"); ok {
if floatValue, err := intToFloat64(metricValue); err == nil {
value = floatValue
} else {
return fmt.Errorf("metric %s with value '%v' cannot be casted to float64", name, metricValue)
}
}
if len(labelValues) > 0 {
if _, ok := s.labelMetrics[name]; !ok {
err := s.newMetric(metric)
if err != nil {
return err
}
}
s.labelMetrics[name].WithLabelValues(labelValues...).Set(value)
} else {
if _, ok := s.labelMetrics[name]; !ok {
err := s.newMetric(metric)
if err != nil {
return err
}
}
s.nodeMetrics[name].Set(value)
}
return nil
}
func (s *PrometheusSink) Write(m lp.CCMetric) error {
return s.updateMetric(m)
}
func (s *PrometheusSink) Flush() error {
return nil
}
func (s *PrometheusSink) Close() {
cclog.ComponentDebug(s.name, "CLOSE")
s.promServer.Shutdown(context.Background())
s.promWg.Wait()
}
func NewPrometheusSink(name string, config json.RawMessage) (Sink, error) {
s := new(PrometheusSink)
s.name = "PrometheusSink"
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return nil, err
}
}
if len(s.config.Port) == 0 {
err := errors.New("not all configuration variables set required by PrometheusSink")
cclog.ComponentError(s.name, err.Error())
return nil, err
}
s.labelMetrics = make(map[string]*prometheus.GaugeVec)
s.nodeMetrics = make(map[string]prometheus.Gauge)
s.promWg.Add(1)
go func() {
router := mux.NewRouter()
// Prometheus endpoint
router.Path("/" + s.config.Path).Handler(promhttp.Handler())
url := fmt.Sprintf("%s:%s", s.config.Host, s.config.Port)
cclog.ComponentDebug(s.name, "Serving Prometheus metrics at", fmt.Sprintf("%s:%s/%s", s.config.Host, s.config.Port, s.config.Path))
s.promServer = &http.Server{Addr: url, Handler: router}
err := s.promServer.ListenAndServe()
if err != nil && err.Error() != "http: Server closed" {
cclog.ComponentError(s.name, err.Error())
}
s.promWg.Done()
}()
return s, nil
}

23
sinks/prometheusSink.md Normal file
View File

@@ -0,0 +1,23 @@
## `prometheus` sink
The `prometheus` sink publishes all metrics via an HTTP server ready to be scraped by a [Prometheus](https://prometheus.io) server. It creates gauge metrics for all node metrics and gauge vectors for all metrics with a subtype like 'device', 'cpu' or 'socket'.
### Configuration structure
```json
{
"<name>": {
"type": "prometheus",
"host": "localhost",
"port": "8080",
"path": "metrics"
}
}
```
- `type`: makes the sink an `prometheus` sink
- `host`: The HTTP server gets bound to that IP/hostname
- `port`: Portnumber (as string) for the HTTP server
- `path`: Path where the metrics should be servered. The metrics will be published at `host`:`port`/`path`
- `group_as_namespace`: Most metrics contain a group as meta information like 'memory', 'load'. With this the metric names are extended to `group`_`name` if possible.

62
sinks/sampleSink.go Normal file
View File

@@ -0,0 +1,62 @@
package sinks
import (
"encoding/json"
"fmt"
"log"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
type SampleSinkConfig struct {
defaultSinkConfig // defines JSON tags for 'type' and 'meta_as_tags'
// Add additional options
}
type SampleSink struct {
sink // declarate 'name' and 'meta_as_tags'
config SampleSinkConfig // entry point to the SampleSinkConfig
}
// Code to submit a single CCMetric to the sink
func (s *SampleSink) Write(point lp.CCMetric) error {
log.Print(point)
return nil
}
// If the sink uses batched sends internally, you can tell to flush its buffers
func (s *SampleSink) Flush() error {
return nil
}
// Close sink: close network connection, close files, close libraries, ...
func (s *SampleSink) Close() {
cclog.ComponentDebug(s.name, "CLOSE")
}
// New function to create a new instance of the sink
// Initialize the sink by giving it a name and reading in the config JSON
func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
s := new(SampleSink)
s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here
// Set defaults in s.config
// Read in the config JSON
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
// Check if all required fields in the config are set
// Use 'len(s.config.Option) > 0' for string settings
// Establish connection to the server, library, ...
// Check required files exist and lookup path(s) of executable(s)
// Return (nil, meaningful error message) in case of errors
return s, nil
}

View File

@@ -13,14 +13,14 @@ import (
const SINK_MAX_FORWARD = 50
// Map of all available sinks
var AvailableSinks = map[string]Sink{
"influxdb": new(InfluxSink),
"stdout": new(StdoutSink),
"nats": new(NatsSink),
"http": new(HttpSink),
"ganglia": new(GangliaSink),
"influxasync": new(InfluxAsyncSink),
"libganglia": new(LibgangliaSink),
var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, error){
"ganglia": NewGangliaSink,
"libganglia": NewLibgangliaSink,
"stdout": NewStdoutSink,
"nats": NewNatsSink,
"influxdb": NewInfluxSink,
"influxasync": NewInfluxAsyncSink,
"http": NewHttpSink,
}
// Metric collector manager data structure
@@ -149,8 +149,7 @@ func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error {
cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type)
return err
}
s := AvailableSinks[sinkConfig.Type]
err = s.Init(rawConfig)
s, err := AvailableSinks[sinkConfig.Type](name, rawConfig)
if err != nil {
cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error())
return err

View File

@@ -19,34 +19,6 @@ type StdoutSink struct {
}
}
func (s *StdoutSink) Init(config json.RawMessage) error {
s.name = "StdoutSink"
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
s.output = os.Stdout
if len(s.config.Output) > 0 {
switch strings.ToLower(s.config.Output) {
case "stdout":
s.output = os.Stdout
case "stderr":
s.output = os.Stderr
default:
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
if err != nil {
return err
}
s.output = f
}
}
s.meta_as_tags = s.config.MetaAsTags
return nil
}
func (s *StdoutSink) Write(m lp.CCMetric) error {
fmt.Fprint(
s.output,
@@ -65,3 +37,33 @@ func (s *StdoutSink) Close() {
s.output.Close()
}
}
func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
s := new(StdoutSink)
s.name = fmt.Sprintf("StdoutSink(%s)", name)
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
s.output = os.Stdout
if len(s.config.Output) > 0 {
switch strings.ToLower(s.config.Output) {
case "stdout":
s.output = os.Stdout
case "stderr":
s.output = os.Stderr
default:
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
if err != nil {
return nil, err
}
s.output = f
}
}
s.meta_as_tags = s.config.MetaAsTags
return s, nil
}