CCMetric sinks

This folder contains the SinkManager and sink implementations for the cc-metric-collector.

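Available sinks (each is documented in the file named next to it):

  • Stdout sink: stdoutSink.md
  • HTTP sink: httpSink.md
  • InfluxDB sink: influxSink.md
  • InfluxDB asynchronous sink: influxAsyncSink.md
  • NATS sink: natsSink.md
  • Ganglia sink: gangliaSink.md
  • Ganglia sink using libganglia: libgangliaSink.md
  • Prometheus sink: prometheusSink.md
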
Configuration

The configuration file for the sinks is a JSON object that maps a user-chosen sink name to that sink's configuration. The type field in each configuration specifies which sink to initialize.

{
  "mystdout" : {
    "type" : "stdout",
    "meta_as_tags" : false
  },
  "metricstore" : {
    "type" : "http",
    "host" : "localhost",
    "port" : "4123",
    "database" : "ccmetric",
    "password" : "<jwt token>"
  }
}
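
As a rough illustration of how such a file can be consumed, the following sketch decodes the configuration in two steps: first into a map of sink name to raw JSON, then into a small struct that only reads the type field. It assumes the file is a JSON object keyed by sink name, as the named entries above suggest. The helper parseSinkTypes and the struct sinkType are hypothetical names made up for this example and are not part of the SinkManager.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// sinkType reads only the field needed to pick a sink implementation.
// Hypothetical helper type, not taken from sinkManager.go.
type sinkType struct {
	Type string `json:"type"`
}

// parseSinkTypes maps each configured sink name to its "type" value.
// The per-sink options stay undecoded (json.RawMessage) at this stage,
// so every sink can later parse its own options in Init().
func parseSinkTypes(raw []byte) (map[string]string, error) {
	var configs map[string]json.RawMessage
	if err := json.Unmarshal(raw, &configs); err != nil {
		return nil, err
	}
	types := make(map[string]string, len(configs))
	for name, cfg := range configs {
		var t sinkType
		if err := json.Unmarshal(cfg, &t); err != nil {
			return nil, fmt.Errorf("sink %q: %w", name, err)
		}
		if t.Type == "" {
			return nil, fmt.Errorf("sink %q: missing \"type\" field", name)
		}
		types[name] = t.Type
	}
	return types, nil
}

func main() {
	raw := []byte(`{
		"mystdout":    {"type": "stdout", "meta_as_tags": false},
		"metricstore": {"type": "http", "host": "localhost", "port": "4123"}
	}`)
	types, err := parseSinkTypes(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// Sort the names because Go map iteration order is random.
	names := make([]string, 0, len(types))
	for name := range types {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Printf("%s -> %s\n", name, types[name])
	}
}
```

Keeping the per-sink part as json.RawMessage matches the Init(name string, config json.RawMessage) signature described below: the manager only needs the type, and each sink interprets the rest.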

Contributing own sinks

A sink contains five functions and is derived from the type sink:

  • Init(name string, config json.RawMessage) error
  • Write(point CCMetric) error
  • Flush() error
  • Close()
  • New<Typename>(name string, config json.RawMessage) (Sink, error) (calls the Init() function)

Init() should set up the sink's data structures and resources, for example by opening a file or a server connection. Write() writes or sends a single metric. For non-blocking sinks, Flush() tells the sink to drain its internal buffers. Close() should tear down anything created in Init().

Finally, register the sink in sinkManager.go. The registry there is called AvailableSinks and is a map (sink_type_string -> pointer to sink interface). Add a new entry that maps a descriptive type name to the new sink.

Sample sink

package sinks

import (
	"encoding/json"
	"fmt"
	"log"

	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)

type SampleSinkConfig struct {
	defaultSinkConfig  // defines JSON tags for 'name' and 'meta_as_tags'
}

type SampleSink struct {
	sink                    // declares 'name' and 'meta_as_tags'
	config SampleSinkConfig // entry point to the SampleSinkConfig
}

// Initialize the sink by giving it a name and reading in the config JSON
func (s *SampleSink) Init(name string, config json.RawMessage) error {
	s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here
	// Read in the config JSON
	if len(config) > 0 {
		err := json.Unmarshal(config, &s.config)
		if err != nil {
			return err
		}
	}
	return nil
}

// Code to submit a single CCMetric to the sink
func (s *SampleSink) Write(point lp.CCMetric) error {
	log.Print(point)
	return nil
}

// If the sink batches sends internally, Flush tells it to send out its buffered metrics
func (s *SampleSink) Flush() error {
	return nil
}


// Close sink: close network connection, close files, close libraries, ...
func (s *SampleSink) Close() {}


// New function to create a new instance of the sink
func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
	s := new(SampleSink)
	err := s.Init(name, config)
	return s, err
}