From 0fe634ec45ff8e3696fb5fef2f498d65d6c6a18a Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Tue, 27 May 2025 10:45:09 +0200 Subject: [PATCH] Fix to bugs for data consistency --- endpoint-test-scripts/test_ccms_write_api.sh | 2 +- internal/api/lineprotocol.go | 6 +++-- internal/avro/avroCheckpoint.go | 23 +++++++++-------- internal/avro/avroStruct.go | 27 ++++++++++++++++---- internal/memorystore/checkpoint.go | 17 ++++++++---- 5 files changed, 52 insertions(+), 23 deletions(-) diff --git a/endpoint-test-scripts/test_ccms_write_api.sh b/endpoint-test-scripts/test_ccms_write_api.sh index 3bc8ffc..6019efe 100755 --- a/endpoint-test-scripts/test_ccms_write_api.sh +++ b/endpoint-test-scripts/test_ccms_write_api.sh @@ -7,7 +7,7 @@ rm sample_alex.txt while [ true ]; do echo "Alex Metrics for hwthread types and type-ids" - timestamp="$(date '+%s%N')" + timestamp="$(date '+%s')" echo "Timestamp : "+$timestamp for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do for hostname in a0603 a0903 a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do diff --git a/internal/api/lineprotocol.go b/internal/api/lineprotocol.go index cfedd3e..0f15d67 100644 --- a/internal/api/lineprotocol.go +++ b/internal/api/lineprotocol.go @@ -330,6 +330,8 @@ func decodeLine(dec *lineprotocol.Decoder, return fmt.Errorf("host %s: timestamp : %#v with error : %#v", host, t, err.Error()) } + time := t.Unix() + if config.Keys.Checkpoints.FileFormat != "json" { avro.LineProtocolMessages <- &avro.AvroStruct{ MetricName: string(metricBuf), @@ -337,10 +339,10 @@ func decodeLine(dec *lineprotocol.Decoder, Node: host, Selector: append([]string{}, selector...), Value: metric.Value, - Timestamp: t.Unix()} + Timestamp: time} } - if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil { + if err := ms.WriteToLevel(lvl, selector, time, []memorystore.Metric{metric}); err != nil { return err } } diff --git a/internal/avro/avroCheckpoint.go b/internal/avro/avroCheckpoint.go index 4bfbc76..eb23d93 100644 --- a/internal/avro/avroCheckpoint.go +++ b/internal/avro/avroCheckpoint.go @@ -137,9 +137,10 @@ func getTimestamp(dir string) int64 { } interval, _ := time.ParseDuration(config.Keys.Checkpoints.Interval) - updateTime := time.Now().Add(-interval - time.Duration(CheckpointBufferMinutes) + 1).Unix() + updateTime := time.Unix(maxTs, 0).Add(interval).Add(time.Duration(CheckpointBufferMinutes-1) * time.Minute).Unix() - if maxTs < updateTime { + if updateTime < time.Now().Unix() { + fmt.Printf("maxTs : %d, updateTime : %d, now : %d\n", maxTs, updateTime, time.Now().Unix()) return 0 } @@ -151,11 +152,13 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { defer l.lock.Unlock() // fmt.Printf("Checkpointing directory: %s\n", dir) + // filepath contains the resolution + int_res, _ := strconv.Atoi(path.Base(dir)) // find smallest overall timestamp in l.data map and delete it from l.data var minTs int64 = int64(1<<63 - 1) - for ts := range l.data { - if ts < minTs && len(l.data[ts]) != 0 { + for ts, dat := range l.data { + if ts < minTs && len(dat) != 0 { minTs = ts } } @@ -176,8 +179,10 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { filePath := dir + fmt.Sprintf("_%d.avro", from) - fp_, err := os.Stat(filePath) - if errors.Is(err, os.ErrNotExist) { + var err error + + fp_, err_ := os.Stat(filePath) + if errors.Is(err_, os.ErrNotExist) { err = os.MkdirAll(path.Dir(dir), 0o755) if err != nil { return fmt.Errorf("failed to create directory: %v", err) @@ -206,10 +211,8 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { time_ref = time.Now().Unix() } + // Empty values if len(l.data) == 0 { - // filepath contains the resolution - int_res, _ := strconv.Atoi(path.Base(dir)) - // we checkpoint avro files every 60 seconds repeat := 60 / int_res @@ -236,7 +239,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { if err != nil { return fmt.Errorf("failed to compare read and generated schema: %v", err) } - if flag && readFlag { + if flag && readFlag && !errors.Is(err_, os.ErrNotExist) { f.Close() diff --git a/internal/avro/avroStruct.go b/internal/avro/avroStruct.go index 49a54f2..85aa3ba 100644 --- a/internal/avro/avroStruct.go +++ b/internal/avro/avroStruct.go @@ -117,17 +117,34 @@ func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int } } + closestTs := int64(0) + minDiff := int64(Freq) + 1 // Start with diff just outside the valid range + found := false + // Iterate over timestamps and choose the one which is within range. // Since its epoch time, we check if the difference is less than 60 seconds. for ts, dat := range l.data { - if _, ok := dat[metricName]; ok { - // If the metric is already present, we can skip it + // Check if timestamp is within range + diff := timestamp - ts + if diff < -int64(Freq) || diff > int64(Freq) { continue } - if Abs(ts-timestamp) < int64(Freq) { - dat[metricName] = value - break + + // Metric already present at this timestamp — skip + if _, ok := dat[metricName]; ok { + continue } + + // Check if this is the closest timestamp so far + if Abs(diff) < minDiff { + minDiff = Abs(diff) + closestTs = ts + found = true + } + } + + if found { + l.data[closestTs][metricName] = value } } diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index e78c373..ff2cb5c 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -386,6 +386,9 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { // Config read (replace with your actual config read) fileFormat := config.Keys.Checkpoints.FileFormat + if fileFormat == "" { + fileFormat = "avro" + } // Map to easily get the fallback format oppositeFormat := map[string]string{ @@ -439,16 +442,20 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { br := bufio.NewReader(f) fileName := f.Name()[strings.LastIndex(f.Name(), "/")+1:] - from_timestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64) - if err != nil { - return fmt.Errorf("error converting timestamp from the avro file : %s", err) - } - resolution, err := strconv.ParseInt(fileName[0:strings.Index(fileName, "_")], 10, 64) if err != nil { return fmt.Errorf("error while reading avro file (resolution parsing) : %s", err) } + from_timestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64) + + // Same logic according to lineprotocol + from_timestamp -= (resolution / 2) + + if err != nil { + return fmt.Errorf("error converting timestamp from the avro file : %s", err) + } + // fmt.Printf("File : %s with resolution : %d\n", fileName, resolution) var recordCounter int64 = 0