diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 7b0a9b7..52e91e7 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -34,17 +34,18 @@ var AvailableCollectors = map[string]MetricCollector{ "nfsstat": new(NfsCollector), } +// Metric collector manager data structure type collectorManager struct { - collectors []MetricCollector - output chan lp.CCMetric // List of all output channels - done chan bool // channel to finish / stop metric collector manager - ticker mct.MultiChanTicker - duration time.Duration - wg *sync.WaitGroup - config map[string]json.RawMessage + collectors []MetricCollector // List of metric collectors to use + output chan lp.CCMetric // Output channels + done chan bool // channel to finish / stop metric collector manager + ticker mct.MultiChanTicker // periodically ticking once each interval + duration time.Duration // duration (for metrics that measure over a given duration) + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + config map[string]json.RawMessage // json encoded config for collector manager } -// Metric collector access functions +// Metric collector manager access functions type CollectorManager interface { Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error AddOutput(output chan lp.CCMetric) @@ -53,9 +54,9 @@ type CollectorManager interface { } // Init initializes a new metric collector manager by setting up: -// * output channels +// * output channel // * done channel -// * wait group synchronization (from variable wg) +// * wait group synchronization for goroutines (from variable wg) // * ticker (from variable ticker) // * configuration (read from config file in variable collectConfigFile) // Initialization is done for all configured collectors @@ -82,20 +83,20 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat } // Initialize configured collectors - for k, cfg := range cm.config { - if _, found := AvailableCollectors[k]; !found { - cclog.ComponentError("CollectorManager", "SKIP unknown collector", k) + for collectorName, collectorCfg := range cm.config { + if _, found := AvailableCollectors[collectorName]; !found { + cclog.ComponentError("CollectorManager", "SKIP unknown collector", collectorName) continue } - c := AvailableCollectors[k] + collector := AvailableCollectors[collectorName] - err = c.Init(cfg) + err = collector.Init(collectorCfg) if err != nil { - cclog.ComponentError("CollectorManager", "Collector", k, "initialization failed:", err.Error()) + cclog.ComponentError("CollectorManager", "Collector", collectorName, "initialization failed:", err.Error()) continue } - cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", c.Name()) - cm.collectors = append(cm.collectors, c) + cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name()) + cm.collectors = append(cm.collectors, collector) } return nil } @@ -157,7 +158,7 @@ func (cm *collectorManager) Close() { // New creates a new initialized metric collector manager func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) { - cm := &collectorManager{} + cm := new(collectorManager) err := cm.Init(ticker, duration, wg, collectConfigFile) if err != nil { return nil, err diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index a321aae..956ac11 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -24,19 +24,20 @@ type metricRouterTagConfig struct { type metricRouterConfig struct { AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met - IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically? + IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval? } +// Metric router data structure type metricRouter struct { - coll_input chan lp.CCMetric // Input channel from CollectorManager - recv_input chan lp.CCMetric // Input channel from ReceiveManager - outputs []chan lp.CCMetric // List of all output channels - done chan bool // channel to finish / stop metric router - wg *sync.WaitGroup - timestamp time.Time // timestamp - timerdone chan bool // channel to finish / stop timestamp updater - ticker mct.MultiChanTicker - config metricRouterConfig + coll_input chan lp.CCMetric // Input channel from CollectorManager + recv_input chan lp.CCMetric // Input channel from ReceiveManager + outputs []chan lp.CCMetric // List of all output channels + done chan bool // channel to finish / stop metric router + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + timestamp time.Time // timestamp periodically updated by ticker each interval + timerdone chan bool // channel to finish / stop timestamp updater + ticker mct.MultiChanTicker // periodically ticking once each interval + config metricRouterConfig // json encoded config for metric router } // MetricRouter access functions @@ -60,6 +61,8 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout r.done = make(chan bool) r.wg = wg r.ticker = ticker + + // Read metric router config file configFile, err := os.Open(routerConfigFile) if err != nil { cclog.ComponentError("MetricRouter", err.Error()) @@ -97,11 +100,11 @@ func (r *metricRouter) StartTimer() { cclog.ComponentDebug("MetricRouter", "TIMER START") } -// EvalCondition evaluates condition Cond for metric data from point -func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) { - expression, err := govaluate.NewEvaluableExpression(Cond) +// EvalCondition evaluates condition cond for metric data from point +func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, error) { + expression, err := govaluate.NewEvaluableExpression(cond) if err != nil { - cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) + cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error()) return false, err } @@ -122,7 +125,7 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro // evaluate condition result, err := expression.Evaluate(params) if err != nil { - cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) + cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error()) return false, err } return bool(result.(bool)), err @@ -172,13 +175,20 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) { // Start starts the metric router func (r *metricRouter) Start() { + + // start timer if configured r.timestamp = time.Now() if r.config.IntervalStamp { r.StartTimer() } + + // Router manager is done done := func() { cclog.ComponentDebug("MetricRouter", "DONE") } + + // Forward takes a received metric, adds or deletes tags + // and forwards it to the output channels forward := func(point lp.CCMetric) { cclog.ComponentDebug("MetricRouter", "FORWARD", point) r.DoAddTags(point) @@ -192,17 +202,20 @@ func (r *metricRouter) Start() { go func() { defer r.wg.Done() for { - // RouterLoop: select { case <-r.done: done() return + case p := <-r.coll_input: + // receive from metric collector if r.config.IntervalStamp { p.SetTime(r.timestamp) } forward(p) + case p := <-r.recv_input: + // receive from receive manager if r.config.IntervalStamp { p.SetTime(r.timestamp) } @@ -213,11 +226,12 @@ func (r *metricRouter) Start() { cclog.ComponentDebug("MetricRouter", "STARTED") } -// AddInput adds a input channel to the metric router +// AddCollectorInput adds a channel between metric collector and metric router func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) { r.coll_input = input } +// AddReceiverInput adds a channel between metric receiver and metric router func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) { r.recv_input = input } diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 4be8313..b4b3dc5 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -9,21 +9,24 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) +// Map of all available sinks var AvailableSinks = map[string]Sink{ - "influxdb": &InfluxSink{}, - "stdout": &StdoutSink{}, - "nats": &NatsSink{}, - "http": &HttpSink{}, + "influxdb": new(InfluxSink), + "stdout": new(StdoutSink), + "nats": new(NatsSink), + "http": new(HttpSink), } +// Metric collector manager data structure type sinkManager struct { - input chan lp.CCMetric - outputs []Sink - done chan bool - wg *sync.WaitGroup - config []sinkConfig + input chan lp.CCMetric // input channel + outputs []Sink // List of sinks to use + done chan bool // channel to finish / stop metric sink manager + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + config []sinkConfig // json encoded config for sink manager } +// Sink manager access functions type SinkManager interface { Init(wg *sync.WaitGroup, sinkConfigFile string) error AddInput(input chan lp.CCMetric) @@ -38,6 +41,8 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { sm.done = make(chan bool) sm.wg = wg sm.config = make([]sinkConfig, 0) + + // Read sink config file if len(sinkConfigFile) > 0 { configFile, err := os.Open(sinkConfigFile) if err != nil { @@ -63,27 +68,36 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { } func (sm *sinkManager) Start() { - sm.wg.Add(1) batchcount := 20 + + sm.wg.Add(1) go func() { + defer sm.wg.Done() + + // Sink manager is done done := func() { for _, s := range sm.outputs { s.Flush() s.Close() } - sm.wg.Done() + cclog.ComponentDebug("SinkManager", "DONE") } + for { select { case <-sm.done: done() return + case p := <-sm.input: + // Send received metric to all outputs cclog.ComponentDebug("SinkManager", "WRITE", p) for _, s := range sm.outputs { s.Write(p) } + + // Flush all outputs if batchcount == 0 { cclog.ComponentDebug("SinkManager", "FLUSH") for _, s := range sm.outputs { @@ -95,9 +109,12 @@ func (sm *sinkManager) Start() { } } }() + + // Sink manager is started cclog.ComponentDebug("SinkManager", "STARTED") } +// AddInput adds the input channel to the sink manager func (sm *sinkManager) AddInput(input chan lp.CCMetric) { sm.input = input } @@ -128,11 +145,13 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { return nil } +// Close finishes / stops the sink manager func (sm *sinkManager) Close() { cclog.ComponentDebug("SinkManager", "CLOSE") sm.done <- true } +// New creates a new initialized sink manager func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) { sm := &sinkManager{} err := sm.Init(wg, sinkConfigFile)