From 247fb23de18d58b54f88246fbea4e019c99ae193 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Mon, 14 Feb 2022 18:12:50 +0100 Subject: [PATCH] Try to operate on multiple metrics if channels if filled --- internal/metricRouter/metricRouter.go | 73 +++++++++++++++++++-------- sinks/sinkManager.go | 31 ++++++++---- 2 files changed, 73 insertions(+), 31 deletions(-) diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 5b254f8..c5ff0bd 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -14,6 +14,8 @@ import ( mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" ) +const ROUTER_MAX_FORWARD = 50 + // Metric router tag configuration type metricRouterTagConfig struct { Key string `json:"key"` // Tag name @@ -49,6 +51,7 @@ type metricRouter struct { config metricRouterConfig // json encoded config for metric router cache MetricCache // pointer to MetricCache cachewg sync.WaitGroup // wait group for MetricCache + maxForward int // number of metrics to forward maximally in one iteration } // MetricRouter access functions @@ -73,6 +76,7 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout r.cache_input = make(chan lp.CCMetric) r.wg = wg r.ticker = ticker + r.maxForward = ROUTER_MAX_FORWARD // Set hostname hostname, err := os.Hostname() @@ -242,6 +246,43 @@ func (r *metricRouter) Start() { } } + // Foward message received from collector channel + coll_forward := func(p lp.CCMetric) { + // receive from metric collector + p.AddTag("hostname", r.hostname) + if r.config.IntervalStamp { + p.SetTime(r.timestamp) + } + if !r.dropMetric(p) { + forward(p) + } + // even if the metric is dropped, it is stored in the cache for + // aggregations + if r.config.NumCacheIntervals > 0 { + r.cache.Add(p) + } + } + + // Foward message received from receivers channel + recv_forward := func(p lp.CCMetric) { + // receive from receive manager + if r.config.IntervalStamp { + p.SetTime(r.timestamp) + } + if !r.dropMetric(p) { + forward(p) + } + } + + // Foward message received from cache channel + cache_forward := func(p lp.CCMetric) { + // receive from metric collector + if !r.dropMetric(p) { + p.AddTag("hostname", r.hostname) + forward(p) + } + } + // Start Metric Cache if r.config.NumCacheIntervals > 0 { r.cache.Start() @@ -250,6 +291,7 @@ func (r *metricRouter) Start() { r.wg.Add(1) go func() { defer r.wg.Done() + for { select { case <-r.done: @@ -257,34 +299,21 @@ func (r *metricRouter) Start() { return case p := <-r.coll_input: - // receive from metric collector - p.AddTag("hostname", r.hostname) - if r.config.IntervalStamp { - p.SetTime(r.timestamp) - } - if !r.dropMetric(p) { - forward(p) - } - // even if the metric is dropped, it is stored in the cache for - // aggregations - if r.config.NumCacheIntervals > 0 { - r.cache.Add(p) + coll_forward(p) + for i := 0; len(r.coll_input) > 0 && i < r.maxForward; i++ { + coll_forward(<-r.coll_input) } case p := <-r.recv_input: - // receive from receive manager - if r.config.IntervalStamp { - p.SetTime(r.timestamp) - } - if !r.dropMetric(p) { - forward(p) + recv_forward(p) + for i := 0; len(r.recv_input) > 0 && i < r.maxForward; i++ { + recv_forward(<-r.recv_input) } case p := <-r.cache_input: - // receive from metric collector - if !r.dropMetric(p) { - p.AddTag("hostname", r.hostname) - forward(p) + cache_forward(p) + for i := 0; len(r.cache_input) > 0 && i < r.maxForward; i++ { + cache_forward(<-r.cache_input) } } } diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index ff6e01d..e2d01a7 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -10,6 +10,8 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) +const SINK_MAX_FORWARD = 50 + // Map of all available sinks var AvailableSinks = map[string]Sink{ "influxdb": new(InfluxSink), @@ -22,10 +24,11 @@ var AvailableSinks = map[string]Sink{ // Metric collector manager data structure type sinkManager struct { - input chan lp.CCMetric // input channel - done chan bool // channel to finish / stop metric sink manager - wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector - sinks map[string]Sink // Mapping sink name to sink + input chan lp.CCMetric // input channel + done chan bool // channel to finish / stop metric sink manager + wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector + sinks map[string]Sink // Mapping sink name to sink + maxForward int // number of metrics to write maximally in one iteration } // Sink manager access functions @@ -45,6 +48,7 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { sm.done = make(chan bool) sm.wg = wg sm.sinks = make(map[string]Sink, 0) + sm.maxForward = SINK_MAX_FORWARD if len(sinkConfigFile) == 0 { return nil @@ -97,12 +101,8 @@ func (sm *sinkManager) Start() { } for { - select { - case <-sm.done: - done() - return - case p := <-sm.input: + toTheSinks := func(p lp.CCMetric) { // Send received metric to all outputs cclog.ComponentDebug("SinkManager", "WRITE", p) for _, s := range sm.sinks { @@ -111,6 +111,19 @@ func (sm *sinkManager) Start() { } } } + + select { + case <-sm.done: + done() + return + + case p := <-sm.input: + toTheSinks(p) + for i := 0; len(sm.input) > 0 && i < sm.maxForward; i++ { + p := <-sm.input + toTheSinks(p) + } + } } }()