From bac1f18b1d9f0952edc5be3e0b2d8fa0fc66ceb2 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Fri, 25 Feb 2022 13:47:19 +0100 Subject: [PATCH] Add samples for collectors, sinks and receivers --- collectors/sampleMetric.go | 81 +++++++++++++++++++++ collectors/sampleTimerMetric.go | 122 ++++++++++++++++++++++++++++++++ receivers/sampleReceiver.go | 77 ++++++++++++++++++++ sinks/sampleSink.go | 62 ++++++++++++++++ 4 files changed, 342 insertions(+) create mode 100644 collectors/sampleMetric.go create mode 100644 collectors/sampleTimerMetric.go create mode 100644 receivers/sampleReceiver.go create mode 100644 sinks/sampleSink.go diff --git a/collectors/sampleMetric.go b/collectors/sampleMetric.go new file mode 100644 index 0000000..cd3b4cc --- /dev/null +++ b/collectors/sampleMetric.go @@ -0,0 +1,81 @@ +package collectors + +import ( + "encoding/json" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" +) + +// These are the fields we read from the JSON configuration +type SampleCollectorConfig struct { + Interval string `json:"interval"` +} + +// This contains all variables we need during execution and the variables +// defined by metricCollector (name, init, ...) +type SampleCollector struct { + metricCollector + config SampleTimerCollectorConfig // the configuration structure + meta map[string]string // default meta information + tags map[string]string // default tags +} + +func (m *SampleCollector) Init(config json.RawMessage) error { + var err error = nil + // Always set the name early in Init() to use it in cclog.Component* functions + m.name = "InternalCollector" + // This is for later use, also call it early + m.setup() + // Define meta information sent with each metric + // (Can also be dynamic or this is the basic set with extension through AddMeta()) + m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} + // Define tags sent with each metric + // The 'type' tag is always needed, it defines the granulatity of the metric + // node -> whole system + // socket -> CPU socket (requires socket ID as 'type-id' tag) + // cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag) + m.tags = map[string]string{"type": "node"} + // Read in the JSON configuration + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + cclog.ComponentError(m.name, "Error reading config:", err.Error()) + return err + } + } + + // Set up everything that the collector requires during the Read() execution + // Check files required, test execution of some commands, create data structure + // for all topological entities (sockets, NUMA domains, ...) + // Return some useful error message in case of any failures + + // Set this flag only if everything is initialized properly, all required files exist, ... + m.init = true + return err +} + +func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMetric) { + // Create a sample metric + timestamp := time.Now() + + value := 1.0 + // If you want to measure something for a specific amout of time, use interval + // start := readState() + // time.Sleep(interval) + // stop := readState() + // value = (stop - start) / interval.Seconds() + + y, err := lp.New("sample_metric", m.tags, m.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil { + // Send it to output channel + output <- y + } + +} + +func (m *SampleCollector) Close() { + // Unset flag + m.init = false +} diff --git a/collectors/sampleTimerMetric.go b/collectors/sampleTimerMetric.go new file mode 100644 index 0000000..aa6807e --- /dev/null +++ b/collectors/sampleTimerMetric.go @@ -0,0 +1,122 @@ +package collectors + +import ( + "encoding/json" + "sync" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" +) + +// These are the fields we read from the JSON configuration +type SampleTimerCollectorConfig struct { + Interval string `json:"interval"` +} + +// This contains all variables we need during execution and the variables +// defined by metricCollector (name, init, ...) +type SampleTimerCollector struct { + metricCollector + wg sync.WaitGroup // sync group for management + done chan bool // channel for management + meta map[string]string // default meta information + tags map[string]string // default tags + config SampleTimerCollectorConfig // the configuration structure + interval time.Duration // the interval parsed from configuration + ticker *time.Ticker // own timer + output chan lp.CCMetric // own internal output channel +} + +func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error { + var err error = nil + // Always set the name early in Init() to use it in cclog.Component* functions + m.name = "SampleTimerCollector" + // This is for later use, also call it early + m.setup() + // Define meta information sent with each metric + // (Can also be dynamic or this is the basic set with extension through AddMeta()) + m.meta = map[string]string{"source": m.name, "group": "SAMPLE"} + // Define tags sent with each metric + // The 'type' tag is always needed, it defines the granulatity of the metric + // node -> whole system + // socket -> CPU socket (requires socket ID as 'type-id' tag) + // cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag) + m.tags = map[string]string{"type": "node"} + // Read in the JSON configuration + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + cclog.ComponentError(m.name, "Error reading config:", err.Error()) + return err + } + } + // Parse the read interval duration + m.interval, err = time.ParseDuration(m.config.Interval) + if err != nil { + cclog.ComponentError(m.name, "Error parsing interval:", err.Error()) + return err + } + + // Storage for output channel + m.output = nil + // Mangement channel for the timer function. + m.done = make(chan bool) + // Create the own ticker + m.ticker = time.NewTicker(m.interval) + + // Start the timer loop with return functionality by sending 'true' to the done channel + m.wg.Add(1) + go func() { + select { + case <-m.done: + // Exit the timer loop + cclog.ComponentDebug(m.name, "Closing...") + m.wg.Done() + return + case timestamp := <-m.ticker.C: + // This is executed every timer tick but we have to wait until the first + // Read() to get the output channel + if m.output != nil { + m.ReadMetrics(timestamp) + } + } + }() + + // Set this flag only if everything is initialized properly, all required files exist, ... + m.init = true + return err +} + +// This function is called at each interval timer tick +func (m *SampleTimerCollector) ReadMetrics(timestamp time.Time) { + // Create a sample metric + + value := 1.0 + + // If you want to measure something for a specific amout of time, use interval + // start := readState() + // time.Sleep(interval) + // stop := readState() + // value = (stop - start) / interval.Seconds() + + y, err := lp.New("sample_metric", m.tags, m.meta, map[string]interface{}{"value": value}, timestamp) + if err == nil && m.output != nil { + // Send it to output channel if we have a valid channel + m.output <- y + } +} + +func (m *SampleTimerCollector) Read(interval time.Duration, output chan lp.CCMetric) { + // Capture output channel + m.output = output +} + +func (m *SampleTimerCollector) Close() { + // Send signal to the timer loop to stop it + m.done <- true + // Wait until the timer loop is done + m.wg.Wait() + // Unset flag + m.init = false +} diff --git a/receivers/sampleReceiver.go b/receivers/sampleReceiver.go new file mode 100644 index 0000000..e9edc90 --- /dev/null +++ b/receivers/sampleReceiver.go @@ -0,0 +1,77 @@ +package receivers + +import ( + "encoding/json" + "fmt" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" +) + +type SampleReceiverConfig struct { + Type string `json:"type"` + Addr string `json:"address"` + Port string `json:"port"` +} + +type SampleReceiver struct { + receiver + config SampleReceiverConfig + + // Storage for static information + meta map[string]string + // Use in case of own go routine + // done chan bool + // wg sync.WaitGroup +} + +func (r *SampleReceiver) Start() { + cclog.ComponentDebug(r.name, "START") + + // Start server process like http.ListenAndServe() + + // or use own go routine but always make sure it exits + // as soon as it gets the signal of the r.done channel + // r.wg.Add(1) + // go func() { + // for { + // select { + // case <-r.done: + // r.wg.Done() + // return + // } + // } + // r.wg.Done() + // }() +} + +func (r *SampleReceiver) Close() { + cclog.ComponentDebug(r.name, "CLOSE") + + // Close server like http.Shutdown() + + // in case of own go routine, send the signal and wait + // r.done <- true + // r.wg.Wait() +} + +func NewSampleReceiver(name string, config json.RawMessage) (Receiver, error) { + r := new(SampleReceiver) + r.name = fmt.Sprintf("HttpReceiver(%s)", name) + + // Set static information + r.meta = map[string]string{"source": r.name} + + // Read the sample receiver specific JSON config + if len(config) > 0 { + err := json.Unmarshal(config, &r.config) + if err != nil { + cclog.ComponentError(r.name, "Error reading config:", err.Error()) + return nil, err + } + } + + // Check that all required fields in the configuration are set + // Use 'if len(r.config.Option) > 0' for strings + + return r, nil +} diff --git a/sinks/sampleSink.go b/sinks/sampleSink.go new file mode 100644 index 0000000..be0196a --- /dev/null +++ b/sinks/sampleSink.go @@ -0,0 +1,62 @@ +package sinks + +import ( + "encoding/json" + "fmt" + "log" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" +) + +type SampleSinkConfig struct { + defaultSinkConfig // defines JSON tags for 'type' and 'meta_as_tags' + // Add additional options +} + +type SampleSink struct { + sink // declarate 'name' and 'meta_as_tags' + config SampleSinkConfig // entry point to the SampleSinkConfig +} + +// Code to submit a single CCMetric to the sink +func (s *SampleSink) Write(point lp.CCMetric) error { + log.Print(point) + return nil +} + +// If the sink uses batched sends internally, you can tell to flush its buffers +func (s *SampleSink) Flush() error { + return nil +} + +// Close sink: close network connection, close files, close libraries, ... +func (s *SampleSink) Close() { + cclog.ComponentDebug(s.name, "CLOSE") +} + +// New function to create a new instance of the sink +// Initialize the sink by giving it a name and reading in the config JSON +func NewSampleSink(name string, config json.RawMessage) (Sink, error) { + s := new(SampleSink) + s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here + + // Set defaults in s.config + + // Read in the config JSON + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return nil, err + } + } + + // Check if all required fields in the config are set + // Use 'len(s.config.Option) > 0' for string settings + + // Establish connection to the server, library, ... + // Check required files exist and lookup path(s) of executable(s) + + // Return (nil, meaningful error message) in case of errors + return s, nil +}