diff --git a/clusterdaemon.go b/clusterdaemon.go deleted file mode 100644 index afe82b8..0000000 --- a/clusterdaemon.go +++ /dev/null @@ -1,320 +0,0 @@ -package main - -import ( - "fmt" - "os" - "os/exec" - - //"bytes" - // "context" - "encoding/json" - "path/filepath" - - //"sort" - "errors" - "strings" - "time" - - protocol "github.com/influxdata/line-protocol" -) - -type GlobalConfig struct { - Sink struct { - User string `json:"user"` - Password string `json:"password"` - } `json:"sink"` - Host string `json:"host"` - Port string `json:"port"` - Report struct { - Levels string `json:"levels"` - Interval int `json:"interval"` - } `json:"report"` - Schedule struct { - Core struct { - Frequency int `json:"frequency"` - Duration int `json:"duration"` - } `json:"core"` - Node struct { - Frequency int `json:"frequency"` - Duration int `json:"duration"` - } `json:"node"` - } `json:"schedule"` - Metrics []string `json:"metrics"` - CollectorPath string `json:"collector_path"` -} - -type CollectorConfig struct { - Command string `json:"command"` - Args string `json:"arguments"` - Provides []string `json:"provides"` -} - -type InternalCollectorConfig struct { - Config CollectorConfig - Location string - LastRun time.Time - encoder *protocol.Encoder -} - -////////////////////////////////////////////////////////////////////////////// -// Load global configuration from JSON file -////////////////////////////////////////////////////////////////////////////// -func LoadGlobalConfiguration(file string, config *GlobalConfig) error { - configFile, err := os.Open(file) - defer configFile.Close() - if err != nil { - return err - } - jsonParser := json.NewDecoder(configFile) - jsonParser.Decode(config) - return err -} - -////////////////////////////////////////////////////////////////////////////// -// Load collector configuration from JSON file -////////////////////////////////////////////////////////////////////////////// -func LoadCollectorConfiguration(file string, config *CollectorConfig) error { - configFile, err := os.Open(file) - defer configFile.Close() - if err != nil { - return err - } - jsonParser := json.NewDecoder(configFile) - jsonParser.Decode(config) - return err -} - -////////////////////////////////////////////////////////////////////////////// -// Load collector configurations -////////////////////////////////////////////////////////////////////////////// -func GetSingleCollector(folders *[]string) filepath.WalkFunc { - return func(path string, info os.FileInfo, err error) error { - if info.IsDir() { - configfile := filepath.Join(path, "config.json") - if _, err := os.Stat(configfile); err == nil { - // TODO: Validate config? - p, err := filepath.Abs(path) - if err == nil { - *folders = append(*folders, p) - } - } - } - return nil - } -} - -func GetCollectorFolders(root string, folders *[]string) error { - err := filepath.Walk(root, GetSingleCollector(folders)) - if err != nil { - err = errors.New("Cannot get collectors") - } - return err -} - -////////////////////////////////////////////////////////////////////////////// -// Setup all collectors -////////////////////////////////////////////////////////////////////////////// -func SetupCollectors(config GlobalConfig) ([]InternalCollectorConfig, error) { - var folders []string - var outconfig []InternalCollectorConfig - //encoder := protocol.NewEncoder(buf) - //encoder.SetMaxLineBytes(1024) - GetCollectorFolders(config.CollectorPath, &folders) - for _, path := range folders { - var col_config InternalCollectorConfig - LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config.Config) - col_config.LastRun = time.Now() - col_config.Location = path - //buf := &bytes.Buffer{} - //col_config.Encoder := protocol.NewEncoder(buf) - //col_config.Encoder.SetMaxLineBytes(1024) - outconfig = append(outconfig, col_config) - } - return outconfig, nil -} - -////////////////////////////////////////////////////////////////////////////// -// Run collector -////////////////////////////////////////////////////////////////////////////// -func RunCollector(config InternalCollectorConfig) ([]string, error) { - var results []string - var err error - cmd := config.Config.Command - - if _, err = os.Stat(cmd); err != nil { - //fmt.Println(err.Error()) - if !strings.HasPrefix(cmd, "/") { - cmd = filepath.Join(config.Location, config.Config.Command) - if _, err = os.Stat(cmd); err != nil { - //fmt.Println(err.Error()) - cmd, err = exec.LookPath(config.Config.Command) - } - } - } - if err != nil { - fmt.Println(err.Error()) - return results, err - } - - // TODO: Add timeout - - command := exec.Command(cmd, config.Config.Args) - command.Dir = config.Location - command.Wait() - stdout, err := command.Output() - if err != nil { - //log.error(err.Error()) - fmt.Println(err.Error()) - return results, err - } - - lines := strings.Split(string(stdout), "\n") - - for _, l := range lines { - if strings.HasPrefix(l, "#") { - continue - } - results = append(results, l) - } - return results, err -} - -////////////////////////////////////////////////////////////////////////////// -// Setup sink -////////////////////////////////////////////////////////////////////////////// -func SetupSink(config GlobalConfig) chan string { - - c := make(chan string, 300) - - // TODO: Setup something for sending? Establish HTTP connection? - return c -} - -func RunSink(config GlobalConfig, queue *chan string) (*time.Ticker, chan bool) { - - interval := time.Duration(config.Report.Interval) * time.Second - ticker := time.NewTicker(interval) - done := make(chan bool) - - go func() { - for { - select { - case <-done: - return - case t := <-ticker.C: - fmt.Println("SinkTick at", t) - empty := false - var batch []string - for empty == false { - select { - case metric := <-*queue: - fmt.Println(metric) - batch = append(batch, metric) - default: - // No metric available, wait for the next iteration - empty = true - break - } - } - for _, m := range batch { - fmt.Println(m) - } - } - } - }() - return ticker, done -} - -func CloseSink(config GlobalConfig, queue *chan string, ticker *time.Ticker, done chan bool) { - ticker.Stop() - done <- true - close(*queue) -} - -func MainLoop(config GlobalConfig, sink *chan string) (*time.Ticker, chan bool) { - var intConfig []InternalCollectorConfig - intConfig, err := SetupCollectors(config) - if err != nil { - panic(err) - } - - interval := time.Duration(config.Schedule.Node.Frequency) * time.Second - ticker := time.NewTicker(time.Second) - done := make(chan bool) - - go func() { - for { - select { - case <-done: - return - case t := <-ticker.C: - fmt.Println("CollectorTick at", t) - unix := time.Now() - for i, _ := range intConfig { - if time.Duration(unix.Sub(intConfig[i].LastRun)) > interval { - res, err := RunCollector(intConfig[i]) - if err != nil { - //log.error("Collector failed: ", err.Error()) - } else { - //TODO: parse and skip in case of error, encode to []string - for _, r := range res { - if len(r) > 0 { - *sink <- r - } - } - } - intConfig[i].LastRun = time.Now() - } - } - } - } - }() - return ticker, done -} - -func main() { - // fmt.Println("Hello") - // cmd_opts := []string{"la","le","lu"} - // cmd := "echo" - // s := run_cmd(cmd, cmd_opts) - // fmt.Println(s) - // tags := map[string]string { - // "host" : "broadep2", - // } - // fields := map[string]interface{} { - // "value" : float64(1.0), - // } - // fmt.Println(CreatePoint("flops_any", tags, fields, time.Now().UnixNano())) - var config GlobalConfig - LoadGlobalConfiguration("config.json", &config) - - queue := SetupSink(config) - sinkTicker, sinkDone := RunSink(config, &queue) - collectTicker, collectDone := MainLoop(config, &queue) - time.Sleep(1600 * time.Second) - collectTicker.Stop() - collectDone <- true - CloseSink(config, &queue, sinkTicker, sinkDone) - - // var folders []string - // GetCollectorFolders(config.CollectorPath, &folders) - - // for _, path := range folders { - // var col_config CollectorConfig - // LoadCollectorConfiguration(filepath.Join(path, "config.json"), &col_config) - // stdout := run_cmd(filepath.Join(path, col_config.Command), col_config.Args) - - // metrics := strings.Split(stdout, "\n") - // for _, m := range metrics { - // if len(m) > 0 { - // t := strings.Fields(m) - // if len(t) == 2 { - // var s strings.Builder - // fmt.Fprintf(&s, "%s %d", m, time.Now().UnixNano()) - // m = s.String() - // } - // fmt.Println("SEND", m) - // } - // } - // } -} diff --git a/clusterdaemon_simple.go b/clusterdaemon_simple.go deleted file mode 100644 index 66964e8..0000000 --- a/clusterdaemon_simple.go +++ /dev/null @@ -1,190 +0,0 @@ -package main - -import ( - "fmt" - "strings" - "io/ioutil" - "os" - "os/signal" - "strconv" - "time" -) - - -// geht nicht -//enum CollectScope { -// Node: 0, -// Socket, -// Die, -// LLC, -// NUMA, -// Core, -// HWThread -//} - -//var scopeNames = map[CollectScope]string{ -// Node: "Node", -// Socket: "Socket", -// Die: "Die", -// LLC: "LLC", -// NUMA: "NUMA", -// Core: "Core", -// HWThread: "HWThread" -//} - -type CollectValue struct { - Name string - Value interface{} - //scope CollectScope -} - -type InitFunc func() error -type ReadFunc func(time.Duration) ([]CollectValue, error) -type CloseFunc func() error -type SinkFunc func([]CollectValue) error - -func read_memavg(duration time.Duration) ([]CollectValue, error) { - var values []CollectValue - data, err := ioutil.ReadFile("/proc/meminfo") - if err != nil { - fmt.Println(err.Error()) - return values, err - } - var matches = map[string]string { - "MemTotal" : "mem_total", - "MemAvailable" : "mem_avail", - "MemFree" : "mem_free", - } - lines := strings.Split(string(data), "\n") - for _, l := range lines { - for i,o := range matches { - if strings.HasPrefix(l, i) { - f := strings.Fields(l) - v, err := strconv.ParseInt(f[1], 10, 0) - if err == nil { - var value CollectValue - // value.Scope = Node - value.Name = o - value.Value = v - values = append(values, value) - } - } - } - } - return values, nil -} - -func read_loadavg(duration time.Duration) ([]CollectValue, error) { - var values []CollectValue - data, err := ioutil.ReadFile("/proc/loadavg") - if err != nil { - fmt.Println(err.Error()) - return values, err - } - var matches = map[int]string { - 0 : "loadavg1m", - 1 : "loadavg5m", - 2 : "loadavg15m", - } - f := strings.Fields(string(data)) - for i, m := range matches { - v, err := strconv.ParseFloat(f[i], 64) - if err == nil { - var value CollectValue - value.Name = m - value.Value = v - // value.Scope = Node - values = append(values, value) - } - } - return values, nil -} - -func read_netstat(duration time.Duration) ([]CollectValue, error) { - var values []CollectValue - data, err := ioutil.ReadFile("/proc/net/dev") - if err != nil { - fmt.Println(err.Error()) - return values, err - } - var matches = map[int]string { - 1 : "bytes_in", - 9 : "bytes_out", - 2 : "pkts_in", - 10 : "pkts_out", - } - lines := strings.Split(string(data), "\n") - for _, l := range lines { - if ! strings.Contains(l, ":") { - continue - } - f := strings.Fields(l) - dev := f[0][0:len(f[0])-1] - if dev == "lo" { - continue - } - for i, m := range matches { - v, err := strconv.ParseInt(f[i], 10, 0) - if err == nil { - var value CollectValue - value.Name = fmt.Sprintf("%s_%s", dev, m) - value.Value = v - //value.Scope = Node - values = append(values, value) - } - } - } - return values, nil -} - -func Send(values []CollectValue) error { - for _, v := range values { - fmt.Printf("Name: '%s' Value: '%v'\n", v.Name, v.Value) - } - return nil -} - -func ReadAll(duration time.Duration, reads []ReadFunc, sink SinkFunc) { - for _, f := range reads { - values, err := f(duration) - if err == nil { - sink(values) - } - } -} - -func ReadLoop(interval time.Duration, duration time.Duration, reads []ReadFunc, sink SinkFunc) { - ticker := time.NewTicker(interval) - done := make(chan bool) - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, os.Interrupt) - ReadAll(duration, reads, sink) - go func() { - <-sigs - // Should call all CloseFunc functions here - os.Exit(1) - }() - func() { - select { - case <-done: - return - case t := <-ticker.C: - fmt.Println("Tick at", t) - ReadAll(duration, reads, sink) - } - }() - ticker.Stop() - done <- true -} - -func main() { - //var inits []InitFunc - var reads = []ReadFunc {read_memavg, read_loadavg, read_netstat} - //var closes []CloseFunc - var duration time.Duration - var interval time.Duration - duration = time.Duration(1) * time.Second - interval = time.Duration(10) * time.Second - ReadLoop(interval, duration, reads, Send) - return -} diff --git a/collectors/likwid.go b/collectors/likwid.go deleted file mode 100644 index 829b5c3..0000000 --- a/collectors/likwid.go +++ /dev/null @@ -1,64 +0,0 @@ -package collectors - -import ( - "bytes" - "fmt" - "time" - - protocol "github.com/influxdata/line-protocol" -) - -type LikwidCollector struct { - name string - tags []*protocol.Tag - fields []*protocol.Field - t time.Time - encoder *protocol.Encoder -} - -func (c *LikwidCollector) Name() string { - return c.name -} -func (c *LikwidCollector) TagList() []*protocol.Tag { - return c.tags -} - -func (c *LikwidCollector) FieldList() []*protocol.Field { - return c.fields -} - -func (c *LikwidCollector) Time() time.Time { - return c.t -} - -func (c *LikwidCollector) New() { - buf := &bytes.Buffer{} - c.encoder = protocol.NewEncoder(buf) - c.encoder.SetMaxLineBytes(1024) -} - -func (c *LikwidCollector) Start( - level string, - frequency time.Duration, - duration int) { - ticker := time.NewTicker(frequency * time.Second) - done := make(chan bool) - - go func() { - for { - select { - case <-done: - return - case t := <-ticker.C: - fmt.Println("Tick at", t) - - c.encoder.Encode(c) - } - } - }() - - time.Sleep(1600 * time.Second) - ticker.Stop() - done <- true - fmt.Println("Ticker stopped") -} diff --git a/collectors/memavg/config.json b/collectors/memavg/config.json deleted file mode 100644 index a1b8ffb..0000000 --- a/collectors/memavg/config.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "command": "read_memavg.sh" -} diff --git a/collectors/memavg/read_memavg.go b/collectors/memavg/read_memavg.go deleted file mode 100644 index 9590878..0000000 --- a/collectors/memavg/read_memavg.go +++ /dev/null @@ -1,48 +0,0 @@ -package main -import ( - "strings" - "io/ioutil" - "fmt" - "time" - "os" - "strconv" - ) - -func main() { - t := time.Now() - hostname, err := os.Hostname() - if err != nil { - fmt.Println("#", err) - os.Exit(1) - } - hostname = strings.Split(hostname, ".")[0] - data, err := ioutil.ReadFile("/proc/meminfo") - if err != nil { - fmt.Println("#", err) - os.Exit(1) - return - } - lines := strings.Split(string(data), "\n") - for _, l := range lines { - if strings.HasPrefix(l, "MemTotal") { - f := strings.Fields(l) - v, err := strconv.ParseInt(f[1], 10, 0) - if err == nil { - fmt.Printf("mem_total,hostname=%s value=%v %v\n", hostname, v*1024, t.UnixNano()) - } - } else if strings.HasPrefix(l, "MemAvailable") { - f := strings.Fields(l) - v, err := strconv.ParseInt(f[1], 10, 0) - if err == nil { - fmt.Printf("mem_avail,hostname=%s value=%v %v\n", hostname, v*1024, t.UnixNano()) - } - } else if strings.HasPrefix(l, "MemFree") { - f := strings.Fields(l) - v, err := strconv.ParseInt(f[1], 10, 0) - if err == nil { - fmt.Printf("mem_free,hostname=%s value=%v %v\n", hostname, v*1024, t.UnixNano()) - } - } - } - return -} diff --git a/collectors/memavg/read_memavg.sh b/collectors/memavg/read_memavg.sh deleted file mode 100755 index d2f3db1..0000000 --- a/collectors/memavg/read_memavg.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash - - -TOTAL=$(grep "MemTotal" /proc/meminfo | awk '{print $2}') -AVAIL=$(grep "MemAvailable" /proc/meminfo | awk '{print $2}') -FREE=$(grep "MemFree" /proc/meminfo | awk '{print $2}') -HOST=$(hostname -s) - - -echo "mem_total,host=$HOST value=$TOTAL" -echo "mem_avail,host=$HOST value=$AVAIL" -echo "mem_free,host=$HOST value=$FREE"