diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index 5324123..01ba118 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -11,6 +11,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) const GMETRIC_EXEC = `gmetric` @@ -29,9 +30,10 @@ type GangliaSinkConfig struct { type GangliaSink struct { sink - gmetric_path string - gmetric_config string - config GangliaSinkConfig + gmetric_path string + gmetric_config string + config GangliaSinkConfig + statsSentMetrics int64 } func (s *GangliaSink) Write(point lp.CCMetric) error { @@ -78,6 +80,8 @@ func (s *GangliaSink) Write(point lp.CCMetric) error { command := exec.Command(s.gmetric_path, argstr...) command.Wait() _, err = command.Output() + s.statsSentMetrics++ + stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) return err } @@ -120,5 +124,6 @@ func NewGangliaSink(name string, config json.RawMessage) (Sink, error) { if len(s.config.GmetricConfig) > 0 { s.gmetric_config = s.config.GmetricConfig } + s.statsSentMetrics = 0 return s, nil } diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 7713638..e481e50 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -11,6 +11,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" influx "github.com/influxdata/line-protocol" ) @@ -36,6 +37,8 @@ type HttpSink struct { idleConnTimeout time.Duration timeout time.Duration flushDelay time.Duration + statsProcessed int64 + statsFlushes int64 } func (s *HttpSink) Write(m lp.CCMetric) error { @@ -63,6 +66,8 @@ func (s *HttpSink) Write(m lp.CCMetric) error { cclog.ComponentError(s.name, "encoding failed:", err.Error()) return err } + s.statsProcessed++ + stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessed) // Flush synchronously if "flush_delay" is zero if s.flushDelay == 0 { @@ -112,6 +117,8 @@ func (s *HttpSink) Flush() error { cclog.ComponentError(s.name, "application error:", err.Error()) return err } + s.statsFlushes++ + stats.ComponentStatInt(s.name, "flushes", s.statsFlushes) return nil } @@ -177,5 +184,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s.buffer = &bytes.Buffer{} s.encoder = influx.NewEncoder(s.buffer) s.encoder.SetPrecision(time.Second) + s.statsFlushes = 0 + s.statsProcessed = 0 return s, nil } diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index e22f941..9ed4828 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -10,6 +10,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" ) @@ -42,6 +43,9 @@ type InfluxAsyncSink struct { config InfluxAsyncSinkConfig influxRetryInterval uint influxMaxRetryTime uint + sentMetrics int64 + statsFlushes int64 + statsErrors int64 } func (s *InfluxAsyncSink) connect() error { @@ -105,11 +109,15 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error { s.writeApi.WritePoint( m.ToPoint(s.meta_as_tags), ) + s.sentMetrics++ + stats.ComponentStatInt(s.name, "send_metrics", s.sentMetrics) return nil } func (s *InfluxAsyncSink) Flush() error { s.writeApi.Flush() + s.statsFlushes++ + stats.ComponentStatInt(s.name, "flushes", s.statsFlushes) return nil } @@ -189,12 +197,17 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { } // Start background: Read from error channel + s.statsErrors = 0 s.errors = s.writeApi.Errors() go func() { for err := range s.errors { + s.statsErrors++ + stats.ComponentStatInt(s.name, "errors", s.statsErrors) cclog.ComponentError(s.name, err.Error()) } }() + s.sentMetrics = 0 + s.statsFlushes = 0 return s, nil } diff --git a/sinks/influxSink.go b/sinks/influxSink.go index e8b16d8..8749aee 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -11,6 +11,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" "github.com/influxdata/influxdb-client-go/v2/api/write" @@ -37,15 +38,17 @@ type InfluxSinkConfig struct { type InfluxSink struct { sink - client influxdb2.Client - writeApi influxdb2Api.WriteAPIBlocking - config InfluxSinkConfig - influxRetryInterval uint - influxMaxRetryTime uint - batch []*write.Point - flushTimer *time.Timer - flushDelay time.Duration - lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer + client influxdb2.Client + writeApi influxdb2Api.WriteAPIBlocking + config InfluxSinkConfig + influxRetryInterval uint + influxMaxRetryTime uint + batch []*write.Point + flushTimer *time.Timer + flushDelay time.Duration + lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer + statsSentMetrics int64 + statsProcessedMetrics int64 //influxMaxRetryDelay uint } @@ -123,8 +126,10 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { } p := m.ToPoint(s.meta_as_tags) s.lock.Lock() + s.statsProcessedMetrics++ s.batch = append(s.batch, p) s.lock.Unlock() + stats.ComponentStatInt(s.name, "processed_metrics", s.statsProcessedMetrics) // Flush synchronously if "flush_delay" is zero if s.flushDelay == 0 { @@ -145,6 +150,8 @@ func (s *InfluxSink) Flush() error { cclog.ComponentError(s.name, "flush failed:", err.Error()) return err } + s.statsSentMetrics += int64(len(s.batch)) + stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) s.batch = s.batch[:0] return nil } @@ -211,5 +218,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { if err := s.connect(); err != nil { return nil, fmt.Errorf("unable to connect: %v", err) } + s.statsSentMetrics = 0 + s.statsProcessedMetrics = 0 return s, nil } diff --git a/sinks/libgangliaSink.go b/sinks/libgangliaSink.go index 3651584..8cae9a0 100644 --- a/sinks/libgangliaSink.go +++ b/sinks/libgangliaSink.go @@ -73,6 +73,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" "github.com/NVIDIA/go-nvml/pkg/dl" ) @@ -102,11 +103,12 @@ type LibgangliaSinkConfig struct { type LibgangliaSink struct { sink - config LibgangliaSinkConfig - global_context C.Ganglia_pool - gmond_config C.Ganglia_gmond_config - send_channels C.Ganglia_udp_send_channels - cstrCache map[string]*C.char + config LibgangliaSinkConfig + global_context C.Ganglia_pool + gmond_config C.Ganglia_gmond_config + send_channels C.Ganglia_udp_send_channels + cstrCache map[string]*C.char + statsSentMetrics int64 } func (s *LibgangliaSink) Write(point lp.CCMetric) error { @@ -202,6 +204,8 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error { C.Ganglia_metric_destroy(gmetric) // Free the value C string, the only one not stored in the cache C.free(unsafe.Pointer(c_value)) + s.statsSentMetrics++ + stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) return err } @@ -247,7 +251,7 @@ func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) { if err != nil { return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err) } - + s.statsSentMetrics = 0 // Set up cache for the C strings s.cstrCache = make(map[string]*C.char) // s.cstrCache["globals"] = C.CString("globals") diff --git a/sinks/prometheusSink.go b/sinks/prometheusSink.go index 5011ac0..aca8ed2 100644 --- a/sinks/prometheusSink.go +++ b/sinks/prometheusSink.go @@ -11,6 +11,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -29,11 +30,12 @@ type PrometheusSinkConfig struct { type PrometheusSink struct { sink - config PrometheusSinkConfig - labelMetrics map[string]*prometheus.GaugeVec - nodeMetrics map[string]prometheus.Gauge - promWg sync.WaitGroup - promServer *http.Server + config PrometheusSinkConfig + labelMetrics map[string]*prometheus.GaugeVec + nodeMetrics map[string]prometheus.Gauge + promWg sync.WaitGroup + promServer *http.Server + statsSentMetrics int64 } func intToFloat64(input interface{}) (float64, error) { @@ -113,6 +115,8 @@ func (s *PrometheusSink) newMetric(metric lp.CCMetric) error { s.nodeMetrics[name] = new prometheus.Register(new) } + s.statsSentMetrics++ + stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) return nil } @@ -146,6 +150,8 @@ func (s *PrometheusSink) updateMetric(metric lp.CCMetric) error { } s.nodeMetrics[name].Set(value) } + s.statsSentMetrics++ + stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) return nil } diff --git a/sinks/sampleSink.go b/sinks/sampleSink.go index 2a823e6..ede5873 100644 --- a/sinks/sampleSink.go +++ b/sinks/sampleSink.go @@ -7,6 +7,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) type SampleSinkConfig struct { @@ -14,12 +15,15 @@ type SampleSinkConfig struct { // See: metricSink.go defaultSinkConfig // Additional config options, for SampleSink + } type SampleSink struct { // declares elements 'name' and 'meta_as_tags' (string to bool map!) sink config SampleSinkConfig // entry point to the SampleSinkConfig + // Stats counters + statsSentMetrics int64 } // Implement functions required for Sink interface @@ -30,6 +34,8 @@ type SampleSink struct { func (s *SampleSink) Write(point lp.CCMetric) error { // based on s.meta_as_tags use meta infos as tags log.Print(point) + s.statsSentMetrics++ + stats.ComponentStatInt(s.name, "sent_metrics", s.statsSentMetrics) return nil } @@ -63,6 +69,9 @@ func NewSampleSink(name string, config json.RawMessage) (Sink, error) { } } + // Initalize stats counters + s.statsSentMetrics = 0 + // Create lookup map to use meta infos as tags in the output metric s.meta_as_tags = make(map[string]bool) for _, k := range s.config.MetaAsTags { diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index f531f5d..f317f6c 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -102,13 +102,19 @@ func (sm *sinkManager) Start() { } toTheSinks := func(p lp.CCMetric) { + var wg sync.WaitGroup // Send received metric to all outputs cclog.ComponentDebug("SinkManager", "WRITE", p) for _, s := range sm.sinks { - if err := s.Write(p); err != nil { - cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error()) - } + wg.Add(1) + go func(s Sink) { + if err := s.Write(p); err != nil { + cclog.ComponentError("SinkManager", "WRITE", s.Name(), "write failed:", err.Error()) + } + wg.Done() + }(s) } + wg.Wait() } for { diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index e091af3..0445b69 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -8,6 +8,7 @@ import ( // "time" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" ) type StdoutSink struct { @@ -17,6 +18,7 @@ type StdoutSink struct { defaultSinkConfig Output string `json:"output_file,omitempty"` } + sentMetrics int64 } func (s *StdoutSink) Write(m lp.CCMetric) error { @@ -24,6 +26,8 @@ func (s *StdoutSink) Write(m lp.CCMetric) error { s.output, m.ToLineProtocol(s.meta_as_tags), ) + s.sentMetrics++ + stats.ComponentStatInt(s.name, "sent_metrics", s.sentMetrics) return nil } @@ -68,6 +72,7 @@ func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { for _, k := range s.config.MetaAsTags { s.meta_as_tags[k] = true } + s.sentMetrics = 0 return s, nil }