From aa0a3673f29e43f813efe6c27241920243f9a387 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Sun, 25 May 2025 19:44:21 +0200 Subject: [PATCH] Fix to Avro features --- internal/api/lineprotocol.go | 17 +++---- internal/avro/avroCheckpoint.go | 2 +- internal/avro/avroHelper.go | 2 +- internal/avro/avroStruct.go | 19 +++++--- internal/memorystore/checkpoint.go | 19 ++++++-- internal/memorystore/memorystore.go | 74 ++++++++++++++++++++++++++++- 6 files changed, 111 insertions(+), 22 deletions(-) diff --git a/internal/api/lineprotocol.go b/internal/api/lineprotocol.go index a2c2066..cfedd3e 100644 --- a/internal/api/lineprotocol.go +++ b/internal/api/lineprotocol.go @@ -198,8 +198,6 @@ func decodeLine(dec *lineprotocol.Decoder, var lvl *memorystore.Level = nil prevCluster, prevHost := "", "" - var AvroBuf avro.AvroStruct - var ok bool for dec.Next() { rawmeasurement, err := dec.Measurement() @@ -333,14 +331,13 @@ func decodeLine(dec *lineprotocol.Decoder, } if config.Keys.Checkpoints.FileFormat != "json" { - AvroBuf.MetricName = string(metricBuf) - AvroBuf.Cluster = cluster - AvroBuf.Node = host - AvroBuf.Selector = selector - AvroBuf.Value = metric.Value - AvroBuf.Timestamp = t.Unix() - - avro.LineProtocolMessages <- AvroBuf + avro.LineProtocolMessages <- &avro.AvroStruct{ + MetricName: string(metricBuf), + Cluster: cluster, + Node: host, + Selector: append([]string{}, selector...), + Value: metric.Value, + Timestamp: t.Unix()} } if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil { diff --git a/internal/avro/avroCheckpoint.go b/internal/avro/avroCheckpoint.go index 2e9c2a3..4bfbc76 100644 --- a/internal/avro/avroCheckpoint.go +++ b/internal/avro/avroCheckpoint.go @@ -137,7 +137,7 @@ func getTimestamp(dir string) int64 { } interval, _ := time.ParseDuration(config.Keys.Checkpoints.Interval) - updateTime := time.Now().Add(-interval).Unix() + updateTime := time.Now().Add(-interval - time.Duration(CheckpointBufferMinutes) + 1).Unix() if maxTs < updateTime { return 0 diff --git a/internal/avro/avroHelper.go b/internal/avro/avroHelper.go index bced065..1f7cbd3 100644 --- a/internal/avro/avroHelper.go +++ b/internal/avro/avroHelper.go @@ -58,7 +58,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { if avroLevel == nil { fmt.Printf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName) } - copy(oldSelector, selector) + oldSelector = append([]string{}, selector...) } avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq)) diff --git a/internal/avro/avroStruct.go b/internal/avro/avroStruct.go index 937a858..49a54f2 100644 --- a/internal/avro/avroStruct.go +++ b/internal/avro/avroStruct.go @@ -6,9 +6,8 @@ import ( "github.com/ClusterCockpit/cc-metric-store/internal/util" ) -var LineProtocolMessages = make(chan AvroStruct) +var LineProtocolMessages = make(chan *AvroStruct) var Delimiter = "ZZZZZ" -var AvroCounter = 0 // CheckpointBufferMinutes should always be in minutes. // Its controls the amount of data to hold for given amount of time. @@ -120,13 +119,13 @@ func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int // Iterate over timestamps and choose the one which is within range. // Since its epoch time, we check if the difference is less than 60 seconds. - for ts := range l.data { - if _, ok := l.data[ts][metricName]; ok { + for ts, dat := range l.data { + if _, ok := dat[metricName]; ok { // If the metric is already present, we can skip it continue } - if (ts - timestamp) < int64(Freq) { - l.data[ts][metricName] = value + if Abs(ts-timestamp) < int64(Freq) { + dat[metricName] = value break } } @@ -135,3 +134,11 @@ func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int func GetAvroStore() *AvroStore { return &avroStore } + +// Abs returns the absolute value of x. +func Abs(x int64) int64 { + if x < 0 { + return -x + } + return x +} diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index 5f75392..e78c373 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -439,6 +439,10 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { br := bufio.NewReader(f) fileName := f.Name()[strings.LastIndex(f.Name(), "/")+1:] + from_timestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64) + if err != nil { + return fmt.Errorf("error converting timestamp from the avro file : %s", err) + } resolution, err := strconv.ParseInt(fileName[0:strings.Index(fileName, "_")], 10, 64) if err != nil { @@ -475,7 +479,7 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { recordCounter += 1 } - to := (from + (recordCounter / (60 / resolution) * 60)) + to := (from_timestamp + (recordCounter / (60 / resolution) * 60)) if to < from { return nil } @@ -508,12 +512,12 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { } leafMetricName := subString[len(subString)-1] - err = lvl.createBuffer(m, leafMetricName, floatArray, from, resolution) + err = lvl.createBuffer(m, leafMetricName, floatArray, from_timestamp, resolution) if err != nil { return fmt.Errorf("error while creating buffers from avroReader : %s", err) } } else { - err = l.createBuffer(m, metricName, floatArray, from, resolution) + err = l.createBuffer(m, metricName, floatArray, from_timestamp, resolution) if err != nil { return fmt.Errorf("error while creating buffers from avroReader : %s", err) } @@ -551,6 +555,15 @@ func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray util. b.prev = prev prev.next = b + + missingCount := ((int(b.start) - int(prev.start)) - len(prev.data)*int(b.frequency)) + if missingCount > 0 { + missingCount /= int(b.frequency) + + for range missingCount { + prev.data = append(prev.data, util.NaN) + } + } } l.metrics[minfo.Offset] = b diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index 4932efb..959a582 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -81,11 +81,13 @@ func Shutdown() { var files int var err error + ms := GetMemoryStore() + if config.Keys.Checkpoints.FileFormat == "json" { - ms := GetMemoryStore() files, err = ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) } else { files, err = avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, true) + close(avro.LineProtocolMessages) } if err != nil { @@ -93,6 +95,76 @@ func Shutdown() { } log.Printf("Done! (%d files written)\n", files) + // ms.PrintHeirarchy() +} + +// func (m *MemoryStore) PrintHeirarchy() { +// m.root.lock.Lock() +// defer m.root.lock.Unlock() + +// fmt.Printf("Root : \n") + +// for lvl1, sel1 := range m.root.children { +// fmt.Printf("\t%s\n", lvl1) +// for lvl2, sel2 := range sel1.children { +// fmt.Printf("\t\t%s\n", lvl2) +// if lvl1 == "fritz" && lvl2 == "f0201" { + +// for name, met := range m.Metrics { +// mt := sel2.metrics[met.Offset] + +// fmt.Printf("\t\t\t\t%s\n", name) +// fmt.Printf("\t\t\t\t") + +// for mt != nil { +// // if name == "cpu_load" { +// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data) +// // } +// mt = mt.prev +// } +// fmt.Printf("\n") + +// } +// } +// for lvl3, sel3 := range sel2.children { +// if lvl1 == "fritz" && lvl2 == "f0201" && lvl3 == "hwthread70" { + +// fmt.Printf("\t\t\t\t\t%s\n", lvl3) + +// for name, met := range m.Metrics { +// mt := sel3.metrics[met.Offset] + +// fmt.Printf("\t\t\t\t\t\t%s\n", name) + +// fmt.Printf("\t\t\t\t\t\t") + +// for mt != nil { +// // if name == "clock" { +// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data) + +// mt = mt.prev +// } +// fmt.Printf("\n") + +// } + +// // for i, _ := range sel3.metrics { +// // fmt.Printf("\t\t\t\t\t%s\n", getName(configmetrics, i)) +// // } +// } +// } +// } +// } + +// } + +func getName(m *MemoryStore, i int) string { + for key, val := range m.Metrics { + if val.Offset == i { + return key + } + } + return "" } func Retention(wg *sync.WaitGroup, ctx context.Context) {