Use nats in main

This commit is contained in:
Lou Knauer 2021-08-20 11:45:34 +02:00
parent 13c206b11e
commit fb433cc497
4 changed files with 73 additions and 71 deletions

View File

@ -1,15 +1,8 @@
{
"root": "/home/jan/bla",
[
{
"class": "N",
"frequency": 60,
"metrics": ["flops_any", "mem_bw"]
},
{
"class": "C",
"frequency": 30,
"metrics": ["cpi", "flops_any"]
"metrics": {
"node": {
"frequency": 3,
"metrics": ["load_one", "load_five", "load_fifteen", "proc_total", "proc_run"]
}
]
}
}

View File

@ -3,6 +3,8 @@ package main
import (
"fmt"
"math"
"github.com/ClusterCockpit/cc-metric-store/lineprotocol"
)
type storeBuffer struct {
@ -71,7 +73,7 @@ func newMemoryStore(o []string, n int, f int) *MemoryStore {
func (m *MemoryStore) AddMetrics(
key string,
ts int64,
metrics []Metric) error {
metrics []lineprotocol.Metric) error {
b, ok := m.containers[key]

View File

@ -5,9 +5,11 @@ import (
"log"
"math"
"testing"
"github.com/ClusterCockpit/cc-metric-store/lineprotocol"
)
var testMetrics [][]Metric = [][]Metric{
var testMetrics [][]lineprotocol.Metric = [][]lineprotocol.Metric{
{{"flops", 100.5}, {"mem_bw", 2088.67}},
{{"flops", 180.5}, {"mem_bw", 4078.32}, {"mem_capacity", 1020}},
{{"flops", 980.5}, {"mem_bw", 9078.32}, {"mem_capacity", 5010}},
@ -19,7 +21,7 @@ var testMetrics [][]Metric = [][]Metric{
{{"flops", 970.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2010}},
{{"flops", 970.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2010}}}
var testMetricsAlt [][]Metric = [][]Metric{
var testMetricsAlt [][]lineprotocol.Metric = [][]lineprotocol.Metric{
{{"flops", 120.5}, {"mem_bw", 2080.67}},
{{"flops", 130.5}, {"mem_bw", 4071.32}, {"mem_capacity", 1120}},
{{"flops", 940.5}, {"mem_bw", 9072.32}, {"mem_capacity", 5210}},

View File

@ -1,46 +1,31 @@
package main
import (
"bytes"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"sync"
"os/signal"
"syscall"
nats "github.com/nats-io/nats.go"
"github.com/ClusterCockpit/cc-metric-store/lineprotocol"
)
type MetricStore interface {
AddMetrics(key string, ts int64, metrics []lineprotocol.Metric) error
GetMetric(key string, metric string, from int64, to int64) ([]float64, int64, error)
}
type Config struct {
MemoryStore struct {
Duration string `json:"duration"`
} `json:"memory_store"`
FileStore struct {
Duration string `json:"duration"`
} `json:"file_store"`
Root string `json:"root"`
Frequency int `json:"frequency"`
Metrics []string `json:"metrics"`
MetricClasses map[string]struct {
Frequency int `json:"frequency"`
Metrics []string `json:"metrics"`
} `json:"metrics"`
}
type MetricData struct {
Name string
Values []float64
}
type Metric struct {
Name string
Value float64
}
type message struct {
Ts int64
Tags []string
Fields []Metric
}
var Conf Config
var conf Config
var metricStores map[string]MetricStore = map[string]MetricStore{}
func loadConfiguration(file string) Config {
var config Config
@ -54,36 +39,56 @@ func loadConfiguration(file string) Config {
return config
}
// TODO: Change MetricStore API so that we do not have to do string concat?
// Nested hashmaps could be an alternative.
func buildKey(line *lineprotocol.Line) (string, error) {
cluster, ok := line.Tags["cluster"]
if !ok {
return "", errors.New("missing cluster tag")
}
host, ok := line.Tags["host"]
if !ok {
return "", errors.New("missing host tag")
}
cpu, ok := line.Tags["cpu"]
if ok {
return cluster + ":" + host + ":" + cpu, nil
}
return cluster + ":" + host, nil
}
func handleLine(line *lineprotocol.Line) {
// log.Printf("line: %v\n", line)
store := metricStores[line.Measurement]
key, err := buildKey(line)
if err != nil {
log.Println(err)
}
err = store.AddMetrics(key, line.Ts.Unix(), line.Fields)
}
func main() {
conf = loadConfiguration("config.json")
Conf = loadConfiguration("config.json")
for class, info := range conf.MetricClasses {
metricStores[class] = newMemoryStore(info.Metrics, 1000, info.Frequency)
}
// Connect to a server
nc, err := nats.Connect(nats.DefaultURL)
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
_ = <-sigs
done <- true
}()
err := lineprotocol.ReceiveNats("nats://localhost:4222", handleLine, done)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
var msgBuffer bytes.Buffer
dec := gob.NewDecoder(&msgBuffer)
// Use a WaitGroup to wait for a message to arrive
wg := sync.WaitGroup{}
wg.Add(1)
// Subscribe
if _, err := nc.Subscribe("updates", func(m *nats.Msg) {
log.Println(m.Subject)
var p message
err = dec.Decode(&p)
if err != nil {
log.Fatal("decode error 1:", err)
}
}); err != nil {
log.Fatal(err)
}
// Wait for a message to come in
wg.Wait()
}