From 2287586700be1b207d8177ca82dbb86d4030f286 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Tue, 28 Oct 2025 08:53:43 +0100 Subject: [PATCH] Revert avro files writing logic --- internal/memorystore/avroCheckpoint.go | 46 -------------------------- internal/memorystore/checkpoint.go | 26 --------------- 2 files changed, 72 deletions(-) diff --git a/internal/memorystore/avroCheckpoint.go b/internal/memorystore/avroCheckpoint.go index 563c2aa..3642186 100644 --- a/internal/memorystore/avroCheckpoint.go +++ b/internal/memorystore/avroCheckpoint.go @@ -28,52 +28,6 @@ var NumAvroWorkers int = 4 var ErrNoNewData error = errors.New("no data in the pool") -func UpdateAvroFile(f *os.File, insertCount int64) error { - filePath := f.Name() - f.Close() // close the original handle immediately - - // Reopen fresh for reading - readFile, err := os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to reopen file for reading: %v", err) - } - defer readFile.Close() - - reader, err := goavro.NewOCFReader(readFile) - if err != nil { - return fmt.Errorf("failed to create OCF reader: %v", err) - } - - codec := reader.Codec() - - // Now reopen again for appending - appendFile, err := os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0o644) - if err != nil { - return fmt.Errorf("failed to reopen file for appending: %v", err) - } - defer appendFile.Close() - - recordList := make([]map[string]any, insertCount) - for i := range recordList { - recordList[i] = make(map[string]any) - } - - writer, err := goavro.NewOCFWriter(goavro.OCFConfig{ - W: appendFile, - Codec: codec, - CompressionName: goavro.CompressionDeflateLabel, - }) - if err != nil { - return fmt.Errorf("failed to create OCF writer: %v", err) - } - - if err := writer.Append(recordList); err != nil { - return fmt.Errorf("failed to append record: %v", err) - } - - return nil -} - func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { levels := make([]*AvroLevel, 0) selectors := make([][]string, 0) diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index 465bb0d..fe09b9e 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -492,32 +492,6 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { return nil } - interval, err := time.ParseDuration(Keys.Checkpoints.Interval) - if err != nil { - fmt.Printf("error while parsing interval: %#v", err) - } - - now := time.Now().Unix() - cutOff := time.Unix(fromTimestamp, 0).Add(interval).Unix() - - newCount := (min(now, cutOff) - fromTimestamp) / resolution - - if recordCounter < newCount { - // fmt.Printf("Record Count: %d, Required Count: %d\n", recordCounter, newCount) - - insertCount := newCount - recordCounter - for range insertCount { - for key := range metricsData { - metricsData[key] = append(metricsData[key], schema.ConvertToFloat(0.0)) - } - } - - err := UpdateAvroFile(f, insertCount) - if err != nil { - fmt.Printf("error while inserting blanks into avro: %s\n", err) - } - } - for key, floatArray := range metricsData { metricName := ReplaceKey(key)