Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop

Thomas Roehl 2022-02-25 13:53:34 +01:00
commit 940623585c
6 changed files with 359 additions and 2 deletions


@@ -0,0 +1,81 @@
package collectors
import (
"encoding/json"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
// These are the fields we read from the JSON configuration
type SampleCollectorConfig struct {
Interval string `json:"interval"`
}
// This contains all variables we need during execution and the variables
// defined by metricCollector (name, init, ...)
type SampleCollector struct {
metricCollector
config SampleCollectorConfig // the configuration structure
meta map[string]string // default meta information
tags map[string]string // default tags
}
func (m *SampleCollector) Init(config json.RawMessage) error {
var err error = nil
// Always set the name early in Init() to use it in cclog.Component* functions
m.name = "InternalCollector"
// This is for later use, also call it early
m.setup()
// Define meta information sent with each metric
// (Can also be set dynamically; this is the basic set, which can be extended via AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
// Define tags sent with each metric
// The 'type' tag is always needed; it defines the granularity of the metric
// node -> whole system
// socket -> CPU socket (requires socket ID as 'type-id' tag)
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
m.tags = map[string]string{"type": "node"}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
}
}
// Set up everything that the collector requires during the Read() execution
// Check files required, test execution of some commands, create data structure
// for all topological entities (sockets, NUMA domains, ...)
// Return some useful error message in case of any failures
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
return err
}
func (m *SampleCollector) Read(interval time.Duration, output chan lp.CCMetric) {
// Create a sample metric
timestamp := time.Now()
value := 1.0
// If you want to measure something for a specific amount of time, use interval
// start := readState()
// time.Sleep(interval)
// stop := readState()
// value = (stop - start) / interval.Seconds()
y, err := lp.New("sample_metric", m.tags, m.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil {
// Send it to output channel
output <- y
}
}
func (m *SampleCollector) Close() {
// Unset flag
m.init = false
}
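The commented-out pattern in Read() above (read a state value, sleep for the interval, read again, divide by the elapsed seconds) is only sketched in comments. As a rough standalone illustration of that rate computation, where readCounter() is a purely hypothetical stand-in for whatever a real collector would sample (not part of this commit):

package main

import (
	"fmt"
	"time"
)

// readCounter is a made-up stand-in for whatever monotonically increasing
// state a real collector would sample, e.g. a value parsed from /proc or sysfs.
func readCounter() float64 {
	return float64(time.Now().UnixNano())
}

func main() {
	interval := 2 * time.Second
	start := readCounter()
	time.Sleep(interval)
	stop := readCounter()
	// rate per second over the measurement interval
	value := (stop - start) / interval.Seconds()
	fmt.Printf("sample_metric value: %f\n", value)
}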


@@ -0,0 +1,122 @@
package collectors
import (
"encoding/json"
"sync"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
// These are the fields we read from the JSON configuration
type SampleTimerCollectorConfig struct {
Interval string `json:"interval"`
}
// This contains all variables we need during execution and the variables
// defined by metricCollector (name, init, ...)
type SampleTimerCollector struct {
metricCollector
wg sync.WaitGroup // sync group for management
done chan bool // channel for management
meta map[string]string // default meta information
tags map[string]string // default tags
config SampleTimerCollectorConfig // the configuration structure
interval time.Duration // the interval parsed from configuration
ticker *time.Ticker // own timer
output chan lp.CCMetric // own internal output channel
}
func (m *SampleTimerCollector) Init(name string, config json.RawMessage) error {
var err error = nil
// Always set the name early in Init() to use it in cclog.Component* functions
m.name = "SampleTimerCollector"
// This is for later use, also call it early
m.setup()
// Define meta information sent with each metric
// (Can also be set dynamically; this is the basic set, which can be extended via AddMeta())
m.meta = map[string]string{"source": m.name, "group": "SAMPLE"}
// Define tags sent with each metric
// The 'type' tag is always needed; it defines the granularity of the metric
// node -> whole system
// socket -> CPU socket (requires socket ID as 'type-id' tag)
// cpu -> single CPU hardware thread (requires cpu ID as 'type-id' tag)
m.tags = map[string]string{"type": "node"}
// Read in the JSON configuration
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
cclog.ComponentError(m.name, "Error reading config:", err.Error())
return err
}
}
// Parse the read interval duration
m.interval, err = time.ParseDuration(m.config.Interval)
if err != nil {
cclog.ComponentError(m.name, "Error parsing interval:", err.Error())
return err
}
// Storage for output channel
m.output = nil
// Management channel for the timer function
m.done = make(chan bool)
// Create the own ticker
m.ticker = time.NewTicker(m.interval)
// Start the timer loop; it can be stopped by sending 'true' to the done channel
m.wg.Add(1)
go func() {
	for {
		select {
		case <-m.done:
			// Exit the timer loop
			cclog.ComponentDebug(m.name, "Closing...")
			m.wg.Done()
			return
		case timestamp := <-m.ticker.C:
			// This is executed on every timer tick, but we have to wait until the
			// first Read() call to get the output channel
			if m.output != nil {
				m.ReadMetrics(timestamp)
			}
		}
	}
}()
// Set this flag only if everything is initialized properly, all required files exist, ...
m.init = true
return err
}
// This function is called at each interval timer tick
func (m *SampleTimerCollector) ReadMetrics(timestamp time.Time) {
// Create a sample metric
value := 1.0
// If you want to measure something for a specific amount of time, use interval
// start := readState()
// time.Sleep(interval)
// stop := readState()
// value = (stop - start) / interval.Seconds()
y, err := lp.New("sample_metric", m.tags, m.meta, map[string]interface{}{"value": value}, timestamp)
if err == nil && m.output != nil {
// Send it to output channel if we have a valid channel
m.output <- y
}
}
func (m *SampleTimerCollector) Read(interval time.Duration, output chan lp.CCMetric) {
// Capture output channel
m.output = output
}
func (m *SampleTimerCollector) Close() {
// Send signal to the timer loop to stop it
m.done <- true
// Wait until the timer loop is done
m.wg.Wait()
// Unset flag
m.init = false
}
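The Init() and Close() methods above combine a ticker, a done channel, and a WaitGroup into a small lifecycle: the goroutine ticks until Close() signals it, then Close() waits for it to finish. A minimal standalone sketch of just this pattern, with purely illustrative names and none of the collector plumbing:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	done := make(chan bool)
	ticker := time.NewTicker(500 * time.Millisecond)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-done:
				// stop signal received, leave the timer loop
				return
			case ts := <-ticker.C:
				// per-tick work, analogous to ReadMetrics(timestamp)
				fmt.Println("tick at", ts)
			}
		}
	}()

	time.Sleep(2 * time.Second)

	// shutdown, mirroring Close(): signal the loop, wait for it, stop the ticker
	done <- true
	wg.Wait()
	ticker.Stop()
}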


@@ -0,0 +1,77 @@
package receivers
import (
"encoding/json"
"fmt"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
)
type SampleReceiverConfig struct {
Type string `json:"type"`
Addr string `json:"address"`
Port string `json:"port"`
}
type SampleReceiver struct {
receiver
config SampleReceiverConfig
// Storage for static information
meta map[string]string
// Use these in case the receiver runs its own goroutine
// done chan bool
// wg sync.WaitGroup
}
func (r *SampleReceiver) Start() {
cclog.ComponentDebug(r.name, "START")
// Start the server process, e.g. http.ListenAndServe(),
// or use your own goroutine, but always make sure it exits
// as soon as it receives the signal on the r.done channel
// r.wg.Add(1)
// go func() {
// for {
// select {
// case <-r.done:
// r.wg.Done()
// return
// }
// }
// r.wg.Done()
// }()
}
func (r *SampleReceiver) Close() {
cclog.ComponentDebug(r.name, "CLOSE")
// Close the server, e.g. via http.Shutdown()
// In case of an own goroutine, send the stop signal and wait
// r.done <- true
// r.wg.Wait()
}
func NewSampleReceiver(name string, config json.RawMessage) (Receiver, error) {
r := new(SampleReceiver)
r.name = fmt.Sprintf("SampleReceiver(%s)", name)
// Set static information
r.meta = map[string]string{"source": r.name}
// Read the sample receiver specific JSON config
if len(config) > 0 {
err := json.Unmarshal(config, &r.config)
if err != nil {
cclog.ComponentError(r.name, "Error reading config:", err.Error())
return nil, err
}
}
// Check that all required fields in the configuration are set
// Use 'if len(r.config.Option) > 0' for strings
return r, nil
}
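NewSampleReceiver() above leaves the required-field checks as a comment. A rough sketch of what such validation could look like, using a local struct that mirrors SampleReceiverConfig (which fields are treated as mandatory here is an assumption for illustration):

package main

import (
	"encoding/json"
	"fmt"
)

type demoReceiverConfig struct {
	Type string `json:"type"`
	Addr string `json:"address"`
	Port string `json:"port"`
}

func parseReceiverConfig(raw json.RawMessage) (*demoReceiverConfig, error) {
	var c demoReceiverConfig
	if len(raw) > 0 {
		if err := json.Unmarshal(raw, &c); err != nil {
			return nil, fmt.Errorf("error reading config: %v", err)
		}
	}
	// required-field checks, in the style recommended by the comments above
	if len(c.Addr) == 0 || len(c.Port) == 0 {
		return nil, fmt.Errorf("receiver config requires 'address' and 'port'")
	}
	return &c, nil
}

func main() {
	cfg, err := parseReceiverConfig(json.RawMessage(`{"type": "sample", "address": "127.0.0.1", "port": "8080"}`))
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Printf("receiver config: %+v\n", cfg)
}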


@@ -1,6 +1,7 @@
package sinks
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
@@ -64,6 +65,13 @@ func (s *InfluxAsyncSink) connect() error {
)
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
ok, err := s.client.Ping(context.Background())
if err != nil {
return err
}
if !ok {
return fmt.Errorf("connection to %s not healthy", uri)
}
return nil
}
@@ -108,7 +116,7 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err)
}
// Start background: Read from error channel


@@ -54,6 +54,13 @@ func (s *InfluxSink) connect() error {
)
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
ok, err := s.client.Ping(context.Background())
if err != nil {
return err
}
if !ok {
return fmt.Errorf("connection to %s not healthy", uri)
}
return nil
}
@@ -94,7 +101,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err)
}
return s, nil
}
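Both hunks above add the same fail-fast health check: after creating the client, Ping() is used to reject unreachable or unhealthy servers before the sink is handed out. A hedged standalone sketch of that pattern (the import path and NewClient() call are assumed to be the v2 InfluxDB Go client this repository already uses; server URL and token are placeholders):

package main

import (
	"context"
	"fmt"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

func connectChecked(uri, token string) (influxdb2.Client, error) {
	client := influxdb2.NewClient(uri, token)
	// Ping reports whether the server is reachable and healthy
	ok, err := client.Ping(context.Background())
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, fmt.Errorf("connection to %s not healthy", uri)
	}
	return client, nil
}

func main() {
	client, err := connectChecked("http://localhost:8086", "my-token")
	if err != nil {
		fmt.Println("connect failed:", err)
		return
	}
	defer client.Close()
	fmt.Println("InfluxDB server is reachable and healthy")
}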

sinks/sampleSink.go (new file, 62 lines added)

@@ -0,0 +1,62 @@
package sinks
import (
"encoding/json"
"fmt"
"log"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
type SampleSinkConfig struct {
defaultSinkConfig // defines JSON tags for 'type' and 'meta_as_tags'
// Add additional options
}
type SampleSink struct {
sink // declares 'name' and 'meta_as_tags'
config SampleSinkConfig // entry point to the SampleSinkConfig
}
// Code to submit a single CCMetric to the sink
func (s *SampleSink) Write(point lp.CCMetric) error {
log.Print(point)
return nil
}
// If the sink uses batched sends internally, you can tell it here to flush its buffers
func (s *SampleSink) Flush() error {
return nil
}
// Close sink: close network connection, close files, close libraries, ...
func (s *SampleSink) Close() {
cclog.ComponentDebug(s.name, "CLOSE")
}
// New function to create a new instance of the sink
// Initialize the sink by giving it a name and reading in the config JSON
func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
s := new(SampleSink)
s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here
// Set defaults in s.config
// Read in the config JSON
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
// Check if all required fields in the config are set
// Use 'len(s.config.Option) > 0' for string settings
// Establish connection to the server, library, ...
// Check required files exist and lookup path(s) of executable(s)
// Return (nil, meaningful error message) in case of errors
return s, nil
}
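For context, a hypothetical snippet showing the lifecycle of this sink, written as if it lived inside the sinks package so it can reuse the json and lp imports from sampleSink.go (a "time" import would also be needed; the wiring itself is illustrative and not part of this commit):

func demoSampleSinkUsage() error {
	// construct the sink from a minimal JSON config; 'type' is one of the
	// keys the comments say defaultSinkConfig provides
	s, err := NewSampleSink("demo", json.RawMessage(`{"type": "sample"}`))
	if err != nil {
		return err
	}
	defer s.Close()

	// build one metric with the constructor also used by the collectors above
	m, err := lp.New("sample_metric",
		map[string]string{"type": "node"},
		map[string]string{"source": "demo"},
		map[string]interface{}{"value": 1.0},
		time.Now())
	if err != nil {
		return err
	}

	if err := s.Write(m); err != nil {
		return err
	}
	return s.Flush()
}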