mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-01-13 15:49:06 +01:00
294 lines
7.3 KiB
Go
294 lines
7.3 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"github.com/ClusterCockpit/cc-metric-collector/collectors"
|
|
"github.com/ClusterCockpit/cc-metric-collector/receivers"
|
|
"github.com/ClusterCockpit/cc-metric-collector/sinks"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// List of provided collectors. Which collector should be run can be
|
|
// configured at 'collectors' list in 'config.json'.
|
|
var Collectors = map[string]collectors.MetricGetter{
|
|
"likwid": &collectors.LikwidCollector{},
|
|
"loadavg": &collectors.LoadavgCollector{},
|
|
"memstat": &collectors.MemstatCollector{},
|
|
"netstat": &collectors.NetstatCollector{},
|
|
"ibstat": &collectors.InfinibandCollector{},
|
|
"lustrestat": &collectors.LustreCollector{},
|
|
"cpustat": &collectors.CpustatCollector{},
|
|
"topprocs": &collectors.TopProcsCollector{},
|
|
"nvidia": &collectors.NvidiaCollector{},
|
|
}
|
|
|
|
var Sinks = map[string]sinks.SinkFuncs{
|
|
"influxdb": &sinks.InfluxSink{},
|
|
"stdout": &sinks.StdoutSink{},
|
|
"nats": &sinks.NatsSink{},
|
|
}
|
|
|
|
var Receivers = map[string]receivers.ReceiverFuncs{
|
|
"nats": &receivers.NatsReceiver{},
|
|
}
|
|
|
|
// Structure of the configuration file
|
|
type GlobalConfig struct {
|
|
Sink sinks.SinkConfig `json:"sink"`
|
|
Interval int `json:"interval"`
|
|
Duration int `json:"duration"`
|
|
Collectors []string `json:"collectors"`
|
|
Receiver receivers.ReceiverConfig `json:"receiver"`
|
|
}
|
|
|
|
// Load JSON configuration file
|
|
func LoadConfiguration(file string, config *GlobalConfig) error {
|
|
configFile, err := os.Open(file)
|
|
defer configFile.Close()
|
|
if err != nil {
|
|
fmt.Println(err.Error())
|
|
return err
|
|
}
|
|
jsonParser := json.NewDecoder(configFile)
|
|
jsonParser.Decode(config)
|
|
return err
|
|
}
|
|
|
|
func ReadCli() map[string]string {
|
|
var m map[string]string
|
|
cfg := flag.String("config", "./config.json", "Path to configuration file")
|
|
logfile := flag.String("log", "stderr", "Path for logfile")
|
|
flag.Parse()
|
|
m = make(map[string]string)
|
|
m["configfile"] = *cfg
|
|
m["logfile"] = *logfile
|
|
return m
|
|
}
|
|
|
|
func SetLogging(logfile string) error {
|
|
var file *os.File
|
|
var err error
|
|
if logfile != "stderr" {
|
|
file, err = os.OpenFile(logfile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
return err
|
|
}
|
|
} else {
|
|
file = os.Stderr
|
|
}
|
|
log.SetOutput(file)
|
|
return nil
|
|
}
|
|
|
|
// Register an interrupt handler for Ctrl+C and similar. At signal,
|
|
// all collectors are closed
|
|
func shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs, recv receivers.ReceiverFuncs) {
|
|
sigs := make(chan os.Signal, 1)
|
|
signal.Notify(sigs, os.Interrupt)
|
|
|
|
go func(wg *sync.WaitGroup) {
|
|
<-sigs
|
|
log.Print("Shutdown...")
|
|
for _, c := range config.Collectors {
|
|
col := Collectors[c]
|
|
log.Print("Stop ", col.Name())
|
|
col.Close()
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
if recv != nil {
|
|
recv.Close()
|
|
}
|
|
sink.Close()
|
|
wg.Done()
|
|
}(wg)
|
|
}
|
|
|
|
func main() {
|
|
var config GlobalConfig
|
|
var wg sync.WaitGroup
|
|
var recv receivers.ReceiverFuncs = nil
|
|
var use_recv bool
|
|
use_recv = false
|
|
wg.Add(1)
|
|
host, err := os.Hostname()
|
|
if err != nil {
|
|
log.Print(err)
|
|
return
|
|
}
|
|
clicfg := ReadCli()
|
|
err = SetLogging(clicfg["logfile"])
|
|
if err != nil {
|
|
log.Print("Error setting up logging system to ", clicfg["logfile"])
|
|
return
|
|
}
|
|
|
|
// Load and check configuration
|
|
err = LoadConfiguration(clicfg["configfile"], &config)
|
|
if err != nil {
|
|
log.Print("Error reading configuration file ", clicfg["configfile"])
|
|
return
|
|
}
|
|
if config.Interval <= 0 || time.Duration(config.Interval)*time.Second <= 0 {
|
|
log.Print("Configuration value 'interval' must be greater than zero")
|
|
return
|
|
}
|
|
if config.Duration <= 0 {
|
|
log.Print("Configuration value 'duration' must be greater than zero")
|
|
return
|
|
}
|
|
if len(config.Collectors) == 0 {
|
|
var keys []string
|
|
for k := range Collectors {
|
|
keys = append(keys, k)
|
|
}
|
|
log.Print("Configuration value 'collectors' does not contain any collector. Available: ", strings.Join(keys, ", "))
|
|
return
|
|
}
|
|
for _, name := range config.Collectors {
|
|
if _, found := Collectors[name]; !found {
|
|
log.Print("Invalid collector '", name, "' in configuration")
|
|
return
|
|
}
|
|
}
|
|
if _, found := Sinks[config.Sink.Type]; !found {
|
|
log.Print("Invalid sink type '", config.Sink.Type, "' in configuration")
|
|
return
|
|
}
|
|
// Setup sink
|
|
sink := Sinks[config.Sink.Type]
|
|
err = sink.Init(config.Sink)
|
|
if err != nil {
|
|
log.Print(err)
|
|
return
|
|
}
|
|
// Setup receiver
|
|
if config.Receiver.Type != "none" {
|
|
if _, found := Receivers[config.Receiver.Type]; !found {
|
|
log.Print("Invalid receiver type '", config.Receiver.Type, "' in configuration")
|
|
return
|
|
} else {
|
|
recv = Receivers[config.Receiver.Type]
|
|
err = recv.Init(config.Receiver, sink)
|
|
if err == nil {
|
|
use_recv = true
|
|
} else {
|
|
log.Print(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Register interrupt handler
|
|
shutdown(&wg, &config, sink, recv)
|
|
|
|
// Initialize all collectors
|
|
tmp := make([]string, 0)
|
|
for _, c := range config.Collectors {
|
|
col := Collectors[c]
|
|
err = col.Init()
|
|
if err != nil {
|
|
log.Print("SKIP ", col.Name())
|
|
} else {
|
|
log.Print("Start ", col.Name())
|
|
tmp = append(tmp, c)
|
|
}
|
|
}
|
|
config.Collectors = tmp
|
|
|
|
// Setup up ticker loop
|
|
log.Print("Running loop every ", time.Duration(config.Interval)*time.Second)
|
|
ticker := time.NewTicker(time.Duration(config.Interval) * time.Second)
|
|
done := make(chan bool)
|
|
|
|
// Storage for all node metrics
|
|
nodeFields := make(map[string]interface{})
|
|
|
|
// Storage for all socket metrics
|
|
slist := collectors.SocketList()
|
|
socketsFields := make(map[int]map[string]interface{}, len(slist))
|
|
for _, s := range slist {
|
|
socketsFields[s] = make(map[string]interface{})
|
|
}
|
|
|
|
// Storage for all CPU metrics
|
|
clist := collectors.CpuList()
|
|
cpuFields := make(map[int]map[string]interface{}, len(clist))
|
|
for _, s := range clist {
|
|
cpuFields[s] = make(map[string]interface{})
|
|
}
|
|
|
|
// Start receiver
|
|
if use_recv {
|
|
recv.Start()
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-done:
|
|
return
|
|
case t := <-ticker.C:
|
|
// Count how many socket and cpu metrics are returned
|
|
scount := 0
|
|
ccount := 0
|
|
|
|
// Read all collectors are sort the results in the right
|
|
// storage locations
|
|
for _, c := range config.Collectors {
|
|
col := Collectors[c]
|
|
col.Read(time.Duration(config.Duration))
|
|
|
|
for key, val := range col.GetNodeMetric() {
|
|
nodeFields[key] = val
|
|
}
|
|
for sid, socket := range col.GetSocketMetrics() {
|
|
for key, val := range socket {
|
|
socketsFields[sid][key] = val
|
|
scount++
|
|
}
|
|
}
|
|
for cid, cpu := range col.GetCpuMetrics() {
|
|
for key, val := range cpu {
|
|
cpuFields[cid][key] = val
|
|
ccount++
|
|
}
|
|
}
|
|
}
|
|
|
|
// Send out node metrics
|
|
if len(nodeFields) > 0 {
|
|
sink.Write("node", map[string]string{"host": host}, nodeFields, t)
|
|
}
|
|
|
|
// Send out socket metrics (if any)
|
|
if scount > 0 {
|
|
for sid, socket := range socketsFields {
|
|
if len(socket) > 0 {
|
|
sink.Write("socket", map[string]string{"socket": fmt.Sprintf("%d", sid), "host": host}, socket, t)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Send out CPU metrics (if any)
|
|
if ccount > 0 {
|
|
for cid, cpu := range cpuFields {
|
|
if len(cpu) > 0 {
|
|
sink.Write("cpu", map[string]string{"cpu": fmt.Sprintf("%d", cid), "host": host}, cpu, t)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Wait until receiving an interrupt
|
|
wg.Wait()
|
|
}
|