Locking in MetricCache

This commit is contained in:
Thomas Roehl 2022-02-10 15:21:26 +01:00
parent 442e512f2d
commit 184d60cc58

View File

@ -23,6 +23,7 @@ type metricCachePeriod struct {
type metricCache struct { type metricCache struct {
numPeriods int numPeriods int
curPeriod int curPeriod int
lock sync.Mutex
intervals []*metricCachePeriod intervals []*metricCachePeriod
wg *sync.WaitGroup wg *sync.WaitGroup
ticker mct.MultiChanTicker ticker mct.MultiChanTicker
@ -103,9 +104,11 @@ func (c *metricCache) Start() {
done() done()
return return
case tick := <-c.tickchan: case tick := <-c.tickchan:
c.lock.Lock()
old := rotate(tick) old := rotate(tick)
// Get the last period and evaluate aggregation metrics // Get the last period and evaluate aggregation metrics
starttime, endtime, metrics := c.GetPeriod(old) starttime, endtime, metrics := c.GetPeriod(old)
c.lock.Unlock()
if len(metrics) > 0 { if len(metrics) > 0 {
c.aggEngine.Eval(starttime, endtime, metrics) c.aggEngine.Eval(starttime, endtime, metrics)
} else { } else {
@ -123,6 +126,7 @@ func (c *metricCache) Start() {
// to avoid reallocations // to avoid reallocations
func (c *metricCache) Add(metric lp.CCMetric) { func (c *metricCache) Add(metric lp.CCMetric) {
if c.curPeriod >= 0 && c.curPeriod < c.numPeriods { if c.curPeriod >= 0 && c.curPeriod < c.numPeriods {
c.lock.Lock()
p := c.intervals[c.curPeriod] p := c.intervals[c.curPeriod]
if p.numMetrics < p.sizeMetrics { if p.numMetrics < p.sizeMetrics {
p.metrics[p.numMetrics] = metric p.metrics[p.numMetrics] = metric
@ -134,6 +138,7 @@ func (c *metricCache) Add(metric lp.CCMetric) {
p.sizeMetrics = p.sizeMetrics + 1 p.sizeMetrics = p.sizeMetrics + 1
p.stopstamp = metric.Time() p.stopstamp = metric.Time()
} }
c.lock.Unlock()
} }
} }
@ -149,16 +154,26 @@ func (c *metricCache) DeleteAggregation(name string) error {
// is the current one, index=1 the last interval and so on. Returns and empty array if a wrong index // is the current one, index=1 the last interval and so on. Returns and empty array if a wrong index
// is given (negative index, index larger than configured number of total intervals, ...) // is given (negative index, index larger than configured number of total intervals, ...)
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) { func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) {
var start time.Time = time.Now()
var stop time.Time = time.Now()
var metrics []lp.CCMetric
if index >= 0 && index < c.numPeriods { if index >= 0 && index < c.numPeriods {
pindex := c.curPeriod - index pindex := c.curPeriod - index
if pindex < 0 { if pindex < 0 {
pindex = c.numPeriods - pindex pindex = c.numPeriods - pindex
} }
if pindex >= 0 && pindex < c.numPeriods { if pindex >= 0 && pindex < c.numPeriods {
return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics start = c.intervals[pindex].startstamp
stop = c.intervals[pindex].stopstamp
metrics = c.intervals[pindex].metrics
//return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics
} else {
metrics = make([]lp.CCMetric, 0)
} }
} else {
metrics = make([]lp.CCMetric, 0)
} }
return time.Now(), time.Now(), make([]lp.CCMetric, 0) return start, stop, metrics
} }
// Close finishes / stops the metric cache // Close finishes / stops the metric cache