diff --git a/configs/config-demo.json b/configs/config-demo.json index 197aee0..7daff78 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -78,7 +78,7 @@ ], "metric-store": { "checkpoints": { - "file-format": "json", + "file-format": "avro", "interval": "1h", "directory": "./var/checkpoints", "restore": "48h" diff --git a/internal/memorystore/avroCheckpoint.go b/internal/memorystore/avroCheckpoint.go index 3642186..563c2aa 100644 --- a/internal/memorystore/avroCheckpoint.go +++ b/internal/memorystore/avroCheckpoint.go @@ -28,6 +28,52 @@ var NumAvroWorkers int = 4 var ErrNoNewData error = errors.New("no data in the pool") +func UpdateAvroFile(f *os.File, insertCount int64) error { + filePath := f.Name() + f.Close() // close the original handle immediately + + // Reopen fresh for reading + readFile, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to reopen file for reading: %v", err) + } + defer readFile.Close() + + reader, err := goavro.NewOCFReader(readFile) + if err != nil { + return fmt.Errorf("failed to create OCF reader: %v", err) + } + + codec := reader.Codec() + + // Now reopen again for appending + appendFile, err := os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0o644) + if err != nil { + return fmt.Errorf("failed to reopen file for appending: %v", err) + } + defer appendFile.Close() + + recordList := make([]map[string]any, insertCount) + for i := range recordList { + recordList[i] = make(map[string]any) + } + + writer, err := goavro.NewOCFWriter(goavro.OCFConfig{ + W: appendFile, + Codec: codec, + CompressionName: goavro.CompressionDeflateLabel, + }) + if err != nil { + return fmt.Errorf("failed to create OCF writer: %v", err) + } + + if err := writer.Append(recordList); err != nil { + return fmt.Errorf("failed to append record: %v", err) + } + + return nil +} + func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { levels := make([]*AvroLevel, 0) selectors := make([][]string, 0) diff --git a/internal/memorystore/buffer.go b/internal/memorystore/buffer.go index 413b04e..a942d0a 100644 --- a/internal/memorystore/buffer.go +++ b/internal/memorystore/buffer.go @@ -7,7 +7,6 @@ package memorystore import ( "errors" - "fmt" "sync" "github.com/ClusterCockpit/cc-lib/schema" @@ -188,8 +187,6 @@ func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int6 i++ } - fmt.Printf("Given From : %d, To: %d\n", from, to) - return data[:i], from, t, nil } diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index fe09b9e..465bb0d 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -492,6 +492,32 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { return nil } + interval, err := time.ParseDuration(Keys.Checkpoints.Interval) + if err != nil { + fmt.Printf("error while parsing interval: %#v", err) + } + + now := time.Now().Unix() + cutOff := time.Unix(fromTimestamp, 0).Add(interval).Unix() + + newCount := (min(now, cutOff) - fromTimestamp) / resolution + + if recordCounter < newCount { + // fmt.Printf("Record Count: %d, Required Count: %d\n", recordCounter, newCount) + + insertCount := newCount - recordCounter + for range insertCount { + for key := range metricsData { + metricsData[key] = append(metricsData[key], schema.ConvertToFloat(0.0)) + } + } + + err := UpdateAvroFile(f, insertCount) + if err != nil { + fmt.Printf("error while inserting blanks into avro: %s\n", err) + } + } + for key, floatArray := range metricsData { metricName := ReplaceKey(key) diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index 552390e..d76b83b 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -10,7 +10,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "os" "os/signal" "runtime" @@ -387,8 +386,6 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso n, data := 0, make([]schema.Float, (to-from)/minfo.Frequency+1) - fmt.Printf("Requested From : %d, To: %d\n", from, to) - err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error { cdata, cfrom, cto, err := b.read(from, to, data) if err != nil {