Merge branch 'main' of github.com:ClusterCockpit/cc-metric-collector into main

This commit is contained in:
Thomas Roehl 2021-03-19 17:21:05 +01:00
commit 6fbaf21841
2 changed files with 155 additions and 155 deletions

View File

@ -4,14 +4,17 @@ import (
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
//"bytes"
//"bytes"
// "context" // "context"
"encoding/json" "encoding/json"
"path/filepath" "path/filepath"
//"sort" //"sort"
"errors"
"strings" "strings"
"time" "time"
"errors"
protocol "github.com/influxdata/line-protocol" protocol "github.com/influxdata/line-protocol"
) )
@ -41,16 +44,16 @@ type GlobalConfig struct {
} }
type CollectorConfig struct { type CollectorConfig struct {
Command string `json:"command"` Command string `json:"command"`
Args string `json:"arguments"` Args string `json:"arguments"`
Provides []string `json:"provides"` Provides []string `json:"provides"`
} }
type InternalCollectorConfig struct { type InternalCollectorConfig struct {
Config CollectorConfig Config CollectorConfig
Location string Location string
LastRun time.Time LastRun time.Time
encoder *protocol.Encoder encoder *protocol.Encoder
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -81,7 +84,6 @@ func LoadCollectorConfiguration(file string, config *CollectorConfig) error {
return err return err
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Load collector configurations // Load collector configurations
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -90,11 +92,11 @@ func GetSingleCollector(folders *[]string) filepath.WalkFunc {
if info.IsDir() { if info.IsDir() {
configfile := filepath.Join(path, "config.json") configfile := filepath.Join(path, "config.json")
if _, err := os.Stat(configfile); err == nil { if _, err := os.Stat(configfile); err == nil {
// TODO: Validate config? // TODO: Validate config?
p, err := filepath.Abs(path) p, err := filepath.Abs(path)
if err == nil { if err == nil {
*folders = append(*folders, p) *folders = append(*folders, p)
} }
} }
} }
return nil return nil
@ -109,72 +111,72 @@ func GetCollectorFolders(root string, folders *[]string) error {
return err return err
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Setup all collectors // Setup all collectors
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
func SetupCollectors(config GlobalConfig) ([]InternalCollectorConfig, error) { func SetupCollectors(config GlobalConfig) ([]InternalCollectorConfig, error) {
var folders []string var folders []string
var outconfig []InternalCollectorConfig var outconfig []InternalCollectorConfig
//encoder := protocol.NewEncoder(buf) //encoder := protocol.NewEncoder(buf)
//encoder.SetMaxLineBytes(1024) //encoder.SetMaxLineBytes(1024)
GetCollectorFolders(config.CollectorPath, &folders) GetCollectorFolders(config.CollectorPath, &folders)
for _, path := range folders { for _, path := range folders {
var col_config InternalCollectorConfig var col_config InternalCollectorConfig
LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config.Config) LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config.Config)
col_config.LastRun = time.Now() col_config.LastRun = time.Now()
col_config.Location = path col_config.Location = path
//buf := &bytes.Buffer{} //buf := &bytes.Buffer{}
//col_config.Encoder := protocol.NewEncoder(buf) //col_config.Encoder := protocol.NewEncoder(buf)
//col_config.Encoder.SetMaxLineBytes(1024) //col_config.Encoder.SetMaxLineBytes(1024)
outconfig = append(outconfig, col_config) outconfig = append(outconfig, col_config)
} }
return outconfig, nil return outconfig, nil
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Run collector // Run collector
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
func RunCollector(config InternalCollectorConfig) ([]string, error) { func RunCollector(config InternalCollectorConfig) ([]string, error) {
var results []string var results []string
var err error var err error
cmd := config.Config.Command cmd := config.Config.Command
if _, err = os.Stat(cmd); err != nil { if _, err = os.Stat(cmd); err != nil {
//fmt.Println(err.Error()) //fmt.Println(err.Error())
if ! strings.HasPrefix(cmd, "/") { if !strings.HasPrefix(cmd, "/") {
cmd = filepath.Join(config.Location, config.Config.Command) cmd = filepath.Join(config.Location, config.Config.Command)
if _, err = os.Stat(cmd); err != nil { if _, err = os.Stat(cmd); err != nil {
//fmt.Println(err.Error()) //fmt.Println(err.Error())
cmd, err = exec.LookPath(config.Config.Command) cmd, err = exec.LookPath(config.Config.Command)
} }
} }
} }
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
return results, err return results, err
} }
// TODO: Add timeout
command := exec.Command(cmd, config.Config.Args)
command.Dir = config.Location
command.Wait()
stdout, err := command.Output()
if err != nil {
//log.error(err.Error())
fmt.Println(err.Error())
return results, err
}
lines := strings.Split(string(stdout), "\n") // TODO: Add timeout
for _, l := range lines { command := exec.Command(cmd, config.Config.Args)
if strings.HasPrefix(l, "#") { continue } command.Dir = config.Location
results = append(results, l) command.Wait()
} stdout, err := command.Output()
return results, err if err != nil {
//log.error(err.Error())
fmt.Println(err.Error())
return results, err
}
lines := strings.Split(string(stdout), "\n")
for _, l := range lines {
if strings.HasPrefix(l, "#") {
continue
}
results = append(results, l)
}
return results, err
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -182,67 +184,65 @@ func RunCollector(config InternalCollectorConfig) ([]string, error) {
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
func SetupSink(config GlobalConfig) chan string { func SetupSink(config GlobalConfig) chan string {
c := make(chan string, 300) c := make(chan string, 300)
// TODO: Setup something for sending? Establish HTTP connection? // TODO: Setup something for sending? Establish HTTP connection?
return c return c
} }
func RunSink(config GlobalConfig, queue* chan string) (*time.Ticker, chan bool) { func RunSink(config GlobalConfig, queue *chan string) (*time.Ticker, chan bool) {
var interval time.Duration
interval := time.Duration(config.Report.Interval) * time.Second
interval = time.Duration(config.Report.Interval) * time.Second ticker := time.NewTicker(interval)
ticker := time.NewTicker(interval) done := make(chan bool)
done := make(chan bool)
go func() { go func() {
for { for {
select { select {
case <- done: case <-done:
return return
case t := <-ticker.C: case t := <-ticker.C:
fmt.Println("SinkTick at", t) fmt.Println("SinkTick at", t)
empty := false empty := false
var batch []string var batch []string
for empty == false { for empty == false {
select { select {
case metric := <- *queue: case metric := <-*queue:
fmt.Println(metric) fmt.Println(metric)
batch = append(batch, metric) batch = append(batch, metric)
default: default:
// No metric available, wait for the next iteration // No metric available, wait for the next iteration
empty = true empty = true
break break
} }
} }
for _, m := range batch { for _, m := range batch {
fmt.Println(m) fmt.Println(m)
} }
} }
} }
}() }()
return ticker, done return ticker, done
} }
func CloseSink(config GlobalConfig, queue *chan string, ticker *time.Ticker, done chan bool) { func CloseSink(config GlobalConfig, queue *chan string, ticker *time.Ticker, done chan bool) {
ticker.Stop() ticker.Stop()
done <- true done <- true
close(*queue) close(*queue)
} }
func MainLoop(config GlobalConfig, sink *chan string) (*time.Ticker, chan bool) { func MainLoop(config GlobalConfig, sink *chan string) (*time.Ticker, chan bool) {
var intConfig []InternalCollectorConfig var intConfig []InternalCollectorConfig
intConfig, err := SetupCollectors(config) intConfig, err := SetupCollectors(config)
if err != nil { if err != nil {
panic(err) panic(err)
} }
var interval time.Duration
interval := time.Duration(config.Schedule.Node.Frequency) * time.Second
interval = time.Duration(config.Schedule.Node.Frequency) * time.Second ticker := time.NewTicker(time.Second)
done := make(chan bool)
ticker := time.NewTicker(time.Second)
done := make(chan bool) go func() {
go func() {
for { for {
select { select {
case <-done: case <-done:
@ -251,20 +251,20 @@ func MainLoop(config GlobalConfig, sink *chan string) (*time.Ticker, chan bool)
fmt.Println("CollectorTick at", t) fmt.Println("CollectorTick at", t)
unix := time.Now() unix := time.Now()
for i, _ := range intConfig { for i, _ := range intConfig {
if time.Duration(unix.Sub(intConfig[i].LastRun)) > interval { if time.Duration(unix.Sub(intConfig[i].LastRun)) > interval {
res, err := RunCollector(intConfig[i]) res, err := RunCollector(intConfig[i])
if err != nil { if err != nil {
//log.error("Collector failed: ", err.Error()) //log.error("Collector failed: ", err.Error())
} else { } else {
//TODO: parse and skip in case of error, encode to []string //TODO: parse and skip in case of error, encode to []string
for _, r := range res { for _, r := range res {
if len(r) > 0 { if len(r) > 0 {
*sink <- r *sink <- r
} }
} }
} }
intConfig[i].LastRun = time.Now() intConfig[i].LastRun = time.Now()
} }
} }
} }
} }
@ -287,7 +287,7 @@ func main() {
// fmt.Println(CreatePoint("flops_any", tags, fields, time.Now().UnixNano())) // fmt.Println(CreatePoint("flops_any", tags, fields, time.Now().UnixNano()))
var config GlobalConfig var config GlobalConfig
LoadGlobalConfiguration("config.json", &config) LoadGlobalConfiguration("config.json", &config)
queue := SetupSink(config) queue := SetupSink(config)
sinkTicker, sinkDone := RunSink(config, &queue) sinkTicker, sinkDone := RunSink(config, &queue)
collectTicker, collectDone := MainLoop(config, &queue) collectTicker, collectDone := MainLoop(config, &queue)
@ -296,25 +296,25 @@ func main() {
collectDone <- true collectDone <- true
CloseSink(config, &queue, sinkTicker, sinkDone) CloseSink(config, &queue, sinkTicker, sinkDone)
// var folders []string // var folders []string
// GetCollectorFolders(config.CollectorPath, &folders) // GetCollectorFolders(config.CollectorPath, &folders)
// for _, path := range folders { // for _, path := range folders {
// var col_config CollectorConfig // var col_config CollectorConfig
// LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config) // LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config)
// stdout := run_cmd(filepath.Join(path, col_config.Command), col_config.Args) // stdout := run_cmd(filepath.Join(path, col_config.Command), col_config.Args)
// metrics := strings.Split(stdout, "\n") // metrics := strings.Split(stdout, "\n")
// for _, m := range metrics { // for _, m := range metrics {
// if len(m) > 0 { // if len(m) > 0 {
// t := strings.Fields(m) // t := strings.Fields(m)
// if len(t) == 2 { // if len(t) == 2 {
// var s strings.Builder // var s strings.Builder
// fmt.Fprintf(&s, "%s %d", m, time.Now().UnixNano()) // fmt.Fprintf(&s, "%s %d", m, time.Now().UnixNano())
// m = s.String() // m = s.String()
// } // }
// fmt.Println("SEND", m) // fmt.Println("SEND", m)
// } // }
// } // }
// } // }
} }

0
collectors/memavg/read_memavg.go Executable file → Normal file
View File