Some more config file checks and some comments

This commit is contained in:
Thomas Roehl 2021-03-26 13:08:44 +01:00
parent e7d933e60f
commit e92f54b411

View File

@ -3,16 +3,18 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/ClusterCockpit/cc-metric-collector/collectors"
protocol "github.com/influxdata/line-protocol"
"log" "log"
"os" "os"
"os/signal" "os/signal"
"strings"
"sync" "sync"
"time" "time"
"github.com/ClusterCockpit/cc-metric-collector/collectors"
protocol "github.com/influxdata/line-protocol"
) )
// List of provided collectors. Which collector should be run can be
// configured at 'collectors' list in 'config.json'.
var Collectors = map[string]collectors.MetricGetter{ var Collectors = map[string]collectors.MetricGetter{
"likwid": &collectors.LikwidCollector{}, "likwid": &collectors.LikwidCollector{},
"loadavg": &collectors.LoadavgCollector{}, "loadavg": &collectors.LoadavgCollector{},
@ -22,8 +24,10 @@ var Collectors = map[string]collectors.MetricGetter{
"lustrestat": &collectors.LustreCollector{}, "lustrestat": &collectors.LustreCollector{},
} }
// Pointer to the InfluxDB line protocol encoder
var serializer *protocol.Encoder var serializer *protocol.Encoder
// Structure of the configuration file
type GlobalConfig struct { type GlobalConfig struct {
Sink struct { Sink struct {
User string `json:"user"` User string `json:"user"`
@ -36,6 +40,7 @@ type GlobalConfig struct {
Collectors []string `json:"collectors"` Collectors []string `json:"collectors"`
} }
// Load JSON configuration file
func LoadConfiguration(file string, config *GlobalConfig) error { func LoadConfiguration(file string, config *GlobalConfig) error {
configFile, err := os.Open(file) configFile, err := os.Open(file)
defer configFile.Close() defer configFile.Close()
@ -47,6 +52,8 @@ func LoadConfiguration(file string, config *GlobalConfig) error {
return err return err
} }
// Register an interrupt handler for Ctrl+C and similar. At signal,
// all collectors are closed
func shutdown(wg *sync.WaitGroup, config *GlobalConfig) { func shutdown(wg *sync.WaitGroup, config *GlobalConfig) {
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt) signal.Notify(sigs, os.Interrupt)
@ -64,6 +71,7 @@ func shutdown(wg *sync.WaitGroup, config *GlobalConfig) {
}(wg) }(wg)
} }
// Create a new measurement in InfluxDB line protocol
func setupProtocol(scope string, tags map[string]string, fields map[string]interface{}, t time.Time) { func setupProtocol(scope string, tags map[string]string, fields map[string]interface{}, t time.Time) {
cur, err := protocol.New(scope, tags, fields, t) cur, err := protocol.New(scope, tags, fields, t)
if err != nil { if err != nil {
@ -85,6 +93,7 @@ func main() {
return return
} }
// Load and check configuration
LoadConfiguration("config.json", &config) LoadConfiguration("config.json", &config)
if config.Interval <= 0 || time.Duration(config.Interval)*time.Second <= 0 { if config.Interval <= 0 || time.Duration(config.Interval)*time.Second <= 0 {
log.Print("Configuration value 'interval' must be greater than zero") log.Print("Configuration value 'interval' must be greater than zero")
@ -94,30 +103,51 @@ func main() {
log.Print("Configuration value 'duration' must be greater than zero") log.Print("Configuration value 'duration' must be greater than zero")
return return
} }
if len(config.Collectors) == 0 {
var keys []string
for k := range Collectors {
keys = append(keys, k)
}
log.Print("Configuration value 'collectors' does not contain any collector. Available: ", strings.Join(keys, ", "))
return
}
for _, name := range config.Collectors {
if _, found := Collectors[name]; !found {
log.Print("Invalid collector '", name, "' in configuration")
return
}
}
// Register interrupt handler
shutdown(&wg, &config) shutdown(&wg, &config)
// Setup InfluxDB line protocol encoder
serializer = protocol.NewEncoder(os.Stdout) serializer = protocol.NewEncoder(os.Stdout)
serializer.SetPrecision(time.Second) serializer.SetPrecision(time.Second)
serializer.SetMaxLineBytes(1024) serializer.SetMaxLineBytes(1024)
// Initialize all collectors
for _, c := range config.Collectors { for _, c := range config.Collectors {
col := Collectors[c] col := Collectors[c]
col.Init() col.Init()
log.Print("Start ", col.Name()) log.Print("Start ", col.Name())
} }
log.Print(config.Interval, time.Duration(config.Interval)*time.Second) // Setup up ticker loop
log.Print("Running loop every ", time.Duration(config.Interval)*time.Second)
ticker := time.NewTicker(time.Duration(config.Interval) * time.Second) ticker := time.NewTicker(time.Duration(config.Interval) * time.Second)
done := make(chan bool) done := make(chan bool)
// Storage for all node metrics
nodeFields := make(map[string]interface{}) nodeFields := make(map[string]interface{})
// Storage for all socket metrics
slist := collectors.SocketList() slist := collectors.SocketList()
socketsFields := make(map[int]map[string]interface{}, len(slist)) socketsFields := make(map[int]map[string]interface{}, len(slist))
for _, s := range slist { for _, s := range slist {
socketsFields[s] = make(map[string]interface{}) socketsFields[s] = make(map[string]interface{})
} }
// Storage for all CPU metrics
clist := collectors.CpuList() clist := collectors.CpuList()
cpuFields := make(map[int]map[string]interface{}, len(clist)) cpuFields := make(map[int]map[string]interface{}, len(clist))
for _, s := range clist { for _, s := range clist {
@ -130,9 +160,12 @@ func main() {
case <-done: case <-done:
return return
case t := <-ticker.C: case t := <-ticker.C:
// Count how many socket and cpu metrics are returned
scount := 0 scount := 0
ccount := 0 ccount := 0
// Read all collectors are sort the results in the right
// storage locations
for _, c := range config.Collectors { for _, c := range config.Collectors {
col := Collectors[c] col := Collectors[c]
col.Read(time.Duration(config.Duration)) col.Read(time.Duration(config.Duration))
@ -153,15 +186,17 @@ func main() {
} }
} }
} }
// Send out node metrics
setupProtocol("node", map[string]string{"host": host}, nodeFields, t) setupProtocol("node", map[string]string{"host": host}, nodeFields, t)
// Send out socket metrics (if any)
if scount > 0 { if scount > 0 {
for sid, socket := range socketsFields { for sid, socket := range socketsFields {
setupProtocol("socket", map[string]string{"socket": fmt.Sprintf("%d", sid), "host": host}, socket, t) setupProtocol("socket", map[string]string{"socket": fmt.Sprintf("%d", sid), "host": host}, socket, t)
} }
} }
// Send out CPU metrics (if any)
if ccount > 0 { if ccount > 0 {
for cid, cpu := range cpuFields { for cid, cpu := range cpuFields {
setupProtocol("cpu", map[string]string{"cpu": fmt.Sprintf("%d", cid), "host": host}, cpu, t) setupProtocol("cpu", map[string]string{"cpu": fmt.Sprintf("%d", cid), "host": host}, cpu, t)
@ -171,5 +206,6 @@ func main() {
} }
}() }()
// Wait until receiving an interrupt
wg.Wait() wg.Wait()
} }