Use pointer to metrics instead of forwarding full metric

This commit is contained in:
Thomas Roehl
2022-01-31 14:16:50 +01:00
parent 3e329c3324
commit 7f4de77de1
34 changed files with 199 additions and 180 deletions

View File

@@ -29,14 +29,14 @@ type metricAggregator struct {
functions []*metricAggregatorIntervalConfig
constants map[string]interface{}
language gval.Language
output chan lp.CCMetric
output chan *lp.CCMetric
}
type MetricAggregator interface {
AddAggregation(name, function, condition string, tags, meta map[string]string) error
DeleteAggregation(name string) error
Init(output chan lp.CCMetric) error
Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric)
Init(output chan *lp.CCMetric) error
Eval(starttime time.Time, endtime time.Time, metrics []*lp.CCMetric)
}
var metricCacheLanguage = gval.NewLanguage(
@@ -62,7 +62,7 @@ var metricCacheLanguage = gval.NewLanguage(
gval.Function("getCpuListOfType", getCpuListOfType),
)
func (c *metricAggregator) Init(output chan lp.CCMetric) error {
func (c *metricAggregator) Init(output chan *lp.CCMetric) error {
c.output = output
c.functions = make([]*metricAggregatorIntervalConfig, 0)
c.constants = make(map[string]interface{})
@@ -100,7 +100,7 @@ func (c *metricAggregator) Init(output chan lp.CCMetric) error {
return nil
}
func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric) {
func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []*lp.CCMetric) {
vars := make(map[string]interface{})
for k, v := range c.constants {
vars[k] = v
@@ -110,9 +110,9 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
for _, f := range c.functions {
cclog.ComponentDebug("MetricCache", "COLLECT", f.Name, "COND", f.Condition)
values := make([]float64, 0)
matches := make([]lp.CCMetric, 0)
matches := make([]*lp.CCMetric, 0)
for _, m := range metrics {
vars["metric"] = m
vars["metric"] = *m
//value, err := gval.Evaluate(f.Condition, vars, c.language)
value, err := f.gvalCond.EvalBool(context.Background(), vars)
if err != nil {
@@ -120,7 +120,7 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
continue
}
if value {
v, valid := m.GetField("value")
v, valid := (*m).GetField("value")
if valid {
switch x := v.(type) {
case float64:
@@ -153,13 +153,14 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
break
}
copy_tags := func(tags map[string]string, metrics []lp.CCMetric) map[string]string {
copy_tags := func(tags map[string]string, metrics []*lp.CCMetric) map[string]string {
out := make(map[string]string)
for key, value := range tags {
switch value {
case "<copy>":
for _, m := range metrics {
v, err := m.GetTag(key)
point := *m
v, err := point.GetTag(key)
if err {
out[key] = v
}
@@ -170,13 +171,14 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
}
return out
}
copy_meta := func(meta map[string]string, metrics []lp.CCMetric) map[string]string {
copy_meta := func(meta map[string]string, metrics []*lp.CCMetric) map[string]string {
out := make(map[string]string)
for key, value := range meta {
switch value {
case "<copy>":
for _, m := range metrics {
v, err := m.GetMeta(key)
point := *m
v, err := point.GetMeta(key)
if err {
out[key] = v
}
@@ -210,7 +212,7 @@ func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics
}
cclog.ComponentDebug("MetricCache", "SEND", m)
select {
case c.output <- m:
case c.output <- &m:
default:
}
@@ -281,7 +283,7 @@ func (c *metricAggregator) AddFunction(name string, function func(args ...interf
c.language = gval.NewLanguage(c.language, gval.Function(name, function))
}
func NewAggregator(output chan lp.CCMetric) (MetricAggregator, error) {
func NewAggregator(output chan *lp.CCMetric) (MetricAggregator, error) {
a := new(metricAggregator)
err := a.Init(output)
if err != nil {

View File

@@ -15,7 +15,7 @@ type metricCachePeriod struct {
stopstamp time.Time
numMetrics int
sizeMetrics int
metrics []lp.CCMetric
metrics []*lp.CCMetric
}
// Metric cache data structure
@@ -27,21 +27,21 @@ type metricCache struct {
ticker mct.MultiChanTicker
tickchan chan time.Time
done chan bool
output chan lp.CCMetric
output chan *lp.CCMetric
aggEngine MetricAggregator
}
type MetricCache interface {
Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error
Init(output chan *lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error
Start()
Add(metric lp.CCMetric)
GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric)
Add(metric *lp.CCMetric)
GetPeriod(index int) (time.Time, time.Time, []*lp.CCMetric)
AddAggregation(name, function, condition string, tags, meta map[string]string) error
DeleteAggregation(name string) error
Close()
}
func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
func (c *metricCache) Init(output chan *lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
var err error = nil
c.done = make(chan bool)
c.wg = wg
@@ -53,7 +53,7 @@ func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker,
p := new(metricCachePeriod)
p.numMetrics = 0
p.sizeMetrics = 0
p.metrics = make([]lp.CCMetric, 0)
p.metrics = make([]*lp.CCMetric, 0)
c.intervals = append(c.intervals, p)
}
@@ -120,18 +120,18 @@ func (c *metricCache) Start() {
// Add a metric to the cache. The interval is defined by the global timer (rotate() in Start())
// The intervals list is used as round-robin buffer and the metric list grows dynamically and
// to avoid reallocations
func (c *metricCache) Add(metric lp.CCMetric) {
func (c *metricCache) Add(metric *lp.CCMetric) {
if c.curPeriod >= 0 && c.curPeriod < c.numPeriods {
p := c.intervals[c.curPeriod]
if p.numMetrics < p.sizeMetrics {
p.metrics[p.numMetrics] = metric
p.numMetrics = p.numMetrics + 1
p.stopstamp = metric.Time()
p.stopstamp = (*metric).Time()
} else {
p.metrics = append(p.metrics, metric)
p.numMetrics = p.numMetrics + 1
p.sizeMetrics = p.sizeMetrics + 1
p.stopstamp = metric.Time()
p.stopstamp = (*metric).Time()
}
}
}
@@ -147,7 +147,7 @@ func (c *metricCache) DeleteAggregation(name string) error {
// Get all metrics of a interval. The index is the difference to the current interval, so index=0
// is the current one, index=1 the last interval and so on. Returns and empty array if a wrong index
// is given (negative index, index larger than configured number of total intervals, ...)
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) {
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []*lp.CCMetric) {
if index >= 0 && index < c.numPeriods {
pindex := c.curPeriod - index
if pindex < 0 {
@@ -157,7 +157,7 @@ func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric)
return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics
}
}
return time.Now(), time.Now(), make([]lp.CCMetric, 0)
return time.Now(), time.Now(), make([]*lp.CCMetric, 0)
}
// Close finishes / stops the metric cache
@@ -166,7 +166,7 @@ func (c *metricCache) Close() {
c.done <- true
}
func NewCache(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) {
func NewCache(output chan *lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) {
c := new(metricCache)
err := c.Init(output, ticker, wg, numPeriods)
if err != nil {

View File

@@ -33,10 +33,10 @@ type metricRouterConfig struct {
// Metric router data structure
type metricRouter struct {
hostname string // Hostname used in tags
coll_input chan lp.CCMetric // Input channel from CollectorManager
recv_input chan lp.CCMetric // Input channel from ReceiveManager
cache_input chan lp.CCMetric // Input channel from MetricCache
outputs []chan lp.CCMetric // List of all output channels
coll_input chan *lp.CCMetric // Input channel from CollectorManager
recv_input chan *lp.CCMetric // Input channel from ReceiveManager
cache_input chan *lp.CCMetric // Input channel from MetricCache
outputs []chan *lp.CCMetric // List of all output channels
done chan bool // channel to finish / stop metric router
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
timestamp time.Time // timestamp periodically updated by ticker each interval
@@ -50,9 +50,9 @@ type metricRouter struct {
// MetricRouter access functions
type MetricRouter interface {
Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error
AddCollectorInput(input chan lp.CCMetric)
AddReceiverInput(input chan lp.CCMetric)
AddOutput(output chan lp.CCMetric)
AddCollectorInput(input chan *lp.CCMetric)
AddReceiverInput(input chan *lp.CCMetric)
AddOutput(output chan *lp.CCMetric)
Start()
Close()
}
@@ -64,9 +64,9 @@ type MetricRouter interface {
// * ticker (from variable ticker)
// * configuration (read from config file in variable routerConfigFile)
func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error {
r.outputs = make([]chan lp.CCMetric, 0)
r.outputs = make([]chan *lp.CCMetric, 0)
r.done = make(chan bool)
r.cache_input = make(chan lp.CCMetric)
r.cache_input = make(chan *lp.CCMetric)
r.wg = wg
r.ticker = ticker
@@ -131,7 +131,8 @@ func (r *metricRouter) StartTimer() {
}
// EvalCondition evaluates condition cond for metric data from point
func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, error) {
func (r *metricRouter) EvalCondition(cond string, pptr *lp.CCMetric) (bool, error) {
point := *pptr
expression, err := govaluate.NewEvaluableExpression(cond)
if err != nil {
cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
@@ -162,7 +163,7 @@ func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, erro
}
// DoAddTags adds a tag when condition is fullfiled
func (r *metricRouter) DoAddTags(point lp.CCMetric) {
func (r *metricRouter) DoAddTags(point *lp.CCMetric) {
for _, m := range r.config.AddTags {
var conditionMatches bool
@@ -177,13 +178,13 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) {
}
}
if conditionMatches {
point.AddTag(m.Key, m.Value)
(*point).AddTag(m.Key, m.Value)
}
}
}
// DoDelTags removes a tag when condition is fullfiled
func (r *metricRouter) DoDelTags(point lp.CCMetric) {
func (r *metricRouter) DoDelTags(point *lp.CCMetric) {
for _, m := range r.config.DelTags {
var conditionMatches bool
@@ -198,7 +199,7 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) {
}
}
if conditionMatches {
point.RemoveTag(m.Key)
(*point).RemoveTag(m.Key)
}
}
}
@@ -220,8 +221,8 @@ func (r *metricRouter) Start() {
// Forward takes a received metric, adds or deletes tags
// and forwards it to the output channels
forward := func(point lp.CCMetric) {
cclog.ComponentDebug("MetricRouter", "FORWARD", point)
forward := func(point *lp.CCMetric) {
cclog.ComponentDebug("MetricRouter", "FORWARD", *point)
r.DoAddTags(point)
r.DoDelTags(point)
for _, o := range r.outputs {
@@ -243,9 +244,9 @@ func (r *metricRouter) Start() {
case p := <-r.coll_input:
// receive from metric collector
p.AddTag("hostname", r.hostname)
(*p).AddTag("hostname", r.hostname)
if r.config.IntervalStamp {
p.SetTime(r.timestamp)
(*p).SetTime(r.timestamp)
}
forward(p)
r.cache.Add(p)
@@ -253,13 +254,13 @@ func (r *metricRouter) Start() {
case p := <-r.recv_input:
// receive from receive manager
if r.config.IntervalStamp {
p.SetTime(r.timestamp)
(*p).SetTime(r.timestamp)
}
forward(p)
case p := <-r.cache_input:
// receive from metric collector
p.AddTag("hostname", r.hostname)
// receive from metric cache and aggregator
(*p).AddTag("hostname", r.hostname)
forward(p)
}
}
@@ -268,17 +269,17 @@ func (r *metricRouter) Start() {
}
// AddCollectorInput adds a channel between metric collector and metric router
func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) {
func (r *metricRouter) AddCollectorInput(input chan *lp.CCMetric) {
r.coll_input = input
}
// AddReceiverInput adds a channel between metric receiver and metric router
func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) {
func (r *metricRouter) AddReceiverInput(input chan *lp.CCMetric) {
r.recv_input = input
}
// AddOutput adds a output channel to the metric router
func (r *metricRouter) AddOutput(output chan lp.CCMetric) {
func (r *metricRouter) AddOutput(output chan *lp.CCMetric) {
r.outputs = append(r.outputs, output)
}