From 1815825206a857e0c81ffc57610a718e75943bd1 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 18 Mar 2021 08:06:57 +0100 Subject: [PATCH 1/2] Reformat. Make Metrics a string array. --- clusterdaemon.go | 307 +++++++++++++++++++++++------------------------ 1 file changed, 152 insertions(+), 155 deletions(-) diff --git a/clusterdaemon.go b/clusterdaemon.go index a4f52fb..e8ec566 100644 --- a/clusterdaemon.go +++ b/clusterdaemon.go @@ -1,199 +1,196 @@ package main import ( - "fmt" - "os" - "os/exec" -// "context" - "strings" - "time" - "sort" - "path/filepath" - "encoding/json" + "fmt" + "os" + "os/exec" + + // "context" + "encoding/json" + "path/filepath" + "sort" + "strings" + "time" ) - type GlobalConfig struct { - Sink struct { - User string `json:"user"` - Password string `json:"password"` - } `json:"sink"` - Host string `json:"host"` - Port string `json:"port"` - Report struct { - Levels string `json:"levels"` - Interval int `json:"interval"` - } `json:"report"` - Schedule struct { - Core struct { - Frequency int `json:"frequency"` - Duration int `json:"duration"` - } `json:"core"` - Node struct { - Frequency int `json:"frequency"` - Duration int `json:"duration"` - } `json:"node"` - } `json:"schedule"` - Metrics string `json:"metrics"` - CollectorPath string `json:"collector_path"` + Sink struct { + User string `json:"user"` + Password string `json:"password"` + } `json:"sink"` + Host string `json:"host"` + Port string `json:"port"` + Report struct { + Levels string `json:"levels"` + Interval int `json:"interval"` + } `json:"report"` + Schedule struct { + Core struct { + Frequency int `json:"frequency"` + Duration int `json:"duration"` + } `json:"core"` + Node struct { + Frequency int `json:"frequency"` + Duration int `json:"duration"` + } `json:"node"` + } `json:"schedule"` + Metrics []string `json:"metrics"` + CollectorPath string `json:"collector_path"` } type CollectorConfig struct { - Command string `json:"command"` - Args string `json:"arguments"` + Command string `json:"command"` + Args string `json:"arguments"` } -func LoadGlobalConfiguration(file string, config* GlobalConfig) error { - configFile, err := os.Open(file) - defer configFile.Close() - if err != nil { - fmt.Println(err.Error()) - } - jsonParser := json.NewDecoder(configFile) - jsonParser.Decode(config) - return err +func LoadGlobalConfiguration(file string, config *GlobalConfig) error { + configFile, err := os.Open(file) + defer configFile.Close() + if err != nil { + fmt.Println(err.Error()) + } + jsonParser := json.NewDecoder(configFile) + jsonParser.Decode(config) + return err } -func LoadCollectorConfiguration(file string, config* CollectorConfig) error { - configFile, err := os.Open(file) - defer configFile.Close() - if err != nil { - fmt.Println(err.Error()) - } - jsonParser := json.NewDecoder(configFile) - jsonParser.Decode(config) - return err +func LoadCollectorConfiguration(file string, config *CollectorConfig) error { + configFile, err := os.Open(file) + defer configFile.Close() + if err != nil { + fmt.Println(err.Error()) + } + jsonParser := json.NewDecoder(configFile) + jsonParser.Decode(config) + return err } func SortStringStringMap(input map[string]string) []string { - keys := make([]string, 0, len(input)) - output := make([]string, len(input)) - for k := range input { - keys = append(keys, k) - } - sort.Strings(keys) + keys := make([]string, 0, len(input)) + output := make([]string, len(input)) + for k := range input { + keys = append(keys, k) + } + sort.Strings(keys) - for i, k := range keys { - var s strings.Builder - fmt.Fprintf(&s, "%s=%s", k, string(input[k])) - output[i] = s.String() - } - return output + for i, k := range keys { + var s strings.Builder + fmt.Fprintf(&s, "%s=%s", k, string(input[k])) + output[i] = s.String() + } + return output } func SortStringInterfaceMap(input map[string]interface{}) []string { - keys := make([]string, 0, len(input)) - output := make([]string, len(input)) - for k := range input { - keys = append(keys, k) - } - sort.Strings(keys) + keys := make([]string, 0, len(input)) + output := make([]string, len(input)) + for k := range input { + keys = append(keys, k) + } + sort.Strings(keys) - for i, k := range keys { - var s strings.Builder - fmt.Fprintf(&s, "%s=%v", k, input[k]) - output[i] = s.String() - } - return output + for i, k := range keys { + var s strings.Builder + fmt.Fprintf(&s, "%s=%v", k, input[k]) + output[i] = s.String() + } + return output } func CreatePoint(metricname string, tags map[string]string, fields map[string]interface{}, timestamp int64) string { - var s strings.Builder - taglist := SortStringStringMap(tags) - fieldlist := SortStringInterfaceMap(fields) + var s strings.Builder + taglist := SortStringStringMap(tags) + fieldlist := SortStringInterfaceMap(fields) - if (len(taglist) > 0) { - fmt.Fprintf(&s, "%s,%s %s %d", metricname, strings.Join(taglist, ","), strings.Join(fieldlist, ","), timestamp) - } else { - fmt.Fprintf(&s, "%s %s %d", metricname, strings.Join(fieldlist, ","), timestamp) - } - return s.String() + if len(taglist) > 0 { + fmt.Fprintf(&s, "%s,%s %s %d", metricname, strings.Join(taglist, ","), strings.Join(fieldlist, ","), timestamp) + } else { + fmt.Fprintf(&s, "%s %s %d", metricname, strings.Join(fieldlist, ","), timestamp) + } + return s.String() } func run_cmd(cmd string, cmd_opts string) string { - //ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) - //defer cancel() - //command := exec.CommandContext(ctx, cmd, strings.Join(cmd_opts, " ")) - command := exec.Command(cmd, cmd_opts) - command.Wait() - //select { - // case <-time.After(101 * time.Second): + //ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + //defer cancel() + //command := exec.CommandContext(ctx, cmd, strings.Join(cmd_opts, " ")) + command := exec.Command(cmd, cmd_opts) + command.Wait() + //select { + // case <-time.After(101 * time.Second): // fmt.Println("overslept") // case <-ctx.Done(): // fmt.Println(ctx.Err()) // prints "context deadline exceeded" //} - stdout, err := command.Output() + stdout, err := command.Output() - if err != nil { - fmt.Println(err.Error()) - return "" - } - return (string(stdout)) + if err != nil { + fmt.Println(err.Error()) + return "" + } + return (string(stdout)) } func GetSingleCollector(folders *[]string) filepath.WalkFunc { - return func(path string, info os.FileInfo, err error) error { - if err != nil { - panic(err) - } - if (info.IsDir()) { - configfile := filepath.Join(path, "config.json") - if _, err := os.Stat(configfile); err == nil { - *folders = append(*folders, path) - } - } - return nil - } + return func(path string, info os.FileInfo, err error) error { + if err != nil { + panic(err) + } + if info.IsDir() { + configfile := filepath.Join(path, "config.json") + if _, err := os.Stat(configfile); err == nil { + *folders = append(*folders, path) + } + } + return nil + } } func GetCollectorFolders(root string, folders *[]string) error { - - err := filepath.Walk(root, GetSingleCollector(folders)) - if err != nil { - panic(err) - } - return err + + err := filepath.Walk(root, GetSingleCollector(folders)) + if err != nil { + panic(err) + } + return err } func main() { -// fmt.Println("Hello") -// cmd_opts := []string{"la","le","lu"} -// cmd := "echo" -// s := run_cmd(cmd, cmd_opts) -// fmt.Println(s) -// tags := map[string]string { -// "host" : "broadep2", -// } -// fields := map[string]interface{} { -// "value" : float64(1.0), -// } -// fmt.Println(CreatePoint("flops_any", tags, fields, time.Now().UnixNano())) - var config GlobalConfig - LoadGlobalConfiguration("config.json", &config) -// fmt.Println(config) - var folders []string - GetCollectorFolders(config.CollectorPath, &folders) -// fmt.Println(folders) - for _, path := range folders { - var col_config CollectorConfig - configfile := filepath.Join(path, "config.json") - LoadCollectorConfiguration(configfile, &col_config) - cmd := filepath.Join(path, col_config.Command) - stdout := run_cmd(cmd, col_config.Args) - metrics := strings.Split(stdout, "\n") - for _, m := range metrics { - if len(m) > 0 { - t := strings.Fields(m) - if len(t) == 2 { - var s strings.Builder - fmt.Fprintf(&s, "%s %d", m, time.Now().UnixNano()) - m = s.String() - } - fmt.Println("SEND", m) - } - } - - } - + // fmt.Println("Hello") + // cmd_opts := []string{"la","le","lu"} + // cmd := "echo" + // s := run_cmd(cmd, cmd_opts) + // fmt.Println(s) + // tags := map[string]string { + // "host" : "broadep2", + // } + // fields := map[string]interface{} { + // "value" : float64(1.0), + // } + // fmt.Println(CreatePoint("flops_any", tags, fields, time.Now().UnixNano())) + var config GlobalConfig + LoadGlobalConfiguration("config.json", &config) + + var folders []string + GetCollectorFolders(config.CollectorPath, &folders) + + for _, path := range folders { + var col_config CollectorConfig + LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config) + stdout := run_cmd(filepath.Join(path, col_config.Command), col_config.Args) + + metrics := strings.Split(stdout, "\n") + for _, m := range metrics { + if len(m) > 0 { + t := strings.Fields(m) + if len(t) == 2 { + var s strings.Builder + fmt.Fprintf(&s, "%s %d", m, time.Now().UnixNano()) + m = s.String() + } + fmt.Println("SEND", m) + } + } + } } From e0b9c9e8b2a43c714b9ae58f41a9565accecb716 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 18 Mar 2021 09:11:25 +0100 Subject: [PATCH 2/2] Add example for collector --- collectors/likwid.go | 64 ++++++++++++++++++++++++++++++++++++++++++++ go.mod | 5 ++++ go.sum | 2 ++ 3 files changed, 71 insertions(+) create mode 100644 collectors/likwid.go create mode 100644 go.mod create mode 100644 go.sum diff --git a/collectors/likwid.go b/collectors/likwid.go new file mode 100644 index 0000000..829b5c3 --- /dev/null +++ b/collectors/likwid.go @@ -0,0 +1,64 @@ +package collectors + +import ( + "bytes" + "fmt" + "time" + + protocol "github.com/influxdata/line-protocol" +) + +type LikwidCollector struct { + name string + tags []*protocol.Tag + fields []*protocol.Field + t time.Time + encoder *protocol.Encoder +} + +func (c *LikwidCollector) Name() string { + return c.name +} +func (c *LikwidCollector) TagList() []*protocol.Tag { + return c.tags +} + +func (c *LikwidCollector) FieldList() []*protocol.Field { + return c.fields +} + +func (c *LikwidCollector) Time() time.Time { + return c.t +} + +func (c *LikwidCollector) New() { + buf := &bytes.Buffer{} + c.encoder = protocol.NewEncoder(buf) + c.encoder.SetMaxLineBytes(1024) +} + +func (c *LikwidCollector) Start( + level string, + frequency time.Duration, + duration int) { + ticker := time.NewTicker(frequency * time.Second) + done := make(chan bool) + + go func() { + for { + select { + case <-done: + return + case t := <-ticker.C: + fmt.Println("Tick at", t) + + c.encoder.Encode(c) + } + } + }() + + time.Sleep(1600 * time.Second) + ticker.Stop() + done <- true + fmt.Println("Ticker stopped") +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9771f8d --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module github.com/ClusterCockpit/cc-metric-collector + +go 1.16 + +require github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..3a13697 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 h1:vilfsDSy7TDxedi9gyBkMvAirat/oRcL0lFdJBf6tdM= +github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=