Fix for interval_timestamp option

This commit is contained in:
Thomas Roehl 2022-04-04 02:26:04 +02:00
parent 28348bd108
commit 4d5b1adbc8

View File

@ -48,7 +48,6 @@ type metricRouter struct {
done chan bool // channel to finish / stop metric router done chan bool // channel to finish / stop metric router
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
timestamp time.Time // timestamp periodically updated by ticker each interval timestamp time.Time // timestamp periodically updated by ticker each interval
timerdone chan bool // channel to finish / stop timestamp updater
ticker mct.MultiChanTicker // periodically ticking once each interval ticker mct.MultiChanTicker // periodically ticking once each interval
config metricRouterConfig // json encoded config for metric router config metricRouterConfig // json encoded config for metric router
cache MetricCache // pointer to MetricCache cache MetricCache // pointer to MetricCache
@ -124,29 +123,6 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
return nil return nil
} }
// StartTimer starts a timer which updates timestamp periodically
func (r *metricRouter) StartTimer() {
m := make(chan time.Time)
r.ticker.AddChannel(m)
r.timerdone = make(chan bool)
r.wg.Add(1)
go func() {
defer r.wg.Done()
for {
select {
case <-r.timerdone:
close(r.timerdone)
cclog.ComponentDebug("MetricRouter", "TIMER DONE")
return
case t := <-m:
r.timestamp = t
}
}
}()
cclog.ComponentDebug("MetricRouter", "TIMER START")
}
func getParamMap(point lp.CCMetric) map[string]interface{} { func getParamMap(point lp.CCMetric) map[string]interface{} {
params := make(map[string]interface{}) params := make(map[string]interface{})
params["metric"] = point params["metric"] = point
@ -235,8 +211,9 @@ func (r *metricRouter) dropMetric(point lp.CCMetric) bool {
func (r *metricRouter) Start() { func (r *metricRouter) Start() {
// start timer if configured // start timer if configured
r.timestamp = time.Now() r.timestamp = time.Now()
timeChan := make(chan time.Time)
if r.config.IntervalStamp { if r.config.IntervalStamp {
r.StartTimer() r.ticker.AddChannel(timeChan)
} }
// Router manager is done // Router manager is done
@ -316,6 +293,9 @@ func (r *metricRouter) Start() {
done() done()
return return
case timestamp := <-timeChan:
r.timestamp = timestamp
case p := <-r.coll_input: case p := <-r.coll_input:
coll_forward(p) coll_forward(p)
for i := 0; len(r.coll_input) > 0 && i < (r.maxForward-1); i++ { for i := 0; len(r.coll_input) > 0 && i < (r.maxForward-1); i++ {
@ -361,14 +341,6 @@ func (r *metricRouter) Close() {
// wait for close of channel r.done // wait for close of channel r.done
<-r.done <-r.done
// stop timer
if r.config.IntervalStamp {
cclog.ComponentDebug("MetricRouter", "TIMER CLOSE")
r.timerdone <- true
// wait for close of channel r.timerdone
<-r.timerdone
}
// stop metric cache // stop metric cache
if r.config.NumCacheIntervals > 0 { if r.config.NumCacheIntervals > 0 {
cclog.ComponentDebug("MetricRouter", "CACHE CLOSE") cclog.ComponentDebug("MetricRouter", "CACHE CLOSE")