From 4e408f9490cee6ed816a1ca83cdc203d78c79e65 Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Fri, 28 Jan 2022 15:16:58 +0100 Subject: [PATCH 1/7] Add documentation --- collectors/collectorManager.go | 39 +++++++++++----------- internal/metricRouter/metricRouter.go | 48 +++++++++++++++++---------- sinks/sinkManager.go | 41 +++++++++++++++++------ 3 files changed, 81 insertions(+), 47 deletions(-) diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 7b0a9b7..52e91e7 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -34,17 +34,18 @@ var AvailableCollectors = map[string]MetricCollector{ "nfsstat": new(NfsCollector), } +// Metric collector manager data structure type collectorManager struct { - collectors []MetricCollector - output chan lp.CCMetric // List of all output channels - done chan bool // channel to finish / stop metric collector manager - ticker mct.MultiChanTicker - duration time.Duration - wg *sync.WaitGroup - config map[string]json.RawMessage + collectors []MetricCollector // List of metric collectors to use + output chan lp.CCMetric // Output channels + done chan bool // channel to finish / stop metric collector manager + ticker mct.MultiChanTicker // periodically ticking once each interval + duration time.Duration // duration (for metrics that measure over a given duration) + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + config map[string]json.RawMessage // json encoded config for collector manager } -// Metric collector access functions +// Metric collector manager access functions type CollectorManager interface { Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error AddOutput(output chan lp.CCMetric) @@ -53,9 +54,9 @@ type CollectorManager interface { } // Init initializes a new metric collector manager by setting up: -// * output channels +// * output channel // * done channel -// * wait group synchronization (from variable wg) +// * wait group synchronization for goroutines (from variable wg) // * ticker (from variable ticker) // * configuration (read from config file in variable collectConfigFile) // Initialization is done for all configured collectors @@ -82,20 +83,20 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat } // Initialize configured collectors - for k, cfg := range cm.config { - if _, found := AvailableCollectors[k]; !found { - cclog.ComponentError("CollectorManager", "SKIP unknown collector", k) + for collectorName, collectorCfg := range cm.config { + if _, found := AvailableCollectors[collectorName]; !found { + cclog.ComponentError("CollectorManager", "SKIP unknown collector", collectorName) continue } - c := AvailableCollectors[k] + collector := AvailableCollectors[collectorName] - err = c.Init(cfg) + err = collector.Init(collectorCfg) if err != nil { - cclog.ComponentError("CollectorManager", "Collector", k, "initialization failed:", err.Error()) + cclog.ComponentError("CollectorManager", "Collector", collectorName, "initialization failed:", err.Error()) continue } - cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", c.Name()) - cm.collectors = append(cm.collectors, c) + cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) + cm.collectors = append(cm.collectors, collector) } return nil } @@ -157,7 +158,7 @@ func (cm *collectorManager) Close() { // New creates a new initialized metric collector manager func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) { - cm := &collectorManager{} + cm := new(collectorManager) err := cm.Init(ticker, duration, wg, collectConfigFile) if err != nil { return nil, err diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index a321aae..956ac11 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -24,19 +24,20 @@ type metricRouterTagConfig struct { type metricRouterConfig struct { AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met - IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically? + IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval? } +// Metric router data structure type metricRouter struct { - coll_input chan lp.CCMetric // Input channel from CollectorManager - recv_input chan lp.CCMetric // Input channel from ReceiveManager - outputs []chan lp.CCMetric // List of all output channels - done chan bool // channel to finish / stop metric router - wg *sync.WaitGroup - timestamp time.Time // timestamp - timerdone chan bool // channel to finish / stop timestamp updater - ticker mct.MultiChanTicker - config metricRouterConfig + coll_input chan lp.CCMetric // Input channel from CollectorManager + recv_input chan lp.CCMetric // Input channel from ReceiveManager + outputs []chan lp.CCMetric // List of all output channels + done chan bool // channel to finish / stop metric router + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + timestamp time.Time // timestamp periodically updated by ticker each interval + timerdone chan bool // channel to finish / stop timestamp updater + ticker mct.MultiChanTicker // periodically ticking once each interval + config metricRouterConfig // json encoded config for metric router } // MetricRouter access functions @@ -60,6 +61,8 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout r.done = make(chan bool) r.wg = wg r.ticker = ticker + + // Read metric router config file configFile, err := os.Open(routerConfigFile) if err != nil { cclog.ComponentError("MetricRouter", err.Error()) @@ -97,11 +100,11 @@ func (r *metricRouter) StartTimer() { cclog.ComponentDebug("MetricRouter", "TIMER START") } -// EvalCondition evaluates condition Cond for metric data from point -func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) { - expression, err := govaluate.NewEvaluableExpression(Cond) +// EvalCondition evaluates condition cond for metric data from point +func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, error) { + expression, err := govaluate.NewEvaluableExpression(cond) if err != nil { - cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) + cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error()) return false, err } @@ -122,7 +125,7 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro // evaluate condition result, err := expression.Evaluate(params) if err != nil { - cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) + cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error()) return false, err } return bool(result.(bool)), err @@ -172,13 +175,20 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) { // Start starts the metric router func (r *metricRouter) Start() { + + // start timer if configured r.timestamp = time.Now() if r.config.IntervalStamp { r.StartTimer() } + + // Router manager is done done := func() { cclog.ComponentDebug("MetricRouter", "DONE") } + + // Forward takes a received metric, adds or deletes tags + // and forwards it to the output channels forward := func(point lp.CCMetric) { cclog.ComponentDebug("MetricRouter", "FORWARD", point) r.DoAddTags(point) @@ -192,17 +202,20 @@ func (r *metricRouter) Start() { go func() { defer r.wg.Done() for { - // RouterLoop: select { case <-r.done: done() return + case p := <-r.coll_input: + // receive from metric collector if r.config.IntervalStamp { p.SetTime(r.timestamp) } forward(p) + case p := <-r.recv_input: + // receive from receive manager if r.config.IntervalStamp { p.SetTime(r.timestamp) } @@ -213,11 +226,12 @@ func (r *metricRouter) Start() { cclog.ComponentDebug("MetricRouter", "STARTED") } -// AddInput adds a input channel to the metric router +// AddCollectorInput adds a channel between metric collector and metric router func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) { r.coll_input = input } +// AddReceiverInput adds a channel between metric receiver and metric router func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) { r.recv_input = input } diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 4be8313..b4b3dc5 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -9,21 +9,24 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) +// Map of all available sinks var AvailableSinks = map[string]Sink{ - "influxdb": &InfluxSink{}, - "stdout": &StdoutSink{}, - "nats": &NatsSink{}, - "http": &HttpSink{}, + "influxdb": new(InfluxSink), + "stdout": new(StdoutSink), + "nats": new(NatsSink), + "http": new(HttpSink), } +// Metric collector manager data structure type sinkManager struct { - input chan lp.CCMetric - outputs []Sink - done chan bool - wg *sync.WaitGroup - config []sinkConfig + input chan lp.CCMetric // input channel + outputs []Sink // List of sinks to use + done chan bool // channel to finish / stop metric sink manager + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + config []sinkConfig // json encoded config for sink manager } +// Sink manager access functions type SinkManager interface { Init(wg *sync.WaitGroup, sinkConfigFile string) error AddInput(input chan lp.CCMetric) @@ -38,6 +41,8 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { sm.done = make(chan bool) sm.wg = wg sm.config = make([]sinkConfig, 0) + + // Read sink config file if len(sinkConfigFile) > 0 { configFile, err := os.Open(sinkConfigFile) if err != nil { @@ -63,27 +68,36 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { } func (sm *sinkManager) Start() { - sm.wg.Add(1) batchcount := 20 + + sm.wg.Add(1) go func() { + defer sm.wg.Done() + + // Sink manager is done done := func() { for _, s := range sm.outputs { s.Flush() s.Close() } - sm.wg.Done() + cclog.ComponentDebug("SinkManager", "DONE") } + for { select { case <-sm.done: done() return + case p := <-sm.input: + // Send received metric to all outputs cclog.ComponentDebug("SinkManager", "WRITE", p) for _, s := range sm.outputs { s.Write(p) } + + // Flush all outputs if batchcount == 0 { cclog.ComponentDebug("SinkManager", "FLUSH") for _, s := range sm.outputs { @@ -95,9 +109,12 @@ func (sm *sinkManager) Start() { } } }() + + // Sink manager is started cclog.ComponentDebug("SinkManager", "STARTED") } +// AddInput adds the input channel to the sink manager func (sm *sinkManager) AddInput(input chan lp.CCMetric) { sm.input = input } @@ -128,11 +145,13 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { return nil } +// Close finishes / stops the sink manager func (sm *sinkManager) Close() { cclog.ComponentDebug("SinkManager", "CLOSE") sm.done <- true } +// New creates a new initialized sink manager func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) { sm := &sinkManager{} err := sm.Init(wg, sinkConfigFile) From d2e02ed36daf508d20fccd5f881fcd057c16003e Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Fri, 28 Jan 2022 19:31:27 +0100 Subject: [PATCH 2/7] Fix: Add missing hostname tag --- internal/metricRouter/metricRouter.go | 12 ++++++++++++ metric-collector.go | 9 --------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 956ac11..e75e77d 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -3,6 +3,7 @@ package metricRouter import ( "encoding/json" "os" + "strings" "sync" "time" @@ -29,6 +30,7 @@ type metricRouterConfig struct { // Metric router data structure type metricRouter struct { + hostname string // Hostname used in tags coll_input chan lp.CCMetric // Input channel from CollectorManager recv_input chan lp.CCMetric // Input channel from ReceiveManager outputs []chan lp.CCMetric // List of all output channels @@ -62,6 +64,15 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout r.wg = wg r.ticker = ticker + // Set hostname + hostname, err := os.Hostname() + if err != nil { + cclog.Error(err.Error()) + return err + } + // Drop domain part of host name + r.hostname = strings.SplitN(hostname, `.`, 2)[0] + // Read metric router config file configFile, err := os.Open(routerConfigFile) if err != nil { @@ -209,6 +220,7 @@ func (r *metricRouter) Start() { case p := <-r.coll_input: // receive from metric collector + p.AddTag("hostname", r.hostname) if r.config.IntervalStamp { p.SetTime(r.timestamp) } diff --git a/metric-collector.go b/metric-collector.go index 3975b62..b709512 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -5,7 +5,6 @@ import ( "flag" "os" "os/signal" - "strings" "syscall" "github.com/ClusterCockpit/cc-metric-collector/collectors" @@ -45,7 +44,6 @@ func LoadCentralConfiguration(file string, config *CentralConfigFile) error { } type RuntimeConfig struct { - Hostname string Interval time.Duration Duration time.Duration CliArgs map[string]string @@ -213,13 +211,6 @@ func mainFunc() int { } rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration) * time.Second - rcfg.Hostname, err = os.Hostname() - if err != nil { - cclog.Error(err.Error()) - return 1 - } - // Drop domain part of host name - rcfg.Hostname = strings.SplitN(rcfg.Hostname, `.`, 2)[0] // err = CreatePidfile(rcfg.CliArgs["pidfile"]) // Set log file From 7316de281357f49ac81fc7cd68015863f2d85d20 Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Fri, 28 Jan 2022 19:49:46 +0100 Subject: [PATCH 3/7] Fix crash caused by: * not running a collector manager when collector manager config file is missing * not running a metric router when metric router config file is missing * not running a sink manager when sink manager config file is missing --- metric-collector.go | 65 ++++++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/metric-collector.go b/metric-collector.go index b709512..8121141 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -211,6 +211,21 @@ func mainFunc() int { } rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration) * time.Second + if len(rcfg.ConfigFile.RouterConfigFile) == 0 { + cclog.Error("Metric router configuration file must be set") + return 1 + } + + if len(rcfg.ConfigFile.SinkConfigFile) == 0 { + cclog.Error("Sink configuration file must be set") + return 1 + } + + if len(rcfg.ConfigFile.CollectorConfigFile) == 0 { + cclog.Error("Metric collector configuration file must be set") + return 1 + } + // err = CreatePidfile(rcfg.CliArgs["pidfile"]) // Set log file @@ -222,42 +237,36 @@ func mainFunc() int { rcfg.MultiChanTicker = mct.NewTicker(rcfg.Interval) // Create new metric router - if len(rcfg.ConfigFile.RouterConfigFile) > 0 { - rcfg.MetricRouter, err = mr.New(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile) - if err != nil { - cclog.Error(err.Error()) - return 1 - } + rcfg.MetricRouter, err = mr.New(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile) + if err != nil { + cclog.Error(err.Error()) + return 1 } // Create new sink - if len(rcfg.ConfigFile.SinkConfigFile) > 0 { - rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile) - if err != nil { - cclog.Error(err.Error()) - return 1 - } - - // Connect metric router to sink manager - RouterToSinksChannel := make(chan lp.CCMetric, 200) - rcfg.SinkManager.AddInput(RouterToSinksChannel) - rcfg.MetricRouter.AddOutput(RouterToSinksChannel) + rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile) + if err != nil { + cclog.Error(err.Error()) + return 1 } + // Connect metric router to sink manager + RouterToSinksChannel := make(chan lp.CCMetric, 200) + rcfg.SinkManager.AddInput(RouterToSinksChannel) + rcfg.MetricRouter.AddOutput(RouterToSinksChannel) + // Create new collector manager - if len(rcfg.ConfigFile.CollectorConfigFile) > 0 { - rcfg.CollectManager, err = collectors.New(rcfg.MultiChanTicker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile) - if err != nil { - cclog.Error(err.Error()) - return 1 - } - - // Connect collector manager to metric router - CollectToRouterChannel := make(chan lp.CCMetric, 200) - rcfg.CollectManager.AddOutput(CollectToRouterChannel) - rcfg.MetricRouter.AddCollectorInput(CollectToRouterChannel) + rcfg.CollectManager, err = collectors.New(rcfg.MultiChanTicker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile) + if err != nil { + cclog.Error(err.Error()) + return 1 } + // Connect collector manager to metric router + CollectToRouterChannel := make(chan lp.CCMetric, 200) + rcfg.CollectManager.AddOutput(CollectToRouterChannel) + rcfg.MetricRouter.AddCollectorInput(CollectToRouterChannel) + // Create new receive manager if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 { rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile) From 8df58c051fa4c8d71424d8a90cf6bf5d66d86697 Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Sat, 29 Jan 2022 10:04:31 +0100 Subject: [PATCH 4/7] Lower minimum required golang version to 1.16. --- collectors/gpfsMetric.go | 15 +++++++++++---- go.mod | 9 +-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index bc1852b..53db1c2 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -130,14 +130,21 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { continue } - timestampInt, err := strconv.ParseInt(key_value["_t_"]+key_value["_tu_"], 10, 64) - timestamp := time.UnixMicro(timestampInt) + sec, err := strconv.ParseInt(key_value["_t_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, - "GpfsCollector.Read(): Failed to convert time stamp '%s': %s\n", - key_value["_t_"]+key_value["_tu_"], err.Error()) + "GpfsCollector.Read(): Failed to convert seconds to int '%s': %v\n", + key_value["_t_"], err) continue } + msec, err := strconv.ParseInt(key_value["_tu_"], 10, 64) + if err != nil { + fmt.Fprintf(os.Stderr, + "GpfsCollector.Read(): Failed to convert micro seconds to int '%s': %v\n", + key_value["_tu_"], err) + continue + } + timestamp := time.Unix(sec, msec*1000) // bytes read bytesRead, err := strconv.ParseInt(key_value["_br_"], 10, 64) diff --git a/go.mod b/go.mod index da4f3ea..0789f7e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ClusterCockpit/cc-metric-collector -go 1.17 +go 1.16 require ( github.com/NVIDIA/go-nvml v0.11.1-0 @@ -12,14 +12,7 @@ require ( ) require ( - github.com/deepmap/oapi-codegen v1.8.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/nats-io/nats-server/v2 v2.7.0 // indirect - github.com/nats-io/nkeys v0.3.0 // indirect - github.com/nats-io/nuid v1.0.1 // indirect - github.com/pkg/errors v0.9.1 // indirect - golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect - golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect google.golang.org/protobuf v1.27.1 // indirect - gopkg.in/yaml.v2 v2.3.0 // indirect ) From 9e99e47d73ea2e9718e91f9717725cbe4c554b19 Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Sun, 30 Jan 2022 12:08:33 +0100 Subject: [PATCH 5/7] Wait for close of done channel, to ensure manager finished. --- collectors/collectorManager.go | 3 +++ internal/metricRouter/metricRouter.go | 6 ++++++ internal/multiChanTicker/multiChanTicker.go | 3 +++ sinks/sinkManager.go | 3 +++ 4 files changed, 15 insertions(+) diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 52e91e7..f91db20 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -115,6 +115,7 @@ func (cm *collectorManager) Start() { for _, c := range cm.collectors { c.Close() } + close(cm.done) cclog.ComponentDebug("CollectorManager", "DONE") } @@ -154,6 +155,8 @@ func (cm *collectorManager) AddOutput(output chan lp.CCMetric) { func (cm *collectorManager) Close() { cclog.ComponentDebug("CollectorManager", "CLOSE") cm.done <- true + // wait for close of channel cm.done + <-cm.done } // New creates a new initialized metric collector manager diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index e75e77d..96a2f05 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -101,6 +101,7 @@ func (r *metricRouter) StartTimer() { for { select { case <-r.timerdone: + close(r.timerdone) cclog.ComponentDebug("MetricRouter", "TIMER DONE") return case t := <-m: @@ -195,6 +196,7 @@ func (r *metricRouter) Start() { // Router manager is done done := func() { + close(r.done) cclog.ComponentDebug("MetricRouter", "DONE") } @@ -257,9 +259,13 @@ func (r *metricRouter) AddOutput(output chan lp.CCMetric) { func (r *metricRouter) Close() { cclog.ComponentDebug("MetricRouter", "CLOSE") r.done <- true + // wait for close of channel r.done + <-r.done if r.config.IntervalStamp { cclog.ComponentDebug("MetricRouter", "TIMER CLOSE") r.timerdone <- true + // wait for close of channel r.timerdone + <-r.timerdone } } diff --git a/internal/multiChanTicker/multiChanTicker.go b/internal/multiChanTicker/multiChanTicker.go index a9394ab..e0eca43 100644 --- a/internal/multiChanTicker/multiChanTicker.go +++ b/internal/multiChanTicker/multiChanTicker.go @@ -23,6 +23,7 @@ func (t *multiChanTicker) Init(duration time.Duration) { t.done = make(chan bool) go func() { done := func() { + close(t.done) cclog.ComponentDebug("MultiChanTicker", "DONE") } for { @@ -52,6 +53,8 @@ func (t *multiChanTicker) AddChannel(channel chan time.Time) { func (t *multiChanTicker) Close() { cclog.ComponentDebug("MultiChanTicker", "CLOSE") t.done <- true + // wait for close of channel t.done + <-t.done } func NewTicker(duration time.Duration) MultiChanTicker { diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index b4b3dc5..8d2872a 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -81,6 +81,7 @@ func (sm *sinkManager) Start() { s.Close() } + close(sm.done) cclog.ComponentDebug("SinkManager", "DONE") } @@ -149,6 +150,8 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { func (sm *sinkManager) Close() { cclog.ComponentDebug("SinkManager", "CLOSE") sm.done <- true + // wait for close of channel sm.done + <-sm.done } // New creates a new initialized sink manager From 4541e50bea556036c666d48408da3b41495a3304 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Sun, 30 Jan 2022 14:29:25 +0100 Subject: [PATCH 6/7] Minor fixes in ccLogger --- internal/ccLogger/cclogger.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/ccLogger/cclogger.go b/internal/ccLogger/cclogger.go index 38e7e6b..5135780 100644 --- a/internal/ccLogger/cclogger.go +++ b/internal/ccLogger/cclogger.go @@ -38,7 +38,7 @@ func initLogger() { func Print(e ...interface{}) { initLogger() - defaultLog.Print(e) + defaultLog.Print(e...) } func ComponentPrint(component string, e ...interface{}) { @@ -48,7 +48,7 @@ func ComponentPrint(component string, e ...interface{}) { func Info(e ...interface{}) { initLogger() - infoLog.Print(e) + infoLog.Print(e...) } func ComponentInfo(component string, e ...interface{}) { @@ -58,14 +58,14 @@ func ComponentInfo(component string, e ...interface{}) { func Debug(e ...interface{}) { initLogger() - if globalDebug == true { - debugLog.Print(e) + if globalDebug { + debugLog.Print(e...) } } func ComponentDebug(component string, e ...interface{}) { initLogger() - if globalDebug == true && debugLog != nil { + if globalDebug && debugLog != nil { //CCComponentPrint(debugLog, component, e) debugLog.Print(fmt.Sprintf("[%s] ", component), e) } From d3f56115411dd8f9812f9354b7243bfd51f494e6 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Sun, 30 Jan 2022 14:30:06 +0100 Subject: [PATCH 7/7] Add functions to get the fields of a CCMetric and export some more CCMetric functions --- internal/ccMetric/ccMetric.go | 37 ++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/internal/ccMetric/ccMetric.go b/internal/ccMetric/ccMetric.go index 6b6bda9..05f81ff 100644 --- a/internal/ccMetric/ccMetric.go +++ b/internal/ccMetric/ccMetric.go @@ -2,9 +2,10 @@ package ccmetric import ( "fmt" - lp "github.com/influxdata/line-protocol" // MIT license "sort" "time" + + lp "github.com/influxdata/line-protocol" // MIT license ) // Most functions are derived from github.com/influxdata/line-protocol/metric.go @@ -24,6 +25,11 @@ type CCMetric interface { AddMeta(key, value string) MetaList() []*lp.Tag RemoveTag(key string) + GetTag(key string) (string, bool) + GetMeta(key string) (string, bool) + GetField(key string) (interface{}, bool) + HasField(key string) bool + RemoveField(key string) } func (m *ccMetric) Meta() map[string]string { @@ -187,6 +193,35 @@ func (m *ccMetric) AddField(key string, value interface{}) { m.fields = append(m.fields, &lp.Field{Key: key, Value: convertField(value)}) } +func (m *ccMetric) GetField(key string) (interface{}, bool) { + for _, field := range m.fields { + if field.Key == key { + return field.Value, true + } + } + return "", false +} + +func (m *ccMetric) HasField(key string) bool { + for _, field := range m.fields { + if field.Key == key { + return true + } + } + return false +} + +func (m *ccMetric) RemoveField(key string) { + for i, field := range m.fields { + if field.Key == key { + copy(m.fields[i:], m.fields[i+1:]) + m.fields[len(m.fields)-1] = nil + m.fields = m.fields[:len(m.fields)-1] + return + } + } +} + func New( name string, tags map[string]string,