mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-04-17 02:15:55 +02:00
Merge branch 'develop' into fix-ci-config
This commit is contained in:
commit
08e979dec9
@ -34,17 +34,18 @@ var AvailableCollectors = map[string]MetricCollector{
|
||||
"nfsstat": new(NfsCollector),
|
||||
}
|
||||
|
||||
// Metric collector manager data structure
|
||||
type collectorManager struct {
|
||||
collectors []MetricCollector
|
||||
output chan lp.CCMetric // List of all output channels
|
||||
done chan bool // channel to finish / stop metric collector manager
|
||||
ticker mct.MultiChanTicker
|
||||
duration time.Duration
|
||||
wg *sync.WaitGroup
|
||||
config map[string]json.RawMessage
|
||||
collectors []MetricCollector // List of metric collectors to use
|
||||
output chan lp.CCMetric // Output channels
|
||||
done chan bool // channel to finish / stop metric collector manager
|
||||
ticker mct.MultiChanTicker // periodically ticking once each interval
|
||||
duration time.Duration // duration (for metrics that measure over a given duration)
|
||||
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
|
||||
config map[string]json.RawMessage // json encoded config for collector manager
|
||||
}
|
||||
|
||||
// Metric collector access functions
|
||||
// Metric collector manager access functions
|
||||
type CollectorManager interface {
|
||||
Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error
|
||||
AddOutput(output chan lp.CCMetric)
|
||||
@ -53,9 +54,9 @@ type CollectorManager interface {
|
||||
}
|
||||
|
||||
// Init initializes a new metric collector manager by setting up:
|
||||
// * output channels
|
||||
// * output channel
|
||||
// * done channel
|
||||
// * wait group synchronization (from variable wg)
|
||||
// * wait group synchronization for goroutines (from variable wg)
|
||||
// * ticker (from variable ticker)
|
||||
// * configuration (read from config file in variable collectConfigFile)
|
||||
// Initialization is done for all configured collectors
|
||||
@ -82,20 +83,20 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
|
||||
}
|
||||
|
||||
// Initialize configured collectors
|
||||
for k, cfg := range cm.config {
|
||||
if _, found := AvailableCollectors[k]; !found {
|
||||
cclog.ComponentError("CollectorManager", "SKIP unknown collector", k)
|
||||
for collectorName, collectorCfg := range cm.config {
|
||||
if _, found := AvailableCollectors[collectorName]; !found {
|
||||
cclog.ComponentError("CollectorManager", "SKIP unknown collector", collectorName)
|
||||
continue
|
||||
}
|
||||
c := AvailableCollectors[k]
|
||||
collector := AvailableCollectors[collectorName]
|
||||
|
||||
err = c.Init(cfg)
|
||||
err = collector.Init(collectorCfg)
|
||||
if err != nil {
|
||||
cclog.ComponentError("CollectorManager", "Collector", k, "initialization failed:", err.Error())
|
||||
cclog.ComponentError("CollectorManager", "Collector", collectorName, "initialization failed:", err.Error())
|
||||
continue
|
||||
}
|
||||
cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", c.Name())
|
||||
cm.collectors = append(cm.collectors, c)
|
||||
cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name())
|
||||
cm.collectors = append(cm.collectors, collector)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -114,6 +115,7 @@ func (cm *collectorManager) Start() {
|
||||
for _, c := range cm.collectors {
|
||||
c.Close()
|
||||
}
|
||||
close(cm.done)
|
||||
cclog.ComponentDebug("CollectorManager", "DONE")
|
||||
}
|
||||
|
||||
@ -153,11 +155,13 @@ func (cm *collectorManager) AddOutput(output chan lp.CCMetric) {
|
||||
func (cm *collectorManager) Close() {
|
||||
cclog.ComponentDebug("CollectorManager", "CLOSE")
|
||||
cm.done <- true
|
||||
// wait for close of channel cm.done
|
||||
<-cm.done
|
||||
}
|
||||
|
||||
// New creates a new initialized metric collector manager
|
||||
func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) {
|
||||
cm := &collectorManager{}
|
||||
cm := new(collectorManager)
|
||||
err := cm.Init(ticker, duration, wg, collectConfigFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -130,21 +130,21 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
||||
continue
|
||||
}
|
||||
|
||||
timestampSec, err := strconv.ParseInt(key_value["_t_"], 10, 64)
|
||||
sec, err := strconv.ParseInt(key_value["_t_"], 10, 64)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr,
|
||||
"GpfsCollector.Read(): Failed to convert time stamp seconds '%s': %s\n",
|
||||
key_value["_t_"], err.Error())
|
||||
"GpfsCollector.Read(): Failed to convert seconds to int '%s': %v\n",
|
||||
key_value["_t_"], err)
|
||||
continue
|
||||
}
|
||||
timestampNano, err := strconv.ParseInt(key_value["_tu_"], 10, 64)
|
||||
msec, err := strconv.ParseInt(key_value["_tu_"], 10, 64)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr,
|
||||
"GpfsCollector.Read(): Failed to convert time stamp nanoseconds '%s': %s\n",
|
||||
key_value["_tu_"], err.Error())
|
||||
"GpfsCollector.Read(): Failed to convert micro seconds to int '%s': %v\n",
|
||||
key_value["_tu_"], err)
|
||||
continue
|
||||
}
|
||||
timestamp := time.Unix(timestampSec, timestampNano)
|
||||
timestamp := time.Unix(sec, msec*1000)
|
||||
|
||||
// bytes read
|
||||
bytesRead, err := strconv.ParseInt(key_value["_br_"], 10, 64)
|
||||
|
7
go.mod
7
go.mod
@ -12,14 +12,7 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/deepmap/oapi-codegen v1.8.2 // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/nats-io/nats-server/v2 v2.7.0 // indirect
|
||||
github.com/nats-io/nkeys v0.3.0 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
|
||||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.3.0 // indirect
|
||||
)
|
||||
|
@ -38,7 +38,7 @@ func initLogger() {
|
||||
|
||||
func Print(e ...interface{}) {
|
||||
initLogger()
|
||||
defaultLog.Print(e)
|
||||
defaultLog.Print(e...)
|
||||
}
|
||||
|
||||
func ComponentPrint(component string, e ...interface{}) {
|
||||
@ -48,7 +48,7 @@ func ComponentPrint(component string, e ...interface{}) {
|
||||
|
||||
func Info(e ...interface{}) {
|
||||
initLogger()
|
||||
infoLog.Print(e)
|
||||
infoLog.Print(e...)
|
||||
}
|
||||
|
||||
func ComponentInfo(component string, e ...interface{}) {
|
||||
@ -58,14 +58,14 @@ func ComponentInfo(component string, e ...interface{}) {
|
||||
|
||||
func Debug(e ...interface{}) {
|
||||
initLogger()
|
||||
if globalDebug == true {
|
||||
debugLog.Print(e)
|
||||
if globalDebug {
|
||||
debugLog.Print(e...)
|
||||
}
|
||||
}
|
||||
|
||||
func ComponentDebug(component string, e ...interface{}) {
|
||||
initLogger()
|
||||
if globalDebug == true && debugLog != nil {
|
||||
if globalDebug && debugLog != nil {
|
||||
//CCComponentPrint(debugLog, component, e)
|
||||
debugLog.Print(fmt.Sprintf("[%s] ", component), e)
|
||||
}
|
||||
|
@ -2,9 +2,10 @@ package ccmetric
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
lp "github.com/influxdata/line-protocol" // MIT license
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
lp "github.com/influxdata/line-protocol" // MIT license
|
||||
)
|
||||
|
||||
// Most functions are derived from github.com/influxdata/line-protocol/metric.go
|
||||
@ -24,6 +25,11 @@ type CCMetric interface {
|
||||
AddMeta(key, value string)
|
||||
MetaList() []*lp.Tag
|
||||
RemoveTag(key string)
|
||||
GetTag(key string) (string, bool)
|
||||
GetMeta(key string) (string, bool)
|
||||
GetField(key string) (interface{}, bool)
|
||||
HasField(key string) bool
|
||||
RemoveField(key string)
|
||||
}
|
||||
|
||||
func (m *ccMetric) Meta() map[string]string {
|
||||
@ -187,6 +193,35 @@ func (m *ccMetric) AddField(key string, value interface{}) {
|
||||
m.fields = append(m.fields, &lp.Field{Key: key, Value: convertField(value)})
|
||||
}
|
||||
|
||||
func (m *ccMetric) GetField(key string) (interface{}, bool) {
|
||||
for _, field := range m.fields {
|
||||
if field.Key == key {
|
||||
return field.Value, true
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
func (m *ccMetric) HasField(key string) bool {
|
||||
for _, field := range m.fields {
|
||||
if field.Key == key {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *ccMetric) RemoveField(key string) {
|
||||
for i, field := range m.fields {
|
||||
if field.Key == key {
|
||||
copy(m.fields[i:], m.fields[i+1:])
|
||||
m.fields[len(m.fields)-1] = nil
|
||||
m.fields = m.fields[:len(m.fields)-1]
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func New(
|
||||
name string,
|
||||
tags map[string]string,
|
||||
|
@ -3,6 +3,7 @@ package metricRouter
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -24,19 +25,21 @@ type metricRouterTagConfig struct {
|
||||
type metricRouterConfig struct {
|
||||
AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met
|
||||
DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met
|
||||
IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically?
|
||||
IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval?
|
||||
}
|
||||
|
||||
// Metric router data structure
|
||||
type metricRouter struct {
|
||||
coll_input chan lp.CCMetric // Input channel from CollectorManager
|
||||
recv_input chan lp.CCMetric // Input channel from ReceiveManager
|
||||
outputs []chan lp.CCMetric // List of all output channels
|
||||
done chan bool // channel to finish / stop metric router
|
||||
wg *sync.WaitGroup
|
||||
timestamp time.Time // timestamp
|
||||
timerdone chan bool // channel to finish / stop timestamp updater
|
||||
ticker mct.MultiChanTicker
|
||||
config metricRouterConfig
|
||||
hostname string // Hostname used in tags
|
||||
coll_input chan lp.CCMetric // Input channel from CollectorManager
|
||||
recv_input chan lp.CCMetric // Input channel from ReceiveManager
|
||||
outputs []chan lp.CCMetric // List of all output channels
|
||||
done chan bool // channel to finish / stop metric router
|
||||
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
|
||||
timestamp time.Time // timestamp periodically updated by ticker each interval
|
||||
timerdone chan bool // channel to finish / stop timestamp updater
|
||||
ticker mct.MultiChanTicker // periodically ticking once each interval
|
||||
config metricRouterConfig // json encoded config for metric router
|
||||
}
|
||||
|
||||
// MetricRouter access functions
|
||||
@ -60,6 +63,17 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
|
||||
r.done = make(chan bool)
|
||||
r.wg = wg
|
||||
r.ticker = ticker
|
||||
|
||||
// Set hostname
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
// Drop domain part of host name
|
||||
r.hostname = strings.SplitN(hostname, `.`, 2)[0]
|
||||
|
||||
// Read metric router config file
|
||||
configFile, err := os.Open(routerConfigFile)
|
||||
if err != nil {
|
||||
cclog.ComponentError("MetricRouter", err.Error())
|
||||
@ -87,6 +101,7 @@ func (r *metricRouter) StartTimer() {
|
||||
for {
|
||||
select {
|
||||
case <-r.timerdone:
|
||||
close(r.timerdone)
|
||||
cclog.ComponentDebug("MetricRouter", "TIMER DONE")
|
||||
return
|
||||
case t := <-m:
|
||||
@ -97,11 +112,11 @@ func (r *metricRouter) StartTimer() {
|
||||
cclog.ComponentDebug("MetricRouter", "TIMER START")
|
||||
}
|
||||
|
||||
// EvalCondition evaluates condition Cond for metric data from point
|
||||
func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) {
|
||||
expression, err := govaluate.NewEvaluableExpression(Cond)
|
||||
// EvalCondition evaluates condition cond for metric data from point
|
||||
func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, error) {
|
||||
expression, err := govaluate.NewEvaluableExpression(cond)
|
||||
if err != nil {
|
||||
cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error())
|
||||
cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
|
||||
return false, err
|
||||
}
|
||||
|
||||
@ -122,7 +137,7 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro
|
||||
// evaluate condition
|
||||
result, err := expression.Evaluate(params)
|
||||
if err != nil {
|
||||
cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error())
|
||||
cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
|
||||
return false, err
|
||||
}
|
||||
return bool(result.(bool)), err
|
||||
@ -172,13 +187,21 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) {
|
||||
|
||||
// Start starts the metric router
|
||||
func (r *metricRouter) Start() {
|
||||
|
||||
// start timer if configured
|
||||
r.timestamp = time.Now()
|
||||
if r.config.IntervalStamp {
|
||||
r.StartTimer()
|
||||
}
|
||||
|
||||
// Router manager is done
|
||||
done := func() {
|
||||
close(r.done)
|
||||
cclog.ComponentDebug("MetricRouter", "DONE")
|
||||
}
|
||||
|
||||
// Forward takes a received metric, adds or deletes tags
|
||||
// and forwards it to the output channels
|
||||
forward := func(point lp.CCMetric) {
|
||||
cclog.ComponentDebug("MetricRouter", "FORWARD", point)
|
||||
r.DoAddTags(point)
|
||||
@ -192,17 +215,21 @@ func (r *metricRouter) Start() {
|
||||
go func() {
|
||||
defer r.wg.Done()
|
||||
for {
|
||||
// RouterLoop:
|
||||
select {
|
||||
case <-r.done:
|
||||
done()
|
||||
return
|
||||
|
||||
case p := <-r.coll_input:
|
||||
// receive from metric collector
|
||||
p.AddTag("hostname", r.hostname)
|
||||
if r.config.IntervalStamp {
|
||||
p.SetTime(r.timestamp)
|
||||
}
|
||||
forward(p)
|
||||
|
||||
case p := <-r.recv_input:
|
||||
// receive from receive manager
|
||||
if r.config.IntervalStamp {
|
||||
p.SetTime(r.timestamp)
|
||||
}
|
||||
@ -213,11 +240,12 @@ func (r *metricRouter) Start() {
|
||||
cclog.ComponentDebug("MetricRouter", "STARTED")
|
||||
}
|
||||
|
||||
// AddInput adds a input channel to the metric router
|
||||
// AddCollectorInput adds a channel between metric collector and metric router
|
||||
func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) {
|
||||
r.coll_input = input
|
||||
}
|
||||
|
||||
// AddReceiverInput adds a channel between metric receiver and metric router
|
||||
func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) {
|
||||
r.recv_input = input
|
||||
}
|
||||
@ -231,9 +259,13 @@ func (r *metricRouter) AddOutput(output chan lp.CCMetric) {
|
||||
func (r *metricRouter) Close() {
|
||||
cclog.ComponentDebug("MetricRouter", "CLOSE")
|
||||
r.done <- true
|
||||
// wait for close of channel r.done
|
||||
<-r.done
|
||||
if r.config.IntervalStamp {
|
||||
cclog.ComponentDebug("MetricRouter", "TIMER CLOSE")
|
||||
r.timerdone <- true
|
||||
// wait for close of channel r.timerdone
|
||||
<-r.timerdone
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ func (t *multiChanTicker) Init(duration time.Duration) {
|
||||
t.done = make(chan bool)
|
||||
go func() {
|
||||
done := func() {
|
||||
close(t.done)
|
||||
cclog.ComponentDebug("MultiChanTicker", "DONE")
|
||||
}
|
||||
for {
|
||||
@ -52,6 +53,8 @@ func (t *multiChanTicker) AddChannel(channel chan time.Time) {
|
||||
func (t *multiChanTicker) Close() {
|
||||
cclog.ComponentDebug("MultiChanTicker", "CLOSE")
|
||||
t.done <- true
|
||||
// wait for close of channel t.done
|
||||
<-t.done
|
||||
}
|
||||
|
||||
func NewTicker(duration time.Duration) MultiChanTicker {
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"flag"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/ClusterCockpit/cc-metric-collector/collectors"
|
||||
@ -45,7 +44,6 @@ func LoadCentralConfiguration(file string, config *CentralConfigFile) error {
|
||||
}
|
||||
|
||||
type RuntimeConfig struct {
|
||||
Hostname string
|
||||
Interval time.Duration
|
||||
Duration time.Duration
|
||||
CliArgs map[string]string
|
||||
@ -213,13 +211,21 @@ func mainFunc() int {
|
||||
}
|
||||
rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration) * time.Second
|
||||
|
||||
rcfg.Hostname, err = os.Hostname()
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
if len(rcfg.ConfigFile.RouterConfigFile) == 0 {
|
||||
cclog.Error("Metric router configuration file must be set")
|
||||
return 1
|
||||
}
|
||||
// Drop domain part of host name
|
||||
rcfg.Hostname = strings.SplitN(rcfg.Hostname, `.`, 2)[0]
|
||||
|
||||
if len(rcfg.ConfigFile.SinkConfigFile) == 0 {
|
||||
cclog.Error("Sink configuration file must be set")
|
||||
return 1
|
||||
}
|
||||
|
||||
if len(rcfg.ConfigFile.CollectorConfigFile) == 0 {
|
||||
cclog.Error("Metric collector configuration file must be set")
|
||||
return 1
|
||||
}
|
||||
|
||||
// err = CreatePidfile(rcfg.CliArgs["pidfile"])
|
||||
|
||||
// Set log file
|
||||
@ -231,42 +237,36 @@ func mainFunc() int {
|
||||
rcfg.MultiChanTicker = mct.NewTicker(rcfg.Interval)
|
||||
|
||||
// Create new metric router
|
||||
if len(rcfg.ConfigFile.RouterConfigFile) > 0 {
|
||||
rcfg.MetricRouter, err = mr.New(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile)
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
rcfg.MetricRouter, err = mr.New(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile)
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
// Create new sink
|
||||
if len(rcfg.ConfigFile.SinkConfigFile) > 0 {
|
||||
rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile)
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
// Connect metric router to sink manager
|
||||
RouterToSinksChannel := make(chan lp.CCMetric, 200)
|
||||
rcfg.SinkManager.AddInput(RouterToSinksChannel)
|
||||
rcfg.MetricRouter.AddOutput(RouterToSinksChannel)
|
||||
rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile)
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
// Connect metric router to sink manager
|
||||
RouterToSinksChannel := make(chan lp.CCMetric, 200)
|
||||
rcfg.SinkManager.AddInput(RouterToSinksChannel)
|
||||
rcfg.MetricRouter.AddOutput(RouterToSinksChannel)
|
||||
|
||||
// Create new collector manager
|
||||
if len(rcfg.ConfigFile.CollectorConfigFile) > 0 {
|
||||
rcfg.CollectManager, err = collectors.New(rcfg.MultiChanTicker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile)
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
// Connect collector manager to metric router
|
||||
CollectToRouterChannel := make(chan lp.CCMetric, 200)
|
||||
rcfg.CollectManager.AddOutput(CollectToRouterChannel)
|
||||
rcfg.MetricRouter.AddCollectorInput(CollectToRouterChannel)
|
||||
rcfg.CollectManager, err = collectors.New(rcfg.MultiChanTicker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile)
|
||||
if err != nil {
|
||||
cclog.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
// Connect collector manager to metric router
|
||||
CollectToRouterChannel := make(chan lp.CCMetric, 200)
|
||||
rcfg.CollectManager.AddOutput(CollectToRouterChannel)
|
||||
rcfg.MetricRouter.AddCollectorInput(CollectToRouterChannel)
|
||||
|
||||
// Create new receive manager
|
||||
if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 {
|
||||
rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile)
|
||||
|
@ -9,21 +9,24 @@ import (
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
// Map of all available sinks
|
||||
var AvailableSinks = map[string]Sink{
|
||||
"influxdb": &InfluxSink{},
|
||||
"stdout": &StdoutSink{},
|
||||
"nats": &NatsSink{},
|
||||
"http": &HttpSink{},
|
||||
"influxdb": new(InfluxSink),
|
||||
"stdout": new(StdoutSink),
|
||||
"nats": new(NatsSink),
|
||||
"http": new(HttpSink),
|
||||
}
|
||||
|
||||
// Metric collector manager data structure
|
||||
type sinkManager struct {
|
||||
input chan lp.CCMetric
|
||||
outputs []Sink
|
||||
done chan bool
|
||||
wg *sync.WaitGroup
|
||||
config []sinkConfig
|
||||
input chan lp.CCMetric // input channel
|
||||
outputs []Sink // List of sinks to use
|
||||
done chan bool // channel to finish / stop metric sink manager
|
||||
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
|
||||
config []sinkConfig // json encoded config for sink manager
|
||||
}
|
||||
|
||||
// Sink manager access functions
|
||||
type SinkManager interface {
|
||||
Init(wg *sync.WaitGroup, sinkConfigFile string) error
|
||||
AddInput(input chan lp.CCMetric)
|
||||
@ -38,6 +41,8 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
|
||||
sm.done = make(chan bool)
|
||||
sm.wg = wg
|
||||
sm.config = make([]sinkConfig, 0)
|
||||
|
||||
// Read sink config file
|
||||
if len(sinkConfigFile) > 0 {
|
||||
configFile, err := os.Open(sinkConfigFile)
|
||||
if err != nil {
|
||||
@ -63,27 +68,37 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
|
||||
}
|
||||
|
||||
func (sm *sinkManager) Start() {
|
||||
sm.wg.Add(1)
|
||||
batchcount := 20
|
||||
|
||||
sm.wg.Add(1)
|
||||
go func() {
|
||||
defer sm.wg.Done()
|
||||
|
||||
// Sink manager is done
|
||||
done := func() {
|
||||
for _, s := range sm.outputs {
|
||||
s.Flush()
|
||||
s.Close()
|
||||
}
|
||||
sm.wg.Done()
|
||||
|
||||
close(sm.done)
|
||||
cclog.ComponentDebug("SinkManager", "DONE")
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-sm.done:
|
||||
done()
|
||||
return
|
||||
|
||||
case p := <-sm.input:
|
||||
// Send received metric to all outputs
|
||||
cclog.ComponentDebug("SinkManager", "WRITE", p)
|
||||
for _, s := range sm.outputs {
|
||||
s.Write(p)
|
||||
}
|
||||
|
||||
// Flush all outputs
|
||||
if batchcount == 0 {
|
||||
cclog.ComponentDebug("SinkManager", "FLUSH")
|
||||
for _, s := range sm.outputs {
|
||||
@ -95,9 +110,12 @@ func (sm *sinkManager) Start() {
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Sink manager is started
|
||||
cclog.ComponentDebug("SinkManager", "STARTED")
|
||||
}
|
||||
|
||||
// AddInput adds the input channel to the sink manager
|
||||
func (sm *sinkManager) AddInput(input chan lp.CCMetric) {
|
||||
sm.input = input
|
||||
}
|
||||
@ -128,11 +146,15 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close finishes / stops the sink manager
|
||||
func (sm *sinkManager) Close() {
|
||||
cclog.ComponentDebug("SinkManager", "CLOSE")
|
||||
sm.done <- true
|
||||
// wait for close of channel sm.done
|
||||
<-sm.done
|
||||
}
|
||||
|
||||
// New creates a new initialized sink manager
|
||||
func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {
|
||||
sm := &sinkManager{}
|
||||
err := sm.Init(wg, sinkConfigFile)
|
||||
|
Loading…
x
Reference in New Issue
Block a user