diff --git a/config.json b/config.json index 65e7105..5298ab1 100644 --- a/config.json +++ b/config.json @@ -1,15 +1,8 @@ { - "root": "/home/jan/bla", - [ - { - "class": "N", - "frequency": 60, - "metrics": ["flops_any", "mem_bw"] - }, - { - "class": "C", - "frequency": 30, - "metrics": ["cpi", "flops_any"] + "metrics": { + "node": { + "frequency": 3, + "metrics": ["load_one", "load_five", "load_fifteen", "proc_total", "proc_run"] } - ] + } } diff --git a/memoryStore.go b/memoryStore.go index 3d9798f..9b1ff58 100644 --- a/memoryStore.go +++ b/memoryStore.go @@ -3,6 +3,8 @@ package main import ( "fmt" "math" + + "github.com/ClusterCockpit/cc-metric-store/lineprotocol" ) type storeBuffer struct { @@ -71,7 +73,7 @@ func newMemoryStore(o []string, n int, f int) *MemoryStore { func (m *MemoryStore) AddMetrics( key string, ts int64, - metrics []Metric) error { + metrics []lineprotocol.Metric) error { b, ok := m.containers[key] diff --git a/memoryStore_test.go b/memoryStore_test.go index 1602c8d..46c3941 100644 --- a/memoryStore_test.go +++ b/memoryStore_test.go @@ -5,9 +5,11 @@ import ( "log" "math" "testing" + + "github.com/ClusterCockpit/cc-metric-store/lineprotocol" ) -var testMetrics [][]Metric = [][]Metric{ +var testMetrics [][]lineprotocol.Metric = [][]lineprotocol.Metric{ {{"flops", 100.5}, {"mem_bw", 2088.67}}, {{"flops", 180.5}, {"mem_bw", 4078.32}, {"mem_capacity", 1020}}, {{"flops", 980.5}, {"mem_bw", 9078.32}, {"mem_capacity", 5010}}, @@ -19,7 +21,7 @@ var testMetrics [][]Metric = [][]Metric{ {{"flops", 970.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2010}}, {{"flops", 970.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2010}}} -var testMetricsAlt [][]Metric = [][]Metric{ +var testMetricsAlt [][]lineprotocol.Metric = [][]lineprotocol.Metric{ {{"flops", 120.5}, {"mem_bw", 2080.67}}, {{"flops", 130.5}, {"mem_bw", 4071.32}, {"mem_capacity", 1120}}, {{"flops", 940.5}, {"mem_bw", 9072.32}, {"mem_capacity", 5210}}, diff --git a/metric-store.go b/metric-store.go index a83d0e2..8ebff1c 100644 --- a/metric-store.go +++ b/metric-store.go @@ -1,46 +1,31 @@ package main import ( - "bytes" - "encoding/gob" "encoding/json" + "errors" "fmt" "log" "os" - "sync" + "os/signal" + "syscall" - nats "github.com/nats-io/nats.go" + "github.com/ClusterCockpit/cc-metric-store/lineprotocol" ) +type MetricStore interface { + AddMetrics(key string, ts int64, metrics []lineprotocol.Metric) error + GetMetric(key string, metric string, from int64, to int64) ([]float64, int64, error) +} + type Config struct { - MemoryStore struct { - Duration string `json:"duration"` - } `json:"memory_store"` - FileStore struct { - Duration string `json:"duration"` - } `json:"file_store"` - Root string `json:"root"` - Frequency int `json:"frequency"` - Metrics []string `json:"metrics"` + MetricClasses map[string]struct { + Frequency int `json:"frequency"` + Metrics []string `json:"metrics"` + } `json:"metrics"` } -type MetricData struct { - Name string - Values []float64 -} - -type Metric struct { - Name string - Value float64 -} - -type message struct { - Ts int64 - Tags []string - Fields []Metric -} - -var Conf Config +var conf Config +var metricStores map[string]MetricStore = map[string]MetricStore{} func loadConfiguration(file string) Config { var config Config @@ -54,36 +39,56 @@ func loadConfiguration(file string) Config { return config } +// TODO: Change MetricStore API so that we do not have to do string concat? +// Nested hashmaps could be an alternative. +func buildKey(line *lineprotocol.Line) (string, error) { + cluster, ok := line.Tags["cluster"] + if !ok { + return "", errors.New("missing cluster tag") + } + + host, ok := line.Tags["host"] + if !ok { + return "", errors.New("missing host tag") + } + + cpu, ok := line.Tags["cpu"] + if ok { + return cluster + ":" + host + ":" + cpu, nil + } + + return cluster + ":" + host, nil +} + +func handleLine(line *lineprotocol.Line) { + // log.Printf("line: %v\n", line) + + store := metricStores[line.Measurement] + key, err := buildKey(line) + if err != nil { + log.Println(err) + } + + err = store.AddMetrics(key, line.Ts.Unix(), line.Fields) +} + func main() { + conf = loadConfiguration("config.json") - Conf = loadConfiguration("config.json") + for class, info := range conf.MetricClasses { + metricStores[class] = newMemoryStore(info.Metrics, 1000, info.Frequency) + } - // Connect to a server - nc, err := nats.Connect(nats.DefaultURL) + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + _ = <-sigs + done <- true + }() + + err := lineprotocol.ReceiveNats("nats://localhost:4222", handleLine, done) if err != nil { log.Fatal(err) } - defer nc.Close() - - var msgBuffer bytes.Buffer - dec := gob.NewDecoder(&msgBuffer) - - // Use a WaitGroup to wait for a message to arrive - wg := sync.WaitGroup{} - wg.Add(1) - - // Subscribe - if _, err := nc.Subscribe("updates", func(m *nats.Msg) { - log.Println(m.Subject) - var p message - err = dec.Decode(&p) - if err != nil { - log.Fatal("decode error 1:", err) - } - }); err != nil { - log.Fatal(err) - } - - // Wait for a message to come in - wg.Wait() }