mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-07-21 20:31:41 +02:00
Compare commits
16 Commits
http_stats
...
v0.5.1
Author | SHA1 | Date | |
---|---|---|---|
|
36dd440864 | ||
|
7b098e0b1b | ||
|
229a57b16a | ||
|
70a9530aba | ||
|
2f0b6057ca | ||
|
69f7c19659 | ||
|
ecdb4c1bcf | ||
|
4d5b1adbc8 | ||
|
4763733d8d | ||
|
16e898ecca | ||
|
4851382ad7 | ||
|
3f76947f54 | ||
|
3157386b3e | ||
|
ff08eaeb43 | ||
|
64c41be34c | ||
|
f4af520b2a |
@@ -329,7 +329,11 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
|
|||||||
gctr := C.GoString(counter)
|
gctr := C.GoString(counter)
|
||||||
for _, tid := range m.cpu2tid {
|
for _, tid := range m.cpu2tid {
|
||||||
res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
|
res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
|
||||||
evset.results[tid][gctr] = float64(res)
|
fres := float64(res)
|
||||||
|
if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) {
|
||||||
|
fres = 0.0
|
||||||
|
}
|
||||||
|
evset.results[tid][gctr] = fres
|
||||||
evset.results[tid]["time"] = interval.Seconds()
|
evset.results[tid]["time"] = interval.Seconds()
|
||||||
evset.results[tid]["inverseClock"] = invClock
|
evset.results[tid]["inverseClock"] = invClock
|
||||||
}
|
}
|
||||||
@@ -348,15 +352,12 @@ func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interv
|
|||||||
value, err := agg.EvalFloat64Condition(metric.Calc, evset.results[tid])
|
value, err := agg.EvalFloat64Condition(metric.Calc, evset.results[tid])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
|
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
|
||||||
continue
|
value = 0.0
|
||||||
|
}
|
||||||
|
if m.config.InvalidToZero && (math.IsNaN(value) || math.IsInf(value, 0)) {
|
||||||
|
value = 0.0
|
||||||
}
|
}
|
||||||
evset.metrics[tid][metric.Name] = value
|
evset.metrics[tid][metric.Name] = value
|
||||||
if m.config.InvalidToZero && math.IsNaN(value) {
|
|
||||||
value = 0.0
|
|
||||||
}
|
|
||||||
if m.config.InvalidToZero && math.IsInf(value, 0) {
|
|
||||||
value = 0.0
|
|
||||||
}
|
|
||||||
// Now we have the result, send it with the proper tags
|
// Now we have the result, send it with the proper tags
|
||||||
if !math.IsNaN(value) {
|
if !math.IsNaN(value) {
|
||||||
if metric.Publish {
|
if metric.Publish {
|
||||||
@@ -400,15 +401,12 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
|
|||||||
value, err := agg.EvalFloat64Condition(metric.Calc, params)
|
value, err := agg.EvalFloat64Condition(metric.Calc, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
|
cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
|
||||||
continue
|
value = 0.0
|
||||||
|
}
|
||||||
|
if m.config.InvalidToZero && (math.IsNaN(value) || math.IsInf(value, 0)) {
|
||||||
|
value = 0.0
|
||||||
}
|
}
|
||||||
m.gmresults[tid][metric.Name] = value
|
m.gmresults[tid][metric.Name] = value
|
||||||
if m.config.InvalidToZero && math.IsNaN(value) {
|
|
||||||
value = 0.0
|
|
||||||
}
|
|
||||||
if m.config.InvalidToZero && math.IsInf(value, 0) {
|
|
||||||
value = 0.0
|
|
||||||
}
|
|
||||||
// Now we have the result, send it with the proper tags
|
// Now we have the result, send it with the proper tags
|
||||||
if !math.IsNaN(value) {
|
if !math.IsNaN(value) {
|
||||||
if metric.Publish {
|
if metric.Publish {
|
||||||
|
@@ -48,7 +48,6 @@ type metricRouter struct {
|
|||||||
done chan bool // channel to finish / stop metric router
|
done chan bool // channel to finish / stop metric router
|
||||||
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
|
wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
|
||||||
timestamp time.Time // timestamp periodically updated by ticker each interval
|
timestamp time.Time // timestamp periodically updated by ticker each interval
|
||||||
timerdone chan bool // channel to finish / stop timestamp updater
|
|
||||||
ticker mct.MultiChanTicker // periodically ticking once each interval
|
ticker mct.MultiChanTicker // periodically ticking once each interval
|
||||||
config metricRouterConfig // json encoded config for metric router
|
config metricRouterConfig // json encoded config for metric router
|
||||||
cache MetricCache // pointer to MetricCache
|
cache MetricCache // pointer to MetricCache
|
||||||
@@ -124,29 +123,6 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartTimer starts a timer which updates timestamp periodically
|
|
||||||
func (r *metricRouter) StartTimer() {
|
|
||||||
m := make(chan time.Time)
|
|
||||||
r.ticker.AddChannel(m)
|
|
||||||
r.timerdone = make(chan bool)
|
|
||||||
|
|
||||||
r.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer r.wg.Done()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-r.timerdone:
|
|
||||||
close(r.timerdone)
|
|
||||||
cclog.ComponentDebug("MetricRouter", "TIMER DONE")
|
|
||||||
return
|
|
||||||
case t := <-m:
|
|
||||||
r.timestamp = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
cclog.ComponentDebug("MetricRouter", "TIMER START")
|
|
||||||
}
|
|
||||||
|
|
||||||
func getParamMap(point lp.CCMetric) map[string]interface{} {
|
func getParamMap(point lp.CCMetric) map[string]interface{} {
|
||||||
params := make(map[string]interface{})
|
params := make(map[string]interface{})
|
||||||
params["metric"] = point
|
params["metric"] = point
|
||||||
@@ -235,8 +211,9 @@ func (r *metricRouter) dropMetric(point lp.CCMetric) bool {
|
|||||||
func (r *metricRouter) Start() {
|
func (r *metricRouter) Start() {
|
||||||
// start timer if configured
|
// start timer if configured
|
||||||
r.timestamp = time.Now()
|
r.timestamp = time.Now()
|
||||||
|
timeChan := make(chan time.Time)
|
||||||
if r.config.IntervalStamp {
|
if r.config.IntervalStamp {
|
||||||
r.StartTimer()
|
r.ticker.AddChannel(timeChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Router manager is done
|
// Router manager is done
|
||||||
@@ -316,6 +293,10 @@ func (r *metricRouter) Start() {
|
|||||||
done()
|
done()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
case timestamp := <-timeChan:
|
||||||
|
r.timestamp = timestamp
|
||||||
|
cclog.ComponentDebug("MetricRouter", "Update timestamp", r.timestamp.UnixNano())
|
||||||
|
|
||||||
case p := <-r.coll_input:
|
case p := <-r.coll_input:
|
||||||
coll_forward(p)
|
coll_forward(p)
|
||||||
for i := 0; len(r.coll_input) > 0 && i < (r.maxForward-1); i++ {
|
for i := 0; len(r.coll_input) > 0 && i < (r.maxForward-1); i++ {
|
||||||
@@ -361,14 +342,6 @@ func (r *metricRouter) Close() {
|
|||||||
// wait for close of channel r.done
|
// wait for close of channel r.done
|
||||||
<-r.done
|
<-r.done
|
||||||
|
|
||||||
// stop timer
|
|
||||||
if r.config.IntervalStamp {
|
|
||||||
cclog.ComponentDebug("MetricRouter", "TIMER CLOSE")
|
|
||||||
r.timerdone <- true
|
|
||||||
// wait for close of channel r.timerdone
|
|
||||||
<-r.timerdone
|
|
||||||
}
|
|
||||||
|
|
||||||
// stop metric cache
|
// stop metric cache
|
||||||
if r.config.NumCacheIntervals > 0 {
|
if r.config.NumCacheIntervals > 0 {
|
||||||
cclog.ComponentDebug("MetricRouter", "CACHE CLOSE")
|
cclog.ComponentDebug("MetricRouter", "CACHE CLOSE")
|
||||||
|
@@ -6,12 +6,14 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||||
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||||
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
|
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
|
||||||
|
influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
type InfluxAsyncSinkConfig struct {
|
type InfluxAsyncSinkConfig struct {
|
||||||
@@ -32,6 +34,8 @@ type InfluxAsyncSinkConfig struct {
|
|||||||
InfluxExponentialBase uint `json:"retry_exponential_base,omitempty"`
|
InfluxExponentialBase uint `json:"retry_exponential_base,omitempty"`
|
||||||
InfluxMaxRetries uint `json:"max_retries,omitempty"`
|
InfluxMaxRetries uint `json:"max_retries,omitempty"`
|
||||||
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
|
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
|
||||||
|
CustomFlushInterval string `json:"custom_flush_interval,omitempty"`
|
||||||
|
MaxRetryAttempts uint `json:"max_retry_attempts,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type InfluxAsyncSink struct {
|
type InfluxAsyncSink struct {
|
||||||
@@ -42,6 +46,8 @@ type InfluxAsyncSink struct {
|
|||||||
config InfluxAsyncSinkConfig
|
config InfluxAsyncSinkConfig
|
||||||
influxRetryInterval uint
|
influxRetryInterval uint
|
||||||
influxMaxRetryTime uint
|
influxMaxRetryTime uint
|
||||||
|
customFlushInterval time.Duration
|
||||||
|
flushTimer *time.Timer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *InfluxAsyncSink) connect() error {
|
func (s *InfluxAsyncSink) connect() error {
|
||||||
@@ -98,10 +104,23 @@ func (s *InfluxAsyncSink) connect() error {
|
|||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("connection to %s not healthy", uri)
|
return fmt.Errorf("connection to %s not healthy", uri)
|
||||||
}
|
}
|
||||||
|
s.writeApi.SetWriteFailedCallback(func(batch string, err influxdb2ApiHttp.Error, retryAttempts uint) bool {
|
||||||
|
mlist := strings.Split(batch, "\n")
|
||||||
|
cclog.ComponentError(s.name, fmt.Sprintf("Failed to write batch with %d metrics %d times (max: %d): %s", len(mlist), retryAttempts, s.config.MaxRetryAttempts, err.Error()))
|
||||||
|
return retryAttempts <= s.config.MaxRetryAttempts
|
||||||
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
|
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
|
||||||
|
if s.customFlushInterval != 0 && s.flushTimer == nil {
|
||||||
|
// Run a batched flush for all lines that have arrived in the defined interval
|
||||||
|
s.flushTimer = time.AfterFunc(s.customFlushInterval, func() {
|
||||||
|
if err := s.Flush(); err != nil {
|
||||||
|
cclog.ComponentError(s.name, "flush failed:", err.Error())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
s.writeApi.WritePoint(
|
s.writeApi.WritePoint(
|
||||||
m.ToPoint(s.meta_as_tags),
|
m.ToPoint(s.meta_as_tags),
|
||||||
)
|
)
|
||||||
@@ -109,7 +128,11 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *InfluxAsyncSink) Flush() error {
|
func (s *InfluxAsyncSink) Flush() error {
|
||||||
|
cclog.ComponentDebug(s.name, "Flushing")
|
||||||
s.writeApi.Flush()
|
s.writeApi.Flush()
|
||||||
|
if s.customFlushInterval != 0 && s.flushTimer != nil {
|
||||||
|
s.flushTimer = nil
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,6 +155,9 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
s.config.InfluxMaxRetries = 0
|
s.config.InfluxMaxRetries = 0
|
||||||
s.config.InfluxExponentialBase = 0
|
s.config.InfluxExponentialBase = 0
|
||||||
s.config.FlushInterval = 0
|
s.config.FlushInterval = 0
|
||||||
|
s.config.CustomFlushInterval = ""
|
||||||
|
s.customFlushInterval = time.Duration(0)
|
||||||
|
s.config.MaxRetryAttempts = 1
|
||||||
|
|
||||||
// Default retry intervals (in seconds)
|
// Default retry intervals (in seconds)
|
||||||
// 1 2
|
// 1 2
|
||||||
@@ -183,6 +209,15 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
|
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
|
||||||
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
|
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
|
||||||
|
|
||||||
|
// Use a own timer for calling Flush()
|
||||||
|
if len(s.config.CustomFlushInterval) > 0 {
|
||||||
|
t, err := time.ParseDuration(s.config.CustomFlushInterval)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid duration in 'custom_flush_interval': %v", err)
|
||||||
|
}
|
||||||
|
s.customFlushInterval = t
|
||||||
|
}
|
||||||
|
|
||||||
// Connect to InfluxDB server
|
// Connect to InfluxDB server
|
||||||
if err := s.connect(); err != nil {
|
if err := s.connect(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to connect: %v", err)
|
return nil, fmt.Errorf("unable to connect: %v", err)
|
||||||
|
Reference in New Issue
Block a user