Try to operate on multiple metrics if channels are filled

Thomas Roehl 2022-02-14 18:12:50 +01:00
parent 342f09fabf
commit 247fb23de1
2 changed files with 73 additions and 31 deletions
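
Both files apply the same pattern: after a blocking receive delivers one metric, the loop drains up to a fixed number of further metrics that are already queued in the buffered channel, instead of going back through the select for each one. Below is a minimal standalone sketch of that pattern; the string metrics, the forward closure, and the channel sizes are hypothetical stand-ins rather than the collector's lp.CCMetric API — only the loop shape mirrors the commit.

package main

import "fmt"

func main() {
	const maxForward = 50 // mirrors ROUTER_MAX_FORWARD / SINK_MAX_FORWARD below

	input := make(chan string, 200) // buffered, like the router and sink channels
	done := make(chan bool)

	forward := func(m string) { fmt.Println("forward:", m) }

	go func() {
		for i := 0; i < 120; i++ {
			input <- fmt.Sprintf("metric%d", i)
		}
		done <- true
	}()

	for {
		select {
		case <-done:
			return
		case m := <-input:
			forward(m)
			// Drain what is already buffered, but at most maxForward items,
			// so the done case is still checked at regular intervals.
			for i := 0; len(input) > 0 && i < maxForward; i++ {
				forward(<-input)
			}
		}
	}
}

As in the real manager loops, a ready done signal may win the select over remaining buffered metrics; the bounded drain changes how many metrics are handled per wakeup, not the shutdown semantics.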

View File

@@ -14,6 +14,8 @@ import (
 	mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
 )
 
+const ROUTER_MAX_FORWARD = 50
+
 // Metric router tag configuration
 type metricRouterTagConfig struct {
 	Key string `json:"key"` // Tag name
@@ -49,6 +51,7 @@ type metricRouter struct {
 	config     metricRouterConfig // json encoded config for metric router
 	cache      MetricCache        // pointer to MetricCache
 	cachewg    sync.WaitGroup     // wait group for MetricCache
+	maxForward int                // number of metrics to forward maximally in one iteration
 }
 
 // MetricRouter access functions
@@ -73,6 +76,7 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 	r.cache_input = make(chan lp.CCMetric)
 	r.wg = wg
 	r.ticker = ticker
+	r.maxForward = ROUTER_MAX_FORWARD
 
 	// Set hostname
 	hostname, err := os.Hostname()
@@ -242,6 +246,43 @@ func (r *metricRouter) Start() {
 		}
 	}
 
+	// Forward message received from collector channel
+	coll_forward := func(p lp.CCMetric) {
+		// receive from metric collector
+		p.AddTag("hostname", r.hostname)
+		if r.config.IntervalStamp {
+			p.SetTime(r.timestamp)
+		}
+		if !r.dropMetric(p) {
+			forward(p)
+		}
+		// even if the metric is dropped, it is stored in the cache for
+		// aggregations
+		if r.config.NumCacheIntervals > 0 {
+			r.cache.Add(p)
+		}
+	}
+
+	// Forward message received from receivers channel
+	recv_forward := func(p lp.CCMetric) {
+		// receive from receive manager
+		if r.config.IntervalStamp {
+			p.SetTime(r.timestamp)
+		}
+		if !r.dropMetric(p) {
+			forward(p)
+		}
+	}
+
+	// Forward message received from cache channel
+	cache_forward := func(p lp.CCMetric) {
+		// receive from metric collector
+		if !r.dropMetric(p) {
+			p.AddTag("hostname", r.hostname)
+			forward(p)
+		}
+	}
+
 	// Start Metric Cache
 	if r.config.NumCacheIntervals > 0 {
 		r.cache.Start()
@@ -250,6 +291,7 @@ func (r *metricRouter) Start() {
 	r.wg.Add(1)
 	go func() {
 		defer r.wg.Done()
+
 		for {
 			select {
 			case <-r.done:
@@ -257,34 +299,21 @@ func (r *metricRouter) Start() {
 				return
 
 			case p := <-r.coll_input:
-				// receive from metric collector
-				p.AddTag("hostname", r.hostname)
-				if r.config.IntervalStamp {
-					p.SetTime(r.timestamp)
-				}
-				if !r.dropMetric(p) {
-					forward(p)
-				}
-				// even if the metric is dropped, it is stored in the cache for
-				// aggregations
-				if r.config.NumCacheIntervals > 0 {
-					r.cache.Add(p)
-				}
+				coll_forward(p)
+				for i := 0; len(r.coll_input) > 0 && i < r.maxForward; i++ {
+					coll_forward(<-r.coll_input)
+				}
 
 			case p := <-r.recv_input:
-				// receive from receive manager
-				if r.config.IntervalStamp {
-					p.SetTime(r.timestamp)
-				}
-				if !r.dropMetric(p) {
-					forward(p)
-				}
+				recv_forward(p)
+				for i := 0; len(r.recv_input) > 0 && i < r.maxForward; i++ {
+					recv_forward(<-r.recv_input)
+				}
 
 			case p := <-r.cache_input:
-				// receive from metric collector
-				if !r.dropMetric(p) {
-					p.AddTag("hostname", r.hostname)
-					forward(p)
-				}
+				cache_forward(p)
+				for i := 0; len(r.cache_input) > 0 && i < r.maxForward; i++ {
+					cache_forward(<-r.cache_input)
+				}
 			}
 		}
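
The drain loop added above terminates because len() on a buffered Go channel reports how many elements are currently queued, so len(ch) > 0 is a non-blocking emptiness check, while i < r.maxForward caps the batch even if producers keep refilling the buffer. A tiny illustration with a hypothetical channel, not collector code:

package main

import "fmt"

func main() {
	ch := make(chan string, 4) // buffered channel
	ch <- "a"
	ch <- "b"

	fmt.Println(len(ch), cap(ch)) // 2 4: queued elements vs. capacity

	// No receive here can block: the loop runs only while the buffer
	// holds elements, and the counter bounds the batch size.
	for i := 0; len(ch) > 0 && i < 50; i++ {
		fmt.Println(<-ch) // prints a, then b, then the loop exits
	}
}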

View File

@@ -10,6 +10,8 @@ import (
 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 )
 
+const SINK_MAX_FORWARD = 50
+
 // Map of all available sinks
 var AvailableSinks = map[string]Sink{
 	"influxdb": new(InfluxSink),
@@ -22,10 +24,11 @@ var AvailableSinks = map[string]Sink{
 
 // Metric collector manager data structure
 type sinkManager struct {
-	input chan lp.CCMetric // input channel
-	done  chan bool        // channel to finish / stop metric sink manager
-	wg    *sync.WaitGroup  // wait group for all goroutines in cc-metric-collector
-	sinks map[string]Sink  // Mapping sink name to sink
+	input      chan lp.CCMetric // input channel
+	done       chan bool        // channel to finish / stop metric sink manager
+	wg         *sync.WaitGroup  // wait group for all goroutines in cc-metric-collector
+	sinks      map[string]Sink  // Mapping sink name to sink
+	maxForward int              // number of metrics to write maximally in one iteration
 }
 
 // Sink manager access functions
@@ -45,6 +48,7 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
 	sm.done = make(chan bool)
 	sm.wg = wg
 	sm.sinks = make(map[string]Sink, 0)
+	sm.maxForward = SINK_MAX_FORWARD
 
 	if len(sinkConfigFile) == 0 {
 		return nil
@@ -97,12 +101,8 @@ func (sm *sinkManager) Start() {
 		}
 
 		for {
-			select {
-			case <-sm.done:
-				done()
-				return
-
-			case p := <-sm.input:
+			toTheSinks := func(p lp.CCMetric) {
 				// Send received metric to all outputs
 				cclog.ComponentDebug("SinkManager", "WRITE", p)
 				for _, s := range sm.sinks {
@@ -111,6 +111,19 @@ func (sm *sinkManager) Start() {
 					}
 				}
 			}
+
+			select {
+			case <-sm.done:
+				done()
+				return
+
+			case p := <-sm.input:
+				toTheSinks(p)
+				for i := 0; len(sm.input) > 0 && i < sm.maxForward; i++ {
+					p := <-sm.input
+					toTheSinks(p)
+				}
+			}
 		}
 	}()
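
The sink manager combines the same batching with hoisting the fan-out write into the toTheSinks closure, so the blocking receive and the drain loop share one code path. A self-contained approximation with hypothetical sink types — the real manager writes lp.CCMetric values through each registered sink — might look like this:

package main

import "fmt"

type sink interface{ Write(m string) }

type printSink struct{ name string }

func (s printSink) Write(m string) { fmt.Println(s.name, "<-", m) }

func main() {
	const maxForward = 50 // mirrors SINK_MAX_FORWARD
	sinks := map[string]sink{
		"influxdb": printSink{"influxdb"},
		"stdout":   printSink{"stdout"},
	}
	input := make(chan string, 100)
	done := make(chan bool)

	// One closure does the fan-out, shared by both receive paths.
	toTheSinks := func(m string) {
		for _, s := range sinks {
			s.Write(m)
		}
	}

	go func() {
		input <- "cpu_load"
		input <- "mem_used"
		done <- true
	}()

	for {
		select {
		case <-done:
			return
		case m := <-input:
			toTheSinks(m)
			for i := 0; len(input) > 0 && i < maxForward; i++ {
				toTheSinks(<-input)
			}
		}
	}
}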