Merge pull request #1 from ClusterCockpit/dev-jan

Go Modules and further changes
This commit is contained in:
Thomas Gruber 2021-03-18 12:47:47 +01:00 committed by GitHub
commit 63cdc060c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 223 additions and 155 deletions

View File

@ -1,199 +1,196 @@
package main package main
import ( import (
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
// "context"
"strings" // "context"
"time" "encoding/json"
"sort" "path/filepath"
"path/filepath" "sort"
"encoding/json" "strings"
"time"
) )
type GlobalConfig struct { type GlobalConfig struct {
Sink struct { Sink struct {
User string `json:"user"` User string `json:"user"`
Password string `json:"password"` Password string `json:"password"`
} `json:"sink"` } `json:"sink"`
Host string `json:"host"` Host string `json:"host"`
Port string `json:"port"` Port string `json:"port"`
Report struct { Report struct {
Levels string `json:"levels"` Levels string `json:"levels"`
Interval int `json:"interval"` Interval int `json:"interval"`
} `json:"report"` } `json:"report"`
Schedule struct { Schedule struct {
Core struct { Core struct {
Frequency int `json:"frequency"` Frequency int `json:"frequency"`
Duration int `json:"duration"` Duration int `json:"duration"`
} `json:"core"` } `json:"core"`
Node struct { Node struct {
Frequency int `json:"frequency"` Frequency int `json:"frequency"`
Duration int `json:"duration"` Duration int `json:"duration"`
} `json:"node"` } `json:"node"`
} `json:"schedule"` } `json:"schedule"`
Metrics string `json:"metrics"` Metrics []string `json:"metrics"`
CollectorPath string `json:"collector_path"` CollectorPath string `json:"collector_path"`
} }
type CollectorConfig struct { type CollectorConfig struct {
Command string `json:"command"` Command string `json:"command"`
Args string `json:"arguments"` Args string `json:"arguments"`
} }
func LoadGlobalConfiguration(file string, config* GlobalConfig) error { func LoadGlobalConfiguration(file string, config *GlobalConfig) error {
configFile, err := os.Open(file) configFile, err := os.Open(file)
defer configFile.Close() defer configFile.Close()
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
} }
jsonParser := json.NewDecoder(configFile) jsonParser := json.NewDecoder(configFile)
jsonParser.Decode(config) jsonParser.Decode(config)
return err return err
} }
func LoadCollectorConfiguration(file string, config* CollectorConfig) error { func LoadCollectorConfiguration(file string, config *CollectorConfig) error {
configFile, err := os.Open(file) configFile, err := os.Open(file)
defer configFile.Close() defer configFile.Close()
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
} }
jsonParser := json.NewDecoder(configFile) jsonParser := json.NewDecoder(configFile)
jsonParser.Decode(config) jsonParser.Decode(config)
return err return err
} }
func SortStringStringMap(input map[string]string) []string { func SortStringStringMap(input map[string]string) []string {
keys := make([]string, 0, len(input)) keys := make([]string, 0, len(input))
output := make([]string, len(input)) output := make([]string, len(input))
for k := range input { for k := range input {
keys = append(keys, k) keys = append(keys, k)
} }
sort.Strings(keys) sort.Strings(keys)
for i, k := range keys { for i, k := range keys {
var s strings.Builder var s strings.Builder
fmt.Fprintf(&s, "%s=%s", k, string(input[k])) fmt.Fprintf(&s, "%s=%s", k, string(input[k]))
output[i] = s.String() output[i] = s.String()
} }
return output return output
} }
func SortStringInterfaceMap(input map[string]interface{}) []string { func SortStringInterfaceMap(input map[string]interface{}) []string {
keys := make([]string, 0, len(input)) keys := make([]string, 0, len(input))
output := make([]string, len(input)) output := make([]string, len(input))
for k := range input { for k := range input {
keys = append(keys, k) keys = append(keys, k)
} }
sort.Strings(keys) sort.Strings(keys)
for i, k := range keys { for i, k := range keys {
var s strings.Builder var s strings.Builder
fmt.Fprintf(&s, "%s=%v", k, input[k]) fmt.Fprintf(&s, "%s=%v", k, input[k])
output[i] = s.String() output[i] = s.String()
} }
return output return output
} }
func CreatePoint(metricname string, tags map[string]string, fields map[string]interface{}, timestamp int64) string { func CreatePoint(metricname string, tags map[string]string, fields map[string]interface{}, timestamp int64) string {
var s strings.Builder var s strings.Builder
taglist := SortStringStringMap(tags) taglist := SortStringStringMap(tags)
fieldlist := SortStringInterfaceMap(fields) fieldlist := SortStringInterfaceMap(fields)
if (len(taglist) > 0) { if len(taglist) > 0 {
fmt.Fprintf(&s, "%s,%s %s %d", metricname, strings.Join(taglist, ","), strings.Join(fieldlist, ","), timestamp) fmt.Fprintf(&s, "%s,%s %s %d", metricname, strings.Join(taglist, ","), strings.Join(fieldlist, ","), timestamp)
} else { } else {
fmt.Fprintf(&s, "%s %s %d", metricname, strings.Join(fieldlist, ","), timestamp) fmt.Fprintf(&s, "%s %s %d", metricname, strings.Join(fieldlist, ","), timestamp)
} }
return s.String() return s.String()
} }
func run_cmd(cmd string, cmd_opts string) string { func run_cmd(cmd string, cmd_opts string) string {
//ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) //ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
//defer cancel() //defer cancel()
//command := exec.CommandContext(ctx, cmd, strings.Join(cmd_opts, " ")) //command := exec.CommandContext(ctx, cmd, strings.Join(cmd_opts, " "))
command := exec.Command(cmd, cmd_opts) command := exec.Command(cmd, cmd_opts)
command.Wait() command.Wait()
//select { //select {
// case <-time.After(101 * time.Second): // case <-time.After(101 * time.Second):
// fmt.Println("overslept") // fmt.Println("overslept")
// case <-ctx.Done(): // case <-ctx.Done():
// fmt.Println(ctx.Err()) // prints "context deadline exceeded" // fmt.Println(ctx.Err()) // prints "context deadline exceeded"
//} //}
stdout, err := command.Output() stdout, err := command.Output()
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
return "" return ""
} }
return (string(stdout)) return (string(stdout))
} }
func GetSingleCollector(folders *[]string) filepath.WalkFunc { func GetSingleCollector(folders *[]string) filepath.WalkFunc {
return func(path string, info os.FileInfo, err error) error { return func(path string, info os.FileInfo, err error) error {
if err != nil { if err != nil {
panic(err) panic(err)
} }
if (info.IsDir()) { if info.IsDir() {
configfile := filepath.Join(path, "config.json") configfile := filepath.Join(path, "config.json")
if _, err := os.Stat(configfile); err == nil { if _, err := os.Stat(configfile); err == nil {
*folders = append(*folders, path) *folders = append(*folders, path)
} }
} }
return nil return nil
} }
} }
func GetCollectorFolders(root string, folders *[]string) error { func GetCollectorFolders(root string, folders *[]string) error {
err := filepath.Walk(root, GetSingleCollector(folders)) err := filepath.Walk(root, GetSingleCollector(folders))
if err != nil { if err != nil {
panic(err) panic(err)
} }
return err return err
} }
func main() { func main() {
// fmt.Println("Hello") // fmt.Println("Hello")
// cmd_opts := []string{"la","le","lu"} // cmd_opts := []string{"la","le","lu"}
// cmd := "echo" // cmd := "echo"
// s := run_cmd(cmd, cmd_opts) // s := run_cmd(cmd, cmd_opts)
// fmt.Println(s) // fmt.Println(s)
// tags := map[string]string { // tags := map[string]string {
// "host" : "broadep2", // "host" : "broadep2",
// } // }
// fields := map[string]interface{} { // fields := map[string]interface{} {
// "value" : float64(1.0), // "value" : float64(1.0),
// } // }
// fmt.Println(CreatePoint("flops_any", tags, fields, time.Now().UnixNano())) // fmt.Println(CreatePoint("flops_any", tags, fields, time.Now().UnixNano()))
var config GlobalConfig var config GlobalConfig
LoadGlobalConfiguration("config.json", &config) LoadGlobalConfiguration("config.json", &config)
// fmt.Println(config)
var folders []string var folders []string
GetCollectorFolders(config.CollectorPath, &folders) GetCollectorFolders(config.CollectorPath, &folders)
// fmt.Println(folders)
for _, path := range folders { for _, path := range folders {
var col_config CollectorConfig var col_config CollectorConfig
configfile := filepath.Join(path, "config.json") LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config)
LoadCollectorConfiguration(configfile, &col_config) stdout := run_cmd(filepath.Join(path, col_config.Command), col_config.Args)
cmd := filepath.Join(path, col_config.Command)
stdout := run_cmd(cmd, col_config.Args) metrics := strings.Split(stdout, "\n")
metrics := strings.Split(stdout, "\n") for _, m := range metrics {
for _, m := range metrics { if len(m) > 0 {
if len(m) > 0 { t := strings.Fields(m)
t := strings.Fields(m) if len(t) == 2 {
if len(t) == 2 { var s strings.Builder
var s strings.Builder fmt.Fprintf(&s, "%s %d", m, time.Now().UnixNano())
fmt.Fprintf(&s, "%s %d", m, time.Now().UnixNano()) m = s.String()
m = s.String() }
} fmt.Println("SEND", m)
fmt.Println("SEND", m) }
} }
} }
}
} }

64
collectors/likwid.go Normal file
View File

@ -0,0 +1,64 @@
package collectors
import (
"bytes"
"fmt"
"time"
protocol "github.com/influxdata/line-protocol"
)
type LikwidCollector struct {
name string
tags []*protocol.Tag
fields []*protocol.Field
t time.Time
encoder *protocol.Encoder
}
func (c *LikwidCollector) Name() string {
return c.name
}
func (c *LikwidCollector) TagList() []*protocol.Tag {
return c.tags
}
func (c *LikwidCollector) FieldList() []*protocol.Field {
return c.fields
}
func (c *LikwidCollector) Time() time.Time {
return c.t
}
func (c *LikwidCollector) New() {
buf := &bytes.Buffer{}
c.encoder = protocol.NewEncoder(buf)
c.encoder.SetMaxLineBytes(1024)
}
func (c *LikwidCollector) Start(
level string,
frequency time.Duration,
duration int) {
ticker := time.NewTicker(frequency * time.Second)
done := make(chan bool)
go func() {
for {
select {
case <-done:
return
case t := <-ticker.C:
fmt.Println("Tick at", t)
c.encoder.Encode(c)
}
}
}()
time.Sleep(1600 * time.Second)
ticker.Stop()
done <- true
fmt.Println("Ticker stopped")
}

5
go.mod Normal file
View File

@ -0,0 +1,5 @@
module github.com/ClusterCockpit/cc-metric-collector
go 1.16
require github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097

2
go.sum Normal file
View File

@ -0,0 +1,2 @@
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 h1:vilfsDSy7TDxedi9gyBkMvAirat/oRcL0lFdJBf6tdM=
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=