diff --git a/clusterdaemon.go b/clusterdaemon.go new file mode 100644 index 0000000..a4f52fb --- /dev/null +++ b/clusterdaemon.go @@ -0,0 +1,199 @@ +package main + +import ( + "fmt" + "os" + "os/exec" +// "context" + "strings" + "time" + "sort" + "path/filepath" + "encoding/json" +) + + +type GlobalConfig struct { + Sink struct { + User string `json:"user"` + Password string `json:"password"` + } `json:"sink"` + Host string `json:"host"` + Port string `json:"port"` + Report struct { + Levels string `json:"levels"` + Interval int `json:"interval"` + } `json:"report"` + Schedule struct { + Core struct { + Frequency int `json:"frequency"` + Duration int `json:"duration"` + } `json:"core"` + Node struct { + Frequency int `json:"frequency"` + Duration int `json:"duration"` + } `json:"node"` + } `json:"schedule"` + Metrics string `json:"metrics"` + CollectorPath string `json:"collector_path"` +} + +type CollectorConfig struct { + Command string `json:"command"` + Args string `json:"arguments"` +} + +func LoadGlobalConfiguration(file string, config* GlobalConfig) error { + configFile, err := os.Open(file) + defer configFile.Close() + if err != nil { + fmt.Println(err.Error()) + } + jsonParser := json.NewDecoder(configFile) + jsonParser.Decode(config) + return err +} + +func LoadCollectorConfiguration(file string, config* CollectorConfig) error { + configFile, err := os.Open(file) + defer configFile.Close() + if err != nil { + fmt.Println(err.Error()) + } + jsonParser := json.NewDecoder(configFile) + jsonParser.Decode(config) + return err +} + +func SortStringStringMap(input map[string]string) []string { + keys := make([]string, 0, len(input)) + output := make([]string, len(input)) + for k := range input { + keys = append(keys, k) + } + sort.Strings(keys) + + for i, k := range keys { + var s strings.Builder + fmt.Fprintf(&s, "%s=%s", k, string(input[k])) + output[i] = s.String() + } + return output +} + +func SortStringInterfaceMap(input map[string]interface{}) []string { + keys := make([]string, 0, len(input)) + output := make([]string, len(input)) + for k := range input { + keys = append(keys, k) + } + sort.Strings(keys) + + for i, k := range keys { + var s strings.Builder + fmt.Fprintf(&s, "%s=%v", k, input[k]) + output[i] = s.String() + } + return output +} + +func CreatePoint(metricname string, tags map[string]string, fields map[string]interface{}, timestamp int64) string { + var s strings.Builder + taglist := SortStringStringMap(tags) + fieldlist := SortStringInterfaceMap(fields) + + if (len(taglist) > 0) { + fmt.Fprintf(&s, "%s,%s %s %d", metricname, strings.Join(taglist, ","), strings.Join(fieldlist, ","), timestamp) + } else { + fmt.Fprintf(&s, "%s %s %d", metricname, strings.Join(fieldlist, ","), timestamp) + } + return s.String() +} + +func run_cmd(cmd string, cmd_opts string) string { + //ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + //defer cancel() + //command := exec.CommandContext(ctx, cmd, strings.Join(cmd_opts, " ")) + command := exec.Command(cmd, cmd_opts) + command.Wait() + //select { + // case <-time.After(101 * time.Second): + // fmt.Println("overslept") + // case <-ctx.Done(): + // fmt.Println(ctx.Err()) // prints "context deadline exceeded" + //} + + stdout, err := command.Output() + + if err != nil { + fmt.Println(err.Error()) + return "" + } + return (string(stdout)) +} + +func GetSingleCollector(folders *[]string) filepath.WalkFunc { + return func(path string, info os.FileInfo, err error) error { + if err != nil { + panic(err) + } + if (info.IsDir()) { + configfile := filepath.Join(path, "config.json") + if _, err := os.Stat(configfile); err == nil { + *folders = append(*folders, path) + } + } + return nil + } +} + +func GetCollectorFolders(root string, folders *[]string) error { + + err := filepath.Walk(root, GetSingleCollector(folders)) + if err != nil { + panic(err) + } + return err +} + +func main() { +// fmt.Println("Hello") +// cmd_opts := []string{"la","le","lu"} +// cmd := "echo" +// s := run_cmd(cmd, cmd_opts) +// fmt.Println(s) +// tags := map[string]string { +// "host" : "broadep2", +// } +// fields := map[string]interface{} { +// "value" : float64(1.0), +// } +// fmt.Println(CreatePoint("flops_any", tags, fields, time.Now().UnixNano())) + var config GlobalConfig + LoadGlobalConfiguration("config.json", &config) +// fmt.Println(config) + var folders []string + GetCollectorFolders(config.CollectorPath, &folders) +// fmt.Println(folders) + for _, path := range folders { + var col_config CollectorConfig + configfile := filepath.Join(path, "config.json") + LoadCollectorConfiguration(configfile, &col_config) + cmd := filepath.Join(path, col_config.Command) + stdout := run_cmd(cmd, col_config.Args) + metrics := strings.Split(stdout, "\n") + for _, m := range metrics { + if len(m) > 0 { + t := strings.Fields(m) + if len(t) == 2 { + var s strings.Builder + fmt.Fprintf(&s, "%s %d", m, time.Now().UnixNano()) + m = s.String() + } + fmt.Println("SEND", m) + } + } + + } + +} diff --git a/collectors/likwid/README.md b/collectors/likwid/README.md new file mode 100644 index 0000000..1973a76 --- /dev/null +++ b/collectors/likwid/README.md @@ -0,0 +1,10 @@ +# LIKWID collector + +Measure hardware performance counters with LIKWID + +# Build +- Download LIKWID +- Configure LIKWID with SHARED=false +- Build +- Copy static libraries in current folder +- Copy likwid.h, likwid-marker.h and bstrlib.h in current folder diff --git a/collectors/likwid/likwid.go b/collectors/likwid/likwid.go new file mode 100644 index 0000000..e0e3b11 --- /dev/null +++ b/collectors/likwid/likwid.go @@ -0,0 +1,31 @@ +package main +/* +#cgo CFLAGS: -I. +#cgo LDFLAGS: -L. -llikwid -llikwid-hwloc -lm +#include +#include +*/ +import "C" +import "fmt" +import "unsafe" + +func main() { + var topo C.CpuTopology_t + C.topology_init(); + topo = C.get_cpuTopology() + cpulist := make([]C.int, topo.numHWThreads) + for a := 0; a < int(topo.numHWThreads); a++ { + cpulist[C.int(a)] = C.int(a) + } + C.perfmon_init(C.int(topo.numHWThreads), &cpulist[0]) + gstring := C.CString("INSTR_RETIRED_ANY:FIXC0") + gid := C.perfmon_addEventSet(gstring) + C.perfmon_setupCounters(gid) + C.perfmon_startCounters() + C.perfmon_stopCounters() + v := C.perfmon_getResult(gid, 0, 0) + fmt.Println(v) + C.free(unsafe.Pointer(gstring)) + C.perfmon_finalize() + C.topology_finalize(); +} diff --git a/collectors/memavg/config.json b/collectors/memavg/config.json new file mode 100644 index 0000000..a1b8ffb --- /dev/null +++ b/collectors/memavg/config.json @@ -0,0 +1,3 @@ +{ + "command": "read_memavg.sh" +} diff --git a/collectors/memavg/read_memavg.sh b/collectors/memavg/read_memavg.sh new file mode 100755 index 0000000..12732a7 --- /dev/null +++ b/collectors/memavg/read_memavg.sh @@ -0,0 +1,12 @@ +#!/bin/bash + + +TOTAL=$(grep "MemTotal" /proc/meminfo | awk '{print $2}') +AVAIL=$(grep "MemAvailable" /proc/meminfo | awk '{print $2}') +FREE=$(grep "MemFree" /proc/meminfo | awk '{print $2}') +HOST=$(hostname -s) + + +echo "mem_total,host=$HOST $TOTAL" +echo "mem_avail,host=$HOST $AVAIL" +echo "mem_free,host=$HOST $FREE" diff --git a/config.json b/config.json new file mode 100644 index 0000000..1c836ac --- /dev/null +++ b/config.json @@ -0,0 +1,33 @@ +{ + "sink": { + "user": "admin", + "password": "12345" + }, + "host": "localhost", + "port": "8080", + "report": { + "levels": ["core","node"], + "interval": 120 + }, + "schedule": { + "core": { + "frequency": 30, + "duration": 10 + }, + "node":{ + "frequency": 60, + "duration": 20 + } + }, + "metrics": [ + "ipc", + "flops_any", + "clock", + "load", + "mem_bw", + "mem_used", + "net_bw", + "file_bw" + ], + "collector_path": "./collectors" +}