From 4519632ef2fbefb427eab2e9a0c7aa5876a71c26 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 19 Mar 2021 09:34:12 +0100 Subject: [PATCH] Reformat, remove x bit on memavg Collector --- clusterdaemon.go | 310 +++++++++++++++---------------- collectors/memavg/read_memavg.go | 0 2 files changed, 155 insertions(+), 155 deletions(-) mode change 100755 => 100644 collectors/memavg/read_memavg.go diff --git a/clusterdaemon.go b/clusterdaemon.go index 6d9d911..afe82b8 100644 --- a/clusterdaemon.go +++ b/clusterdaemon.go @@ -4,14 +4,17 @@ import ( "fmt" "os" "os/exec" - //"bytes" + + //"bytes" // "context" "encoding/json" "path/filepath" + //"sort" + "errors" "strings" "time" - "errors" + protocol "github.com/influxdata/line-protocol" ) @@ -41,16 +44,16 @@ type GlobalConfig struct { } type CollectorConfig struct { - Command string `json:"command"` - Args string `json:"arguments"` + Command string `json:"command"` + Args string `json:"arguments"` Provides []string `json:"provides"` } type InternalCollectorConfig struct { - Config CollectorConfig - Location string - LastRun time.Time - encoder *protocol.Encoder + Config CollectorConfig + Location string + LastRun time.Time + encoder *protocol.Encoder } ////////////////////////////////////////////////////////////////////////////// @@ -81,7 +84,6 @@ func LoadCollectorConfiguration(file string, config *CollectorConfig) error { return err } - ////////////////////////////////////////////////////////////////////////////// // Load collector configurations ////////////////////////////////////////////////////////////////////////////// @@ -90,11 +92,11 @@ func GetSingleCollector(folders *[]string) filepath.WalkFunc { if info.IsDir() { configfile := filepath.Join(path, "config.json") if _, err := os.Stat(configfile); err == nil { - // TODO: Validate config? - p, err := filepath.Abs(path) - if err == nil { - *folders = append(*folders, p) - } + // TODO: Validate config? + p, err := filepath.Abs(path) + if err == nil { + *folders = append(*folders, p) + } } } return nil @@ -109,72 +111,72 @@ func GetCollectorFolders(root string, folders *[]string) error { return err } - ////////////////////////////////////////////////////////////////////////////// // Setup all collectors ////////////////////////////////////////////////////////////////////////////// func SetupCollectors(config GlobalConfig) ([]InternalCollectorConfig, error) { - var folders []string - var outconfig []InternalCollectorConfig - //encoder := protocol.NewEncoder(buf) - //encoder.SetMaxLineBytes(1024) - GetCollectorFolders(config.CollectorPath, &folders) - for _, path := range folders { - var col_config InternalCollectorConfig - LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config.Config) - col_config.LastRun = time.Now() - col_config.Location = path - //buf := &bytes.Buffer{} - //col_config.Encoder := protocol.NewEncoder(buf) - //col_config.Encoder.SetMaxLineBytes(1024) - outconfig = append(outconfig, col_config) - } - return outconfig, nil + var folders []string + var outconfig []InternalCollectorConfig + //encoder := protocol.NewEncoder(buf) + //encoder.SetMaxLineBytes(1024) + GetCollectorFolders(config.CollectorPath, &folders) + for _, path := range folders { + var col_config InternalCollectorConfig + LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config.Config) + col_config.LastRun = time.Now() + col_config.Location = path + //buf := &bytes.Buffer{} + //col_config.Encoder := protocol.NewEncoder(buf) + //col_config.Encoder.SetMaxLineBytes(1024) + outconfig = append(outconfig, col_config) + } + return outconfig, nil } - ////////////////////////////////////////////////////////////////////////////// // Run collector ////////////////////////////////////////////////////////////////////////////// func RunCollector(config InternalCollectorConfig) ([]string, error) { - var results []string - var err error - cmd := config.Config.Command + var results []string + var err error + cmd := config.Config.Command - if _, err = os.Stat(cmd); err != nil { - //fmt.Println(err.Error()) - if ! strings.HasPrefix(cmd, "/") { - cmd = filepath.Join(config.Location, config.Config.Command) - if _, err = os.Stat(cmd); err != nil { - //fmt.Println(err.Error()) - cmd, err = exec.LookPath(config.Config.Command) - } - } - } - if err != nil { - fmt.Println(err.Error()) - return results, err - } - - // TODO: Add timeout - - command := exec.Command(cmd, config.Config.Args) - command.Dir = config.Location - command.Wait() - stdout, err := command.Output() - if err != nil { - //log.error(err.Error()) - fmt.Println(err.Error()) - return results, err - } + if _, err = os.Stat(cmd); err != nil { + //fmt.Println(err.Error()) + if !strings.HasPrefix(cmd, "/") { + cmd = filepath.Join(config.Location, config.Config.Command) + if _, err = os.Stat(cmd); err != nil { + //fmt.Println(err.Error()) + cmd, err = exec.LookPath(config.Config.Command) + } + } + } + if err != nil { + fmt.Println(err.Error()) + return results, err + } - lines := strings.Split(string(stdout), "\n") - - for _, l := range lines { - if strings.HasPrefix(l, "#") { continue } - results = append(results, l) - } - return results, err + // TODO: Add timeout + + command := exec.Command(cmd, config.Config.Args) + command.Dir = config.Location + command.Wait() + stdout, err := command.Output() + if err != nil { + //log.error(err.Error()) + fmt.Println(err.Error()) + return results, err + } + + lines := strings.Split(string(stdout), "\n") + + for _, l := range lines { + if strings.HasPrefix(l, "#") { + continue + } + results = append(results, l) + } + return results, err } ////////////////////////////////////////////////////////////////////////////// @@ -182,67 +184,65 @@ func RunCollector(config InternalCollectorConfig) ([]string, error) { ////////////////////////////////////////////////////////////////////////////// func SetupSink(config GlobalConfig) chan string { - c := make(chan string, 300) - - // TODO: Setup something for sending? Establish HTTP connection? - return c + c := make(chan string, 300) + + // TODO: Setup something for sending? Establish HTTP connection? + return c } -func RunSink(config GlobalConfig, queue* chan string) (*time.Ticker, chan bool) { - var interval time.Duration - - interval = time.Duration(config.Report.Interval) * time.Second - ticker := time.NewTicker(interval) - done := make(chan bool) - go func() { - for { - select { - case <- done: - return - case t := <-ticker.C: - fmt.Println("SinkTick at", t) - empty := false - var batch []string - for empty == false { - select { - case metric := <- *queue: - fmt.Println(metric) - batch = append(batch, metric) - default: - // No metric available, wait for the next iteration - empty = true - break - } - } - for _, m := range batch { - fmt.Println(m) - } - } - } - }() - return ticker, done +func RunSink(config GlobalConfig, queue *chan string) (*time.Ticker, chan bool) { + + interval := time.Duration(config.Report.Interval) * time.Second + ticker := time.NewTicker(interval) + done := make(chan bool) + + go func() { + for { + select { + case <-done: + return + case t := <-ticker.C: + fmt.Println("SinkTick at", t) + empty := false + var batch []string + for empty == false { + select { + case metric := <-*queue: + fmt.Println(metric) + batch = append(batch, metric) + default: + // No metric available, wait for the next iteration + empty = true + break + } + } + for _, m := range batch { + fmt.Println(m) + } + } + } + }() + return ticker, done } func CloseSink(config GlobalConfig, queue *chan string, ticker *time.Ticker, done chan bool) { - ticker.Stop() - done <- true - close(*queue) + ticker.Stop() + done <- true + close(*queue) } - func MainLoop(config GlobalConfig, sink *chan string) (*time.Ticker, chan bool) { - var intConfig []InternalCollectorConfig - intConfig, err := SetupCollectors(config) - if err != nil { - panic(err) - } - var interval time.Duration - - interval = time.Duration(config.Schedule.Node.Frequency) * time.Second - - ticker := time.NewTicker(time.Second) - done := make(chan bool) - go func() { + var intConfig []InternalCollectorConfig + intConfig, err := SetupCollectors(config) + if err != nil { + panic(err) + } + + interval := time.Duration(config.Schedule.Node.Frequency) * time.Second + ticker := time.NewTicker(time.Second) + done := make(chan bool) + + go func() { for { select { case <-done: @@ -251,20 +251,20 @@ func MainLoop(config GlobalConfig, sink *chan string) (*time.Ticker, chan bool) fmt.Println("CollectorTick at", t) unix := time.Now() for i, _ := range intConfig { - if time.Duration(unix.Sub(intConfig[i].LastRun)) > interval { - res, err := RunCollector(intConfig[i]) - if err != nil { - //log.error("Collector failed: ", err.Error()) - } else { - //TODO: parse and skip in case of error, encode to []string - for _, r := range res { - if len(r) > 0 { - *sink <- r - } - } - } - intConfig[i].LastRun = time.Now() - } + if time.Duration(unix.Sub(intConfig[i].LastRun)) > interval { + res, err := RunCollector(intConfig[i]) + if err != nil { + //log.error("Collector failed: ", err.Error()) + } else { + //TODO: parse and skip in case of error, encode to []string + for _, r := range res { + if len(r) > 0 { + *sink <- r + } + } + } + intConfig[i].LastRun = time.Now() + } } } } @@ -287,7 +287,7 @@ func main() { // fmt.Println(CreatePoint("flops_any", tags, fields, time.Now().UnixNano())) var config GlobalConfig LoadGlobalConfiguration("config.json", &config) - + queue := SetupSink(config) sinkTicker, sinkDone := RunSink(config, &queue) collectTicker, collectDone := MainLoop(config, &queue) @@ -296,25 +296,25 @@ func main() { collectDone <- true CloseSink(config, &queue, sinkTicker, sinkDone) -// var folders []string -// GetCollectorFolders(config.CollectorPath, &folders) + // var folders []string + // GetCollectorFolders(config.CollectorPath, &folders) -// for _, path := range folders { -// var col_config CollectorConfig -// LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config) -// stdout := run_cmd(filepath.Join(path, col_config.Command), col_config.Args) + // for _, path := range folders { + // var col_config CollectorConfig + // LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config) + // stdout := run_cmd(filepath.Join(path, col_config.Command), col_config.Args) -// metrics := strings.Split(stdout, "\n") -// for _, m := range metrics { -// if len(m) > 0 { -// t := strings.Fields(m) -// if len(t) == 2 { -// var s strings.Builder -// fmt.Fprintf(&s, "%s %d", m, time.Now().UnixNano()) -// m = s.String() -// } -// fmt.Println("SEND", m) -// } -// } -// } + // metrics := strings.Split(stdout, "\n") + // for _, m := range metrics { + // if len(m) > 0 { + // t := strings.Fields(m) + // if len(t) == 2 { + // var s strings.Builder + // fmt.Fprintf(&s, "%s %d", m, time.Now().UnixNano()) + // m = s.String() + // } + // fmt.Println("SEND", m) + // } + // } + // } } diff --git a/collectors/memavg/read_memavg.go b/collectors/memavg/read_memavg.go old mode 100755 new mode 100644