mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-01-24 12:59:06 +01:00
7840de7b82
* Add cpu_used (all-cpu_idle) to CpustatCollector * Update cc-metric-collector.init * Allow selection of timestamp precision in HttpSink * Add comment about precision requirement for cc-metric-store * Fix for API changes in gofish@v0.15.0 * Update requirements to latest version * Read sensors through redfish * Update golang toolchain to 1.21 * Remove stray error check * Update main config in configuration.md * Update Release action to use golang 1.22 stable release, no golang RPMs anymore * Update runonce action to use golang 1.22 stable release, no golang RPMs anymore * Update README.md Use right JSON type in configuration * Update sink's README * Test whether ipmitool or ipmi-sensors can be executed without errors * Little fixes to the prometheus sink (#115) * Add uint64 to float64 cast option * Add prometheus sink to the list of available sinks * Add aggregated counters by gpu for nvlink errors --------- Co-authored-by: Michael Schwarz <schwarz@uni-paderborn.de> * Ccmessage migration (#119) * Add cpu_used (all-cpu_idle) to CpustatCollector * Update cc-metric-collector.init * Allow selection of timestamp precision in HttpSink * Add comment about precision requirement for cc-metric-store * Fix for API changes in gofish@v0.15.0 * Update requirements to latest version * Read sensors through redfish * Update golang toolchain to 1.21 * Remove stray error check * Update main config in configuration.md * Update Release action to use golang 1.22 stable release, no golang RPMs anymore * Update runonce action to use golang 1.22 stable release, no golang RPMs anymore * Switch to CCMessage for all files. --------- Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu> Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> * Switch to ccmessage also for latest additions in nvidiaMetric * New Message processor (#118) * Add cpu_used (all-cpu_idle) to CpustatCollector * Update cc-metric-collector.init * Allow selection of timestamp precision in HttpSink * Add comment about precision requirement for cc-metric-store * Fix for API changes in gofish@v0.15.0 * Update requirements to latest version * Read sensors through redfish * Update golang toolchain to 1.21 * Remove stray error check * Update main config in configuration.md * Update Release action to use golang 1.22 stable release, no golang RPMs anymore * Update runonce action to use golang 1.22 stable release, no golang RPMs anymore * New message processor to check whether a message should be dropped or manipulate it in flight * Create a copy of message before manipulation --------- Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu> Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> * Update collector's Makefile and go.mod/sum files * Use message processor in router, all sinks and all receivers * Add support for credential file (NKEY) to NATS sink and receiver * Fix JSON keys in message processor configuration * Update docs for message processor, router and the default router config file * Add link to expr syntax and fix regex matching docs * Update sample collectors * Minor style change in collector manager * Some helpers for ccTopology * LIKWID collector: write log owner change only once * Fix for metrics without units and reduce debugging messages for messageProcessor * Use shorted hostname for hostname added by router * Define default port for NATS * CPUstat collector: only add unit for applicable metrics * Add precision option to all sinks using Influx's encoder * Add message processor to all sink documentation * Add units to documentation of cpustat collector --------- Co-authored-by: Holger Obermaier <Holger.Obermaier@kit.edu> Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Co-authored-by: oscarminus <me@oscarminus.de> Co-authored-by: Michael Schwarz <schwarz@uni-paderborn.de>
193 lines
5.4 KiB
Go
193 lines
5.4 KiB
Go
package metricRouter
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
|
|
|
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
|
|
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
|
mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
|
|
)
|
|
|
|
type metricCachePeriod struct {
|
|
startstamp time.Time
|
|
stopstamp time.Time
|
|
numMetrics int
|
|
sizeMetrics int
|
|
metrics []lp.CCMessage
|
|
}
|
|
|
|
// Metric cache data structure
|
|
type metricCache struct {
|
|
numPeriods int
|
|
curPeriod int
|
|
lock sync.Mutex
|
|
intervals []*metricCachePeriod
|
|
wg *sync.WaitGroup
|
|
ticker mct.MultiChanTicker
|
|
tickchan chan time.Time
|
|
done chan bool
|
|
output chan lp.CCMessage
|
|
aggEngine agg.MetricAggregator
|
|
}
|
|
|
|
type MetricCache interface {
|
|
Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error
|
|
Start()
|
|
Add(metric lp.CCMessage)
|
|
GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage)
|
|
AddAggregation(name, function, condition string, tags, meta map[string]string) error
|
|
DeleteAggregation(name string) error
|
|
Close()
|
|
}
|
|
|
|
func (c *metricCache) Init(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
|
|
var err error = nil
|
|
c.done = make(chan bool)
|
|
c.wg = wg
|
|
c.ticker = ticker
|
|
c.numPeriods = numPeriods
|
|
c.output = output
|
|
c.intervals = make([]*metricCachePeriod, 0)
|
|
for i := 0; i < c.numPeriods+1; i++ {
|
|
p := new(metricCachePeriod)
|
|
p.numMetrics = 0
|
|
p.sizeMetrics = 0
|
|
p.metrics = make([]lp.CCMessage, 0)
|
|
c.intervals = append(c.intervals, p)
|
|
}
|
|
|
|
// Create a new aggregation engine. No separate goroutine at the moment
|
|
// The code is executed by the MetricCache goroutine
|
|
c.aggEngine, err = agg.NewAggregator(c.output)
|
|
if err != nil {
|
|
cclog.ComponentError("MetricCache", "Cannot create aggregator")
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Start starts the metric cache
|
|
func (c *metricCache) Start() {
|
|
|
|
c.tickchan = make(chan time.Time)
|
|
c.ticker.AddChannel(c.tickchan)
|
|
// Router cache is done
|
|
done := func() {
|
|
cclog.ComponentDebug("MetricCache", "DONE")
|
|
close(c.done)
|
|
}
|
|
|
|
// Rotate cache interval
|
|
rotate := func(timestamp time.Time) int {
|
|
oldPeriod := c.curPeriod
|
|
c.curPeriod = oldPeriod + 1
|
|
if c.curPeriod >= c.numPeriods {
|
|
c.curPeriod = 0
|
|
}
|
|
c.intervals[oldPeriod].numMetrics = 0
|
|
c.intervals[oldPeriod].stopstamp = timestamp
|
|
c.intervals[c.curPeriod].startstamp = timestamp
|
|
c.intervals[c.curPeriod].stopstamp = timestamp
|
|
return oldPeriod
|
|
}
|
|
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
for {
|
|
select {
|
|
case <-c.done:
|
|
done()
|
|
return
|
|
case tick := <-c.tickchan:
|
|
c.lock.Lock()
|
|
old := rotate(tick)
|
|
// Get the last period and evaluate aggregation metrics
|
|
starttime, endtime, metrics := c.GetPeriod(old)
|
|
c.lock.Unlock()
|
|
if len(metrics) > 0 {
|
|
c.aggEngine.Eval(starttime, endtime, metrics)
|
|
} else {
|
|
// This message is also printed in the first interval after startup
|
|
cclog.ComponentDebug("MetricCache", "EMPTY INTERVAL?")
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
cclog.ComponentDebug("MetricCache", "START")
|
|
}
|
|
|
|
// Add a metric to the cache. The interval is defined by the global timer (rotate() in Start())
|
|
// The intervals list is used as round-robin buffer and the metric list grows dynamically and
|
|
// to avoid reallocations
|
|
func (c *metricCache) Add(metric lp.CCMessage) {
|
|
if c.curPeriod >= 0 && c.curPeriod < c.numPeriods {
|
|
c.lock.Lock()
|
|
p := c.intervals[c.curPeriod]
|
|
if p.numMetrics < p.sizeMetrics {
|
|
p.metrics[p.numMetrics] = metric
|
|
p.numMetrics = p.numMetrics + 1
|
|
p.stopstamp = metric.Time()
|
|
} else {
|
|
p.metrics = append(p.metrics, metric)
|
|
p.numMetrics = p.numMetrics + 1
|
|
p.sizeMetrics = p.sizeMetrics + 1
|
|
p.stopstamp = metric.Time()
|
|
}
|
|
c.lock.Unlock()
|
|
}
|
|
}
|
|
|
|
func (c *metricCache) AddAggregation(name, function, condition string, tags, meta map[string]string) error {
|
|
return c.aggEngine.AddAggregation(name, function, condition, tags, meta)
|
|
}
|
|
|
|
func (c *metricCache) DeleteAggregation(name string) error {
|
|
return c.aggEngine.DeleteAggregation(name)
|
|
}
|
|
|
|
// Get all metrics of a interval. The index is the difference to the current interval, so index=0
|
|
// is the current one, index=1 the last interval and so on. Returns and empty array if a wrong index
|
|
// is given (negative index, index larger than configured number of total intervals, ...)
|
|
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMessage) {
|
|
var start time.Time = time.Now()
|
|
var stop time.Time = time.Now()
|
|
var metrics []lp.CCMessage
|
|
if index >= 0 && index < c.numPeriods {
|
|
pindex := c.curPeriod - index
|
|
if pindex < 0 {
|
|
pindex = c.numPeriods - pindex
|
|
}
|
|
if pindex >= 0 && pindex < c.numPeriods {
|
|
start = c.intervals[pindex].startstamp
|
|
stop = c.intervals[pindex].stopstamp
|
|
metrics = c.intervals[pindex].metrics
|
|
//return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics
|
|
} else {
|
|
metrics = make([]lp.CCMessage, 0)
|
|
}
|
|
} else {
|
|
metrics = make([]lp.CCMessage, 0)
|
|
}
|
|
return start, stop, metrics
|
|
}
|
|
|
|
// Close finishes / stops the metric cache
|
|
func (c *metricCache) Close() {
|
|
cclog.ComponentDebug("MetricCache", "CLOSE")
|
|
c.done <- true
|
|
}
|
|
|
|
func NewCache(output chan lp.CCMessage, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) {
|
|
c := new(metricCache)
|
|
err := c.Init(output, ticker, wg, numPeriods)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return c, err
|
|
}
|