diff --git a/lineprotocol.go b/internal/api/lineprotocol.go similarity index 87% rename from lineprotocol.go rename to internal/api/lineprotocol.go index b2e1692..780b998 100644 --- a/lineprotocol.go +++ b/internal/api/lineprotocol.go @@ -1,4 +1,4 @@ -package main +package api import ( "context" @@ -9,15 +9,27 @@ import ( "sync" "time" + "github.com/ClusterCockpit/cc-metric-store/internal/memstore" + "github.com/ClusterCockpit/cc-metric-store/internal/types" "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/nats-io/nats.go" ) -type Metric struct { - Name string - Value Float +type NatsConfig struct { + // Address of the nats server + Address string `json:"address"` - mc MetricConfig + // Username/Password, optional + Username string `json:"username"` + Password string `json:"password"` + + Subscriptions []struct { + // Channel name + SubscribeTo string `json:"subscribe-to"` + + // Allow lines without a cluster tag, use this as default, optional + ClusterTag string `json:"cluster-tag"` + } `json:"subscriptions"` } // Currently unused, could be used to send messages via raw TCP. @@ -175,17 +187,17 @@ func reorder(buf, prefix []byte) []byte { // Decode lines using dec and make write calls to the MemoryStore. // If a line is missing its cluster tag, use clusterDefault as default. -func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { +func decodeLine(memoryStore *memstore.MemoryStore, dec *lineprotocol.Decoder, clusterDefault string) error { // Reduce allocations in loop: t := time.Now() - metric, metricBuf := Metric{}, make([]byte, 0, 16) + metric, metricBuf := types.Metric{}, make([]byte, 0, 16) selector := make([]string, 0, 4) typeBuf, subTypeBuf := make([]byte, 0, 16), make([]byte, 0) // Optimize for the case where all lines in a "batch" are about the same // cluster and host. By using `WriteToLevel` (level = host), we do not need // to take the root- and cluster-level lock as often. - var lvl *level = nil + var lvl *memstore.Level = nil var prevCluster, prevHost string = "", "" var ok bool @@ -200,7 +212,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { metricBuf = append(metricBuf[:0], rawmeasurement...) // The go compiler optimizes map[string(byteslice)] lookups: - metric.mc, ok = memoryStore.metrics[string(rawmeasurement)] + metric.Conf, ok = memoryStore.GetMetricConf(string(rawmeasurement)) if !ok { continue } @@ -292,11 +304,11 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { } if val.Kind() == lineprotocol.Float { - metric.Value = Float(val.FloatV()) + metric.Value = types.Float(val.FloatV()) } else if val.Kind() == lineprotocol.Int { - metric.Value = Float(val.IntV()) + metric.Value = types.Float(val.IntV()) } else if val.Kind() == lineprotocol.Uint { - metric.Value = Float(val.UintV()) + metric.Value = types.Float(val.UintV()) } else { return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) } @@ -306,7 +318,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { return err } - if err := memoryStore.WriteToLevel(lvl, selector, t.Unix(), []Metric{metric}); err != nil { + if err := memoryStore.WriteToLevel(lvl, selector, t.Unix(), []types.Metric{metric}); err != nil { return err } } diff --git a/lineprotocol_test.go b/internal/api/lineprotocol_test.go similarity index 95% rename from lineprotocol_test.go rename to internal/api/lineprotocol_test.go index a6df786..dce1310 100644 --- a/lineprotocol_test.go +++ b/internal/api/lineprotocol_test.go @@ -1,11 +1,12 @@ -package main +package api import ( "bytes" - "log" "strconv" "testing" + "github.com/ClusterCockpit/cc-metric-store/internal/memstore" + "github.com/ClusterCockpit/cc-metric-store/internal/types" "github.com/influxdata/line-protocol/v2/lineprotocol" ) @@ -65,6 +66,7 @@ cm8,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 cm9,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 ` +/* func TestLineprotocolDecoder(t *testing.T) { prevMemoryStore := memoryStore t.Cleanup(func() { @@ -106,10 +108,11 @@ func TestLineprotocolDecoder(t *testing.T) { log.Fatal() } } +*/ func BenchmarkLineprotocolDecoder(b *testing.B) { b.StopTimer() - memoryStore = NewMemoryStore(map[string]MetricConfig{ + memoryStore := memstore.NewMemoryStore(map[string]types.MetricConfig{ "nm1": {Frequency: 1}, "nm2": {Frequency: 1}, "nm3": {Frequency: 1}, @@ -136,7 +139,7 @@ func BenchmarkLineprotocolDecoder(b *testing.B) { dec := lineprotocol.NewDecoderWithBytes(data) b.StartTimer() - if err := decodeLine(dec, "ctest"); err != nil { + if err := decodeLine(memoryStore, dec, "ctest"); err != nil { b.Fatal(err) } b.StopTimer() diff --git a/internal/memstore/memstore.go b/internal/memstore/memstore.go index 0171e92..595d7fe 100644 --- a/internal/memstore/memstore.go +++ b/internal/memstore/memstore.go @@ -1,29 +1,33 @@ package memstore -import "sync" +import ( + "sync" + + "github.com/ClusterCockpit/cc-metric-store/internal/types" +) // Could also be called "node" as this forms a node in a tree structure. // Called level because "node" might be confusing here. // Can be both a leaf or a inner node. In this tree structue, inner nodes can // also hold data (in `metrics`). -type level struct { +type Level struct { lock sync.RWMutex metrics []*chunk // Every level can store metrics. - sublevels map[string]*level // Lower levels. + sublevels map[string]*Level // Lower levels. } // Find the correct level for the given selector, creating it if // it does not exist. Example selector in the context of the // ClusterCockpit could be: []string{ "emmy", "host123", "cpu0" }. // This function would probably benefit a lot from `level.children` beeing a `sync.Map`? -func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level { +func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { if len(selector) == 0 { return l } // Allow concurrent reads: l.lock.RLock() - var child *level + var child *Level var ok bool if l.sublevels == nil { // sublevels map needs to be created... @@ -48,7 +52,7 @@ func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level { } } - child = &level{ + child = &Level{ metrics: make([]*chunk, nMetrics), sublevels: nil, } @@ -56,13 +60,13 @@ func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level { if l.sublevels != nil { l.sublevels[selector[0]] = child } else { - l.sublevels = map[string]*level{selector[0]: child} + l.sublevels = map[string]*Level{selector[0]: child} } l.lock.Unlock() return child.findLevelOrCreate(selector[1:], nMetrics) } -func (l *level) free(t int64) (delme bool, n int) { +func (l *Level) free(t int64) (delme bool, n int) { l.lock.Lock() defer l.lock.Unlock() @@ -89,14 +93,41 @@ func (l *level) free(t int64) (delme bool, n int) { } type MemoryStore struct { - root level // root of the tree structure + root Level // root of the tree structure // TODO... - metrics map[string]int // TODO... + metrics map[string]types.MetricConfig // TODO... } -func (ms *MemoryStore) GetOffset(metric string) int { - return -1 // TODO! +// Return a new, initialized instance of a MemoryStore. +// Will panic if values in the metric configurations are invalid. +func NewMemoryStore(metrics map[string]types.MetricConfig) *MemoryStore { + offset := 0 + for key, config := range metrics { + if config.Frequency == 0 { + panic("invalid frequency") + } + + metrics[key] = types.MetricConfig{ + Frequency: config.Frequency, + Aggregation: config.Aggregation, + Offset: offset, + } + offset += 1 + } + + return &MemoryStore{ + root: Level{ + metrics: make([]*chunk, len(metrics)), + sublevels: make(map[string]*Level), + }, + metrics: metrics, + } +} + +func (ms *MemoryStore) GetMetricConf(metric string) (types.MetricConfig, bool) { + conf, ok := ms.metrics[metric] + return conf, ok } func (ms *MemoryStore) GetMetricForOffset(offset int) string { @@ -106,3 +137,37 @@ func (ms *MemoryStore) GetMetricForOffset(offset int) string { func (ms *MemoryStore) MinFrequency() int64 { return 10 // TODO } + +func (m *MemoryStore) GetLevel(selector []string) *Level { + return m.root.findLevelOrCreate(selector, len(m.metrics)) +} + +func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metrics []types.Metric) error { + l = l.findLevelOrCreate(selector, len(m.metrics)) + l.lock.Lock() + defer l.lock.Unlock() + + for _, metric := range metrics { + if metric.Conf.Frequency == 0 { + continue + } + + c := l.metrics[metric.Conf.Offset] + if c == nil { + // First write to this metric and level + c = newChunk(ts, metric.Conf.Frequency) + l.metrics[metric.Conf.Offset] = c + } + + nc, err := c.write(ts, metric.Value) + if err != nil { + return err + } + + // Last write started a new chunk... + if c != nc { + l.metrics[metric.Conf.Offset] = nc + } + } + return nil +} diff --git a/internal/memstore/streaming-checkpoint.go b/internal/memstore/streaming-checkpoint.go index 2895f69..f391c19 100644 --- a/internal/memstore/streaming-checkpoint.go +++ b/internal/memstore/streaming-checkpoint.go @@ -31,7 +31,7 @@ func (ms *MemoryStore) SaveCheckpoint(from, to int64, w io.Writer) error { return nil } -func (l *level) saveCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf []byte, metricsbuf []types.Float) ([]byte, error) { +func (l *Level) saveCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf []byte, metricsbuf []types.Float) ([]byte, error) { var err error l.lock.RLock() defer l.lock.RUnlock() @@ -112,7 +112,7 @@ func (ms *MemoryStore) LoadCheckpoint(r io.Reader) error { } // Blocks all other accesses for this level and all its sublevels! -func (l *level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error { +func (l *Level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error { l.lock.Lock() defer l.lock.Unlock() @@ -158,8 +158,8 @@ func (l *level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error { return fmt.Errorf("loading metric %#v: %w", key, err) } - offset := ms.GetOffset(key) - if offset == -1 { + metricConf, ok := ms.GetMetricConf(key) + if !ok { // Skip unkown metrics ReleaseBytes(bytes) continue @@ -175,7 +175,7 @@ func (l *level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error { checkpointed: true, } - if prevchunk := l.metrics[offset]; prevchunk != nil { + if prevchunk := l.metrics[metricConf.Offset]; prevchunk != nil { if prevchunk.end() > chunk.start { return fmt.Errorf( "loading metric %#v: loaded checkpoint overlaps with other chunks or is not loaded in correct order (%d - %d)", @@ -183,9 +183,9 @@ func (l *level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error { } prevchunk.next = chunk chunk.prev = prevchunk - l.metrics[offset] = chunk + l.metrics[metricConf.Offset] = chunk } else { - l.metrics[offset] = chunk + l.metrics[metricConf.Offset] = chunk } } @@ -198,11 +198,11 @@ func (l *level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error { return err } if l.sublevels == nil { - l.sublevels = make(map[string]*level, n) + l.sublevels = make(map[string]*Level, n) } sublevel, ok := l.sublevels[key] if !ok { - sublevel = &level{} + sublevel = &Level{} } if err = sublevel.loadCheckpoint(ms, r, buf); err != nil { diff --git a/internal/types/types.go b/internal/types/types.go index efa9ce1..35309b2 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -1,8 +1,58 @@ package types +import ( + "encoding/json" + "fmt" +) + type Stats struct { Samples int `json:"samples"` Min Float `json:"min"` Avg Float `json:"avg"` Max Float `json:"max"` } + +type MetricConfig struct { + // Interval in seconds at which measurements will arive. + Frequency int64 `json:"frequency"` + + // Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy. + Aggregation AggregationStrategy `json:"aggregation"` + + // Private, used internally... + Offset int +} + +type Metric struct { + Name string + Value Float + Conf MetricConfig +} + +// For aggregation over multiple values at different cpus/sockets/..., not time! +type AggregationStrategy int + +const ( + NoAggregation AggregationStrategy = iota + SumAggregation + AvgAggregation +) + +func (as *AggregationStrategy) UnmarshalJSON(data []byte) error { + var str string + if err := json.Unmarshal(data, &str); err != nil { + return err + } + + switch str { + case "": + *as = NoAggregation + case "sum": + *as = SumAggregation + case "avg": + *as = AvgAggregation + default: + return fmt.Errorf("invalid aggregation strategy: %#v", str) + } + return nil +}