Skip collectors that fail at init. Only write metrics with fields

This commit is contained in:
Thomas Roehl 2021-05-14 19:22:42 +02:00
parent 6b6e28d4c7
commit 1da906470d

View File

@ -2,6 +2,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"flag"
"fmt" "fmt"
"github.com/ClusterCockpit/cc-metric-collector/collectors" "github.com/ClusterCockpit/cc-metric-collector/collectors"
"github.com/ClusterCockpit/cc-metric-collector/sinks" "github.com/ClusterCockpit/cc-metric-collector/sinks"
@ -11,7 +12,6 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"flag"
) )
// List of provided collectors. Which collector should be run can be // List of provided collectors. Which collector should be run can be
@ -23,14 +23,15 @@ var Collectors = map[string]collectors.MetricGetter{
"netstat": &collectors.NetstatCollector{}, "netstat": &collectors.NetstatCollector{},
"ibstat": &collectors.InfinibandCollector{}, "ibstat": &collectors.InfinibandCollector{},
"lustrestat": &collectors.LustreCollector{}, "lustrestat": &collectors.LustreCollector{},
"cpustat": &collectors.CpustatCollector{}, "cpustat": &collectors.CpustatCollector{},
"topprocs": &collectors.TopProcsCollector{}, "topprocs": &collectors.TopProcsCollector{},
"nvidia": &collectors.NvidiaCollector{},
} }
var Sinks = map[string]sinks.SinkFuncs{ var Sinks = map[string]sinks.SinkFuncs{
"influxdb": &sinks.InfluxSink{}, "influxdb": &sinks.InfluxSink{},
"stdout": &sinks.StdoutSink{}, "stdout": &sinks.StdoutSink{},
"nats": &sinks.NatsSink{}, "nats": &sinks.NatsSink{},
} }
// Structure of the configuration file // Structure of the configuration file
@ -62,30 +63,30 @@ func LoadConfiguration(file string, config *GlobalConfig) error {
} }
// ReadCli registers the command line flags, parses the command line and
// returns the resulting values as a map:
//
//	"configfile"  value of -config (default "./config.json")
//	"logfile"     value of -log    (default "stderr")
func ReadCli() map[string]string {
	configPath := flag.String("config", "./config.json", "Path to configuration file")
	logPath := flag.String("log", "stderr", "Path for logfile")
	flag.Parse()

	return map[string]string{
		"configfile": *configPath,
		"logfile":    *logPath,
	}
}
// SetLogging directs the output of the standard logger to logfile.
//
// The special name "stderr" selects os.Stderr. Any other value is treated as
// a file path that is opened for appending and created with permission 0600
// if it does not exist. The file is intentionally left open because the
// standard logger keeps writing to it after this function returns.
//
// If the file cannot be opened, the logger output is left unchanged and the
// error is returned so that the caller decides how to react. Calling
// log.Fatal here instead would terminate the process before the error could
// ever be returned.
func SetLogging(logfile string) error {
	if logfile == "stderr" {
		log.SetOutput(os.Stderr)
		return nil
	}

	file, err := os.OpenFile(logfile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return fmt.Errorf("opening logfile %q: %w", logfile, err)
	}
	log.SetOutput(file)
	return nil
}
// Register an interrupt handler for Ctrl+C and similar. At signal, // Register an interrupt handler for Ctrl+C and similar. At signal,
@ -119,16 +120,16 @@ func main() {
} }
clicfg := ReadCli() clicfg := ReadCli()
err = SetLogging(clicfg["logfile"]) err = SetLogging(clicfg["logfile"])
if (err != nil) { if err != nil {
log.Print("Error setting up logging system to ", clicfg["logfile"]) log.Print("Error setting up logging system to ", clicfg["logfile"])
return return
} }
// Load and check configuration // Load and check configuration
err = LoadConfiguration(clicfg["configfile"], &config) err = LoadConfiguration(clicfg["configfile"], &config)
if (err != nil) { if err != nil {
log.Print("Error reading configuration file ", clicfg["configfile"]) log.Print("Error reading configuration file ", clicfg["configfile"])
return return
} }
if config.Interval <= 0 || time.Duration(config.Interval)*time.Second <= 0 { if config.Interval <= 0 || time.Duration(config.Interval)*time.Second <= 0 {
log.Print("Configuration value 'interval' must be greater than zero") log.Print("Configuration value 'interval' must be greater than zero")
@ -167,11 +168,18 @@ func main() {
shutdown(&wg, &config, sink) shutdown(&wg, &config, sink)
// Initialize all collectors // Initialize all collectors
tmp := make([]string, 0)
for _, c := range config.Collectors { for _, c := range config.Collectors {
col := Collectors[c] col := Collectors[c]
col.Init() err = col.Init()
log.Print("Start ", col.Name()) if err != nil {
log.Print("SKIP ", col.Name())
} else {
log.Print("Start ", col.Name())
tmp = append(tmp, c)
}
} }
config.Collectors = tmp
// Set up ticker loop // Set up ticker loop
log.Print("Running loop every ", time.Duration(config.Interval)*time.Second) log.Print("Running loop every ", time.Duration(config.Interval)*time.Second)
@ -229,19 +237,25 @@ func main() {
} }
// Send out node metrics // Send out node metrics
sink.Write("node", map[string]string{"host": host}, nodeFields, t) if len(nodeFields) > 0 {
sink.Write("node", map[string]string{"host": host}, nodeFields, t)
}
// Send out socket metrics (if any) // Send out socket metrics (if any)
if scount > 0 { if scount > 0 {
for sid, socket := range socketsFields { for sid, socket := range socketsFields {
sink.Write("socket", map[string]string{"socket": fmt.Sprintf("%d", sid), "host": host}, socket, t) if len(socket) > 0 {
sink.Write("socket", map[string]string{"socket": fmt.Sprintf("%d", sid), "host": host}, socket, t)
}
} }
} }
// Send out CPU metrics (if any) // Send out CPU metrics (if any)
if ccount > 0 { if ccount > 0 {
for cid, cpu := range cpuFields { for cid, cpu := range cpuFields {
sink.Write("cpu", map[string]string{"cpu": fmt.Sprintf("%d", cid), "host": host}, cpu, t) if len(cpu) > 0 {
sink.Write("cpu", map[string]string{"cpu": fmt.Sprintf("%d", cid), "host": host}, cpu, t)
}
} }
} }
} }