diff --git a/metric-collector.go b/metric-collector.go index 8f0e7dc..ea5c675 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -3,16 +3,18 @@ package main import ( "encoding/json" "fmt" + "github.com/ClusterCockpit/cc-metric-collector/collectors" + protocol "github.com/influxdata/line-protocol" "log" "os" "os/signal" + "strings" "sync" "time" - - "github.com/ClusterCockpit/cc-metric-collector/collectors" - protocol "github.com/influxdata/line-protocol" ) +// List of provided collectors. Which collector should be run can be +// configured at 'collectors' list in 'config.json'. var Collectors = map[string]collectors.MetricGetter{ "likwid": &collectors.LikwidCollector{}, "loadavg": &collectors.LoadavgCollector{}, @@ -22,8 +24,10 @@ var Collectors = map[string]collectors.MetricGetter{ "lustrestat": &collectors.LustreCollector{}, } +// Pointer to the InfluxDB line protocol encoder var serializer *protocol.Encoder +// Structure of the configuration file type GlobalConfig struct { Sink struct { User string `json:"user"` @@ -36,6 +40,7 @@ type GlobalConfig struct { Collectors []string `json:"collectors"` } +// Load JSON configuration file func LoadConfiguration(file string, config *GlobalConfig) error { configFile, err := os.Open(file) defer configFile.Close() @@ -47,6 +52,8 @@ func LoadConfiguration(file string, config *GlobalConfig) error { return err } +// Register an interrupt handler for Ctrl+C and similar. At signal, +// all collectors are closed func shutdown(wg *sync.WaitGroup, config *GlobalConfig) { sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) @@ -64,6 +71,7 @@ func shutdown(wg *sync.WaitGroup, config *GlobalConfig) { }(wg) } +// Create a new measurement in InfluxDB line protocol func setupProtocol(scope string, tags map[string]string, fields map[string]interface{}, t time.Time) { cur, err := protocol.New(scope, tags, fields, t) if err != nil { @@ -85,6 +93,7 @@ func main() { return } + // Load and check configuration LoadConfiguration("config.json", &config) if config.Interval <= 0 || time.Duration(config.Interval)*time.Second <= 0 { log.Print("Configuration value 'interval' must be greater than zero") @@ -94,30 +103,51 @@ func main() { log.Print("Configuration value 'duration' must be greater than zero") return } + if len(config.Collectors) == 0 { + var keys []string + for k := range Collectors { + keys = append(keys, k) + } + log.Print("Configuration value 'collectors' does not contain any collector. Available: ", strings.Join(keys, ", ")) + return + } + for _, name := range config.Collectors { + if _, found := Collectors[name]; !found { + log.Print("Invalid collector '", name, "' in configuration") + return + } + } + // Register interrupt handler shutdown(&wg, &config) + // Setup InfluxDB line protocol encoder serializer = protocol.NewEncoder(os.Stdout) serializer.SetPrecision(time.Second) serializer.SetMaxLineBytes(1024) + // Initialize all collectors for _, c := range config.Collectors { col := Collectors[c] col.Init() log.Print("Start ", col.Name()) } - log.Print(config.Interval, time.Duration(config.Interval)*time.Second) + // Setup up ticker loop + log.Print("Running loop every ", time.Duration(config.Interval)*time.Second) ticker := time.NewTicker(time.Duration(config.Interval) * time.Second) done := make(chan bool) + // Storage for all node metrics nodeFields := make(map[string]interface{}) + // Storage for all socket metrics slist := collectors.SocketList() socketsFields := make(map[int]map[string]interface{}, len(slist)) for _, s := range slist { socketsFields[s] = make(map[string]interface{}) } + // Storage for all CPU metrics clist := collectors.CpuList() cpuFields := make(map[int]map[string]interface{}, len(clist)) for _, s := range clist { @@ -130,9 +160,12 @@ func main() { case <-done: return case t := <-ticker.C: + // Count how many socket and cpu metrics are returned scount := 0 ccount := 0 + // Read all collectors are sort the results in the right + // storage locations for _, c := range config.Collectors { col := Collectors[c] col.Read(time.Duration(config.Duration)) @@ -153,15 +186,17 @@ func main() { } } } - + // Send out node metrics setupProtocol("node", map[string]string{"host": host}, nodeFields, t) + // Send out socket metrics (if any) if scount > 0 { for sid, socket := range socketsFields { setupProtocol("socket", map[string]string{"socket": fmt.Sprintf("%d", sid), "host": host}, socket, t) } } + // Send out CPU metrics (if any) if ccount > 0 { for cid, cpu := range cpuFields { setupProtocol("cpu", map[string]string{"cpu": fmt.Sprintf("%d", cid), "host": host}, cpu, t) @@ -171,5 +206,6 @@ func main() { } }() + // Wait until receiving an interrupt wg.Wait() }