Add documentation

This commit is contained in:
Holger Obermaier 2022-01-28 15:16:58 +01:00
parent 82f5c1c5d0
commit 4e408f9490
3 changed files with 81 additions and 47 deletions

View File

@ -34,17 +34,18 @@ var AvailableCollectors = map[string]MetricCollector{
"nfsstat": new(NfsCollector), "nfsstat": new(NfsCollector),
} }
// Metric collector manager data structure
type collectorManager struct { type collectorManager struct {
collectors []MetricCollector collectors []MetricCollector // List of metric collectors to use
output chan lp.CCMetric // List of all output channels output chan lp.CCMetric // Output channels
done chan bool // channel to finish / stop metric collector manager done chan bool // channel to finish / stop metric collector manager
ticker mct.MultiChanTicker ticker mct.MultiChanTicker // periodically ticking once each interval
duration time.Duration duration time.Duration // duration (for metrics that measure over a given duration)
wg *sync.WaitGroup wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
config map[string]json.RawMessage config map[string]json.RawMessage // json encoded config for collector manager
} }
// Metric collector access functions // Metric collector manager access functions
type CollectorManager interface { type CollectorManager interface {
Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error
AddOutput(output chan lp.CCMetric) AddOutput(output chan lp.CCMetric)
@ -53,9 +54,9 @@ type CollectorManager interface {
} }
// Init initializes a new metric collector manager by setting up: // Init initializes a new metric collector manager by setting up:
// * output channels // * output channel
// * done channel // * done channel
// * wait group synchronization (from variable wg) // * wait group synchronization for goroutines (from variable wg)
// * ticker (from variable ticker) // * ticker (from variable ticker)
// * configuration (read from config file in variable collectConfigFile) // * configuration (read from config file in variable collectConfigFile)
// Initialization is done for all configured collectors // Initialization is done for all configured collectors
@ -82,20 +83,20 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
} }
// Initialize configured collectors // Initialize configured collectors
for k, cfg := range cm.config { for collectorName, collectorCfg := range cm.config {
if _, found := AvailableCollectors[k]; !found { if _, found := AvailableCollectors[collectorName]; !found {
cclog.ComponentError("CollectorManager", "SKIP unknown collector", k) cclog.ComponentError("CollectorManager", "SKIP unknown collector", collectorName)
continue continue
} }
c := AvailableCollectors[k] collector := AvailableCollectors[collectorName]
err = c.Init(cfg) err = collector.Init(collectorCfg)
if err != nil { if err != nil {
cclog.ComponentError("CollectorManager", "Collector", k, "initialization failed:", err.Error()) cclog.ComponentError("CollectorManager", "Collector", collectorName, "initialization failed:", err.Error())
continue continue
} }
cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", c.Name()) cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name())
cm.collectors = append(cm.collectors, c) cm.collectors = append(cm.collectors, collector)
} }
return nil return nil
} }
@ -157,7 +158,7 @@ func (cm *collectorManager) Close() {
// New creates a new initialized metric collector manager // New creates a new initialized metric collector manager
func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) { func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) {
cm := &collectorManager{} cm := new(collectorManager)
err := cm.Init(ticker, duration, wg, collectConfigFile) err := cm.Init(ticker, duration, wg, collectConfigFile)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -24,19 +24,20 @@ type metricRouterTagConfig struct {
type metricRouterConfig struct { type metricRouterConfig struct {
AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met
DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met
IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically? IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval?
} }
// Metric router data structure
type metricRouter struct { type metricRouter struct {
coll_input chan lp.CCMetric // Input channel from CollectorManager coll_input chan lp.CCMetric // Input channel from CollectorManager
recv_input chan lp.CCMetric // Input channel from ReceiveManager recv_input chan lp.CCMetric // Input channel from ReceiveManager
outputs []chan lp.CCMetric // List of all output channels outputs []chan lp.CCMetric // List of all output channels
done chan bool // channel to finish / stop metric router done chan bool // channel to finish / stop metric router
wg *sync.WaitGroup wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
timestamp time.Time // timestamp timestamp time.Time // timestamp periodically updated by ticker each interval
timerdone chan bool // channel to finish / stop timestamp updater timerdone chan bool // channel to finish / stop timestamp updater
ticker mct.MultiChanTicker ticker mct.MultiChanTicker // periodically ticking once each interval
config metricRouterConfig config metricRouterConfig // json encoded config for metric router
} }
// MetricRouter access functions // MetricRouter access functions
@ -60,6 +61,8 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
r.done = make(chan bool) r.done = make(chan bool)
r.wg = wg r.wg = wg
r.ticker = ticker r.ticker = ticker
// Read metric router config file
configFile, err := os.Open(routerConfigFile) configFile, err := os.Open(routerConfigFile)
if err != nil { if err != nil {
cclog.ComponentError("MetricRouter", err.Error()) cclog.ComponentError("MetricRouter", err.Error())
@ -97,11 +100,11 @@ func (r *metricRouter) StartTimer() {
cclog.ComponentDebug("MetricRouter", "TIMER START") cclog.ComponentDebug("MetricRouter", "TIMER START")
} }
// EvalCondition evaluates condition Cond for metric data from point // EvalCondition evaluates condition cond for metric data from point
func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) { func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, error) {
expression, err := govaluate.NewEvaluableExpression(Cond) expression, err := govaluate.NewEvaluableExpression(cond)
if err != nil { if err != nil {
cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
return false, err return false, err
} }
@ -122,7 +125,7 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro
// evaluate condition // evaluate condition
result, err := expression.Evaluate(params) result, err := expression.Evaluate(params)
if err != nil { if err != nil {
cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
return false, err return false, err
} }
return bool(result.(bool)), err return bool(result.(bool)), err
@ -172,13 +175,20 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) {
// Start starts the metric router // Start starts the metric router
func (r *metricRouter) Start() { func (r *metricRouter) Start() {
// start timer if configured
r.timestamp = time.Now() r.timestamp = time.Now()
if r.config.IntervalStamp { if r.config.IntervalStamp {
r.StartTimer() r.StartTimer()
} }
// Router manager is done
done := func() { done := func() {
cclog.ComponentDebug("MetricRouter", "DONE") cclog.ComponentDebug("MetricRouter", "DONE")
} }
// Forward takes a received metric, adds or deletes tags
// and forwards it to the output channels
forward := func(point lp.CCMetric) { forward := func(point lp.CCMetric) {
cclog.ComponentDebug("MetricRouter", "FORWARD", point) cclog.ComponentDebug("MetricRouter", "FORWARD", point)
r.DoAddTags(point) r.DoAddTags(point)
@ -192,17 +202,20 @@ func (r *metricRouter) Start() {
go func() { go func() {
defer r.wg.Done() defer r.wg.Done()
for { for {
// RouterLoop:
select { select {
case <-r.done: case <-r.done:
done() done()
return return
case p := <-r.coll_input: case p := <-r.coll_input:
// receive from metric collector
if r.config.IntervalStamp { if r.config.IntervalStamp {
p.SetTime(r.timestamp) p.SetTime(r.timestamp)
} }
forward(p) forward(p)
case p := <-r.recv_input: case p := <-r.recv_input:
// receive from receive manager
if r.config.IntervalStamp { if r.config.IntervalStamp {
p.SetTime(r.timestamp) p.SetTime(r.timestamp)
} }
@ -213,11 +226,12 @@ func (r *metricRouter) Start() {
cclog.ComponentDebug("MetricRouter", "STARTED") cclog.ComponentDebug("MetricRouter", "STARTED")
} }
// AddInput adds a input channel to the metric router // AddCollectorInput adds a channel between metric collector and metric router
func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) { func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) {
r.coll_input = input r.coll_input = input
} }
// AddReceiverInput adds a channel between metric receiver and metric router
func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) { func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) {
r.recv_input = input r.recv_input = input
} }

View File

@ -9,21 +9,24 @@ import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
// Map of all available sinks
var AvailableSinks = map[string]Sink{ var AvailableSinks = map[string]Sink{
"influxdb": &InfluxSink{}, "influxdb": new(InfluxSink),
"stdout": &StdoutSink{}, "stdout": new(StdoutSink),
"nats": &NatsSink{}, "nats": new(NatsSink),
"http": &HttpSink{}, "http": new(HttpSink),
} }
// Metric collector manager data structure
type sinkManager struct { type sinkManager struct {
input chan lp.CCMetric input chan lp.CCMetric // input channel
outputs []Sink outputs []Sink // List of sinks to use
done chan bool done chan bool // channel to finish / stop metric sink manager
wg *sync.WaitGroup wg *sync.WaitGroup // wait group for all goroutines in cc-metric-collector
config []sinkConfig config []sinkConfig // json encoded config for sink manager
} }
// Sink manager access functions
type SinkManager interface { type SinkManager interface {
Init(wg *sync.WaitGroup, sinkConfigFile string) error Init(wg *sync.WaitGroup, sinkConfigFile string) error
AddInput(input chan lp.CCMetric) AddInput(input chan lp.CCMetric)
@ -38,6 +41,8 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
sm.done = make(chan bool) sm.done = make(chan bool)
sm.wg = wg sm.wg = wg
sm.config = make([]sinkConfig, 0) sm.config = make([]sinkConfig, 0)
// Read sink config file
if len(sinkConfigFile) > 0 { if len(sinkConfigFile) > 0 {
configFile, err := os.Open(sinkConfigFile) configFile, err := os.Open(sinkConfigFile)
if err != nil { if err != nil {
@ -63,27 +68,36 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
} }
func (sm *sinkManager) Start() { func (sm *sinkManager) Start() {
sm.wg.Add(1)
batchcount := 20 batchcount := 20
sm.wg.Add(1)
go func() { go func() {
defer sm.wg.Done()
// Sink manager is done
done := func() { done := func() {
for _, s := range sm.outputs { for _, s := range sm.outputs {
s.Flush() s.Flush()
s.Close() s.Close()
} }
sm.wg.Done()
cclog.ComponentDebug("SinkManager", "DONE") cclog.ComponentDebug("SinkManager", "DONE")
} }
for { for {
select { select {
case <-sm.done: case <-sm.done:
done() done()
return return
case p := <-sm.input: case p := <-sm.input:
// Send received metric to all outputs
cclog.ComponentDebug("SinkManager", "WRITE", p) cclog.ComponentDebug("SinkManager", "WRITE", p)
for _, s := range sm.outputs { for _, s := range sm.outputs {
s.Write(p) s.Write(p)
} }
// Flush all outputs
if batchcount == 0 { if batchcount == 0 {
cclog.ComponentDebug("SinkManager", "FLUSH") cclog.ComponentDebug("SinkManager", "FLUSH")
for _, s := range sm.outputs { for _, s := range sm.outputs {
@ -95,9 +109,12 @@ func (sm *sinkManager) Start() {
} }
} }
}() }()
// Sink manager is started
cclog.ComponentDebug("SinkManager", "STARTED") cclog.ComponentDebug("SinkManager", "STARTED")
} }
// AddInput adds the input channel to the sink manager
func (sm *sinkManager) AddInput(input chan lp.CCMetric) { func (sm *sinkManager) AddInput(input chan lp.CCMetric) {
sm.input = input sm.input = input
} }
@ -128,11 +145,13 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error {
return nil return nil
} }
// Close finishes / stops the sink manager
func (sm *sinkManager) Close() { func (sm *sinkManager) Close() {
cclog.ComponentDebug("SinkManager", "CLOSE") cclog.ComponentDebug("SinkManager", "CLOSE")
sm.done <- true sm.done <- true
} }
// New creates a new initialized sink manager
func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) { func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {
sm := &sinkManager{} sm := &sinkManager{}
err := sm.Init(wg, sinkConfigFile) err := sm.Init(wg, sinkConfigFile)