fix to Avro writer special cases

Aditya Ujeniya
2025-05-14 17:29:28 +02:00
parent a03eb315f5
commit 5569ad53d2
6 changed files with 149 additions and 110 deletions


@@ -9,10 +9,12 @@ import (
 	"path"
 	"sort"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/ClusterCockpit/cc-metric-store/internal/config"
 	"github.com/ClusterCockpit/cc-metric-store/internal/util"
 	"github.com/linkedin/goavro/v2"
 )
@@ -32,7 +34,7 @@ func (as *AvroStore) ToCheckpoint(dir string) (int, error) {
 		for sel2, l2 := range l1.children {
 			l2.lock.RLock()
 			// Frequency
-			for sel3, l3 := range l1.children {
+			for sel3, l3 := range l2.children {
 				levels = append(levels, l3)
 				selectors = append(selectors, []string{sel1, sel2, sel3})
 			}
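The hunk above fixes the depth of the innermost loop: it now descends into l2.children instead of re-iterating l1.children, so the third selector level (the frequency, per the comment) is actually visited. A minimal sketch of the corrected walk, with simplified types and illustrative names standing in for the store's AvroLevel tree:

package main

import "fmt"

type level struct {
	children map[string]*level
}

// collect walks three levels of the tree and records one selector triple
// per leaf, as ToCheckpoint does after the fix.
func collect(root *level) [][]string {
	selectors := [][]string{}
	for sel1, l1 := range root.children {
		for sel2, l2 := range l1.children {
			for sel3 := range l2.children { // the bug iterated l1.children here
				selectors = append(selectors, []string{sel1, sel2, sel3})
			}
		}
	}
	return selectors
}

func main() {
	root := &level{children: map[string]*level{
		"alex": {children: map[string]*level{
			"a0603": {children: map[string]*level{
				"60": {children: map[string]*level{}},
			}},
		}},
	}}
	fmt.Println(collect(root)) // [[alex a0603 60]]
}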
@@ -98,6 +100,8 @@ func getTimestamp(dir string) int64 {
 	// The existing avro file will be in epoch timestamp format
 	// iterate over all the files in the directory and find the maximum timestamp
 	// and return it
+	dir = path.Dir(dir)
+
 	files, err := os.ReadDir(dir)
 	if err != nil {
 		return 0
@@ -116,15 +120,24 @@ func getTimestamp(dir string) int64 {
 		if len(name) < 5 {
 			continue
 		}
-		ts, err := strconv.ParseInt(name[:len(name)-5], 10, 64)
+		ts, err := strconv.ParseInt(name[strings.Index(name, "_")+1:len(name)-5], 10, 64)
 		if err != nil {
 			fmt.Printf("error while parsing timestamp: %s\n", err.Error())
 			continue
 		}
 
 		if ts > maxTs {
 			maxTs = ts
 		}
 	}
 
+	interval, _ := time.ParseDuration(config.Keys.Checkpoints.Interval)
+	updateTime := time.Now().Add(-interval).Unix()
+
+	if maxTs > updateTime {
+		return 0
+	}
+
 	return maxTs
 }
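Checkpoint files now carry a prefix before the epoch timestamp (see the filePath change in the next hunk), so the parser skips everything up to the first underscore and strips the five-character ".avro" suffix. The hunk also adds a freshness guard: if the newest file is younger than one configured checkpoint interval, getTimestamp returns 0, the same value used when nothing parseable is found. A runnable sketch of just the slicing, with an illustrative filename:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCheckpointTs extracts the epoch timestamp from a name shaped like
// "<prefix>_<epoch>.avro", e.g. "60_1747229368.avro".
func parseCheckpointTs(name string) (int64, error) {
	return strconv.ParseInt(name[strings.Index(name, "_")+1:len(name)-5], 10, 64)
}

func main() {
	ts, err := parseCheckpointTs("60_1747229368.avro")
	fmt.Println(ts, err) // 1747229368 <nil>
}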
@@ -135,32 +148,36 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64) error {
 	// find smallest overall timestamp in l.data map and delete it from l.data
 	var minTs int64 = int64(1<<63 - 1)
 	for ts := range l.data {
-		if ts < minTs {
+		if ts < minTs && len(l.data[ts]) != 0 {
 			minTs = ts
 		}
 	}
 
-	if from == 0 {
+	if from == 0 && minTs != int64(1<<63-1) {
 		from = minTs
 	}
 
+	if from == 0 {
+		return ErrNoNewData
+	}
+
 	var schema string
 	var codec *goavro.Codec
 	record_list := make([]map[string]interface{}, 0)
 
 	var f *os.File
 
-	filePath := path.Join(dir, fmt.Sprintf("%d.avro", from))
+	filePath := dir + fmt.Sprintf("_%d.avro", from)
 
 	if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) {
-		err = os.MkdirAll(dir, 0o755)
+		err = os.MkdirAll(path.Dir(dir), 0o755)
 		if err == nil {
-			f, err = os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY, 0o644)
+			f, err = os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0o644)
 			if err != nil {
 				return fmt.Errorf("failed to create new avro file: %v", err)
 			}
 		}
-	} else {
+	} else if fp_, err := os.Stat(filePath); fp_.Size() != 0 || errors.Is(err, os.ErrNotExist) {
 		f, err = os.Open(filePath)
 		if err != nil {
 			return fmt.Errorf("failed to open existing avro file: %v", err)
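The rewritten branching distinguishes three states for the target file: it does not exist yet (create it), it exists with content (read the embedded schema, then reopen for append), or it exists but is empty (append directly, there is no schema to read). A reduced sketch of that decision, with a hypothetical helper name:

package main

import (
	"errors"
	"fmt"
	"os"
)

// openCheckpoint returns the opened file and whether the caller must read
// an embedded schema before appending.
func openCheckpoint(filePath string) (*os.File, bool, error) {
	stat, err := os.Stat(filePath)
	switch {
	case errors.Is(err, os.ErrNotExist):
		// Case 1: no file yet -- create it.
		f, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0o644)
		return f, false, err
	case err == nil && stat.Size() != 0:
		// Case 2: existing, non-empty file -- read its schema, then append.
		f, err := os.OpenFile(filePath, os.O_APPEND|os.O_RDWR, 0o644)
		return f, true, err
	default:
		// Case 3: file exists but is empty -- append directly.
		f, err := os.OpenFile(filePath, os.O_APPEND|os.O_RDWR, 0o644)
		return f, false, err
	}
}

func main() {
	f, hasSchema, err := openCheckpoint("/tmp/60_1747229368.avro")
	fmt.Println(hasSchema, err)
	if f != nil {
		f.Close()
	}
}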
@@ -169,52 +186,58 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64) error {
 		if err != nil {
 			return fmt.Errorf("failed to create OCF reader: %v", err)
 		}
-		schema = reader.Codec().Schema()
+		codec = reader.Codec()
+		schema = codec.Schema()
 
 		f.Close()
 
 		f, err = os.OpenFile(filePath, os.O_APPEND|os.O_RDWR, 0o644)
 		if err != nil {
-			log.Fatalf("Failed to create file: %v", err)
+			return fmt.Errorf("failed to create file: %v", err)
 		}
+	} else {
+		f, err = os.OpenFile(filePath, os.O_APPEND|os.O_RDWR, 0o644)
+		if err != nil {
+			return fmt.Errorf("failed to append new avro file: %v", err)
+		}
 	}
 	defer f.Close()
 
-	time_ref := time.Now().Add(time.Duration(-CheckpointBufferMinutes) * time.Minute).Unix()
+	time_ref := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix()
 
 	for ts := range l.data {
 		if ts < time_ref {
-			schema_gen, err := generateSchema(l.data[ts])
+			data := l.data[ts]
+
+			schema_gen, err := generateSchema(data)
 			if err != nil {
 				return err
 			}
 
 			flag, schema, err := compareSchema(schema, schema_gen)
 			if err != nil {
-				log.Fatalf("Failed to compare read and generated schema: %v", err)
+				return fmt.Errorf("failed to compare read and generated schema: %v", err)
 			}
 			if flag {
-				codec, err = goavro.NewCodec(schema)
-				if err != nil {
-					log.Fatalf("Failed to create codec after merged schema: %v", err)
-				}
-
 				f.Close()
 
 				f, err = os.Open(filePath)
 				if err != nil {
-					log.Fatalf("Failed to open Avro file: %v", err)
+					return fmt.Errorf("failed to open Avro file: %v", err)
 				}
 
 				ocfReader, err := goavro.NewOCFReader(f)
 				if err != nil {
-					log.Fatalf("Failed to create OCF reader: %v", err)
+					return fmt.Errorf("failed to create OCF reader: %v", err)
 				}
 
 				for ocfReader.Scan() {
 					record, err := ocfReader.Read()
 					if err != nil {
-						log.Fatalf("Failed to read record: %v", err)
+						return fmt.Errorf("failed to read record: %v", err)
 					}
 					record_list = append(record_list, record.(map[string]interface{}))
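When compareSchema reports a difference, every record already in the file is drained into record_list so the file can be rewritten under the merged schema. A self-contained sketch of that read-back step, with a hypothetical function name:

package main

import (
	"fmt"
	"os"

	"github.com/linkedin/goavro/v2"
)

// drainExisting reads all records out of an existing OCF file so the
// caller can rewrite the file under a merged schema.
func drainExisting(filePath string) ([]map[string]interface{}, error) {
	f, err := os.Open(filePath)
	if err != nil {
		return nil, fmt.Errorf("failed to open Avro file: %v", err)
	}
	defer f.Close()

	ocfReader, err := goavro.NewOCFReader(f)
	if err != nil {
		return nil, fmt.Errorf("failed to create OCF reader: %v", err)
	}

	records := make([]map[string]interface{}, 0)
	for ocfReader.Scan() {
		record, err := ocfReader.Read()
		if err != nil {
			return nil, fmt.Errorf("failed to read record: %v", err)
		}
		records = append(records, record.(map[string]interface{}))
	}
	return records, nil
}

func main() {
	records, err := drainExisting("/tmp/60_1747229368.avro")
	fmt.Println(len(records), err)
}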
@@ -224,17 +247,21 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64) error {
 				err = os.Remove(filePath)
 				if err != nil {
-					log.Fatalf("Failed to delete file: %v", err)
+					return fmt.Errorf("failed to delete file: %v", err)
 				}
 
 				f, err = os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0o644)
 				if err != nil {
-					log.Fatalf("Failed to create file after deleting : %v", err)
+					return fmt.Errorf("failed to create file after deleting : %v", err)
 				}
 			}
 
+			codec, err = goavro.NewCodec(schema)
+			if err != nil {
+				return fmt.Errorf("failed to create codec after merged schema: %v", err)
+			}
+
-			record_list = append(record_list, generateRecord(l.data[ts]))
-			delete(l.data, minTs)
+			record_list = append(record_list, generateRecord(data))
+			delete(l.data, ts)
 		}
 	}
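Besides replacing log.Fatalf with returned errors, this hunk carries two behavioural fixes: the codec is now rebuilt from the (possibly merged) schema before any record is encoded, and the flushed bucket is removed with delete(l.data, ts) instead of delete(l.data, minTs), which previously dropped the wrong entry whenever more than one timestamp was older than time_ref.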
@@ -242,20 +269,23 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64) error {
 		return ErrNoNewData
 	}
 
 	// fmt.Printf("Codec : %#v\n", codec)
 
 	writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
-		W:      f,
-		Codec:  codec,
-		Schema: schema,
+		W:     f,
+		Codec: codec,
 	})
 	if err != nil {
-		log.Fatalf("Failed to create OCF writer: %v", err)
+		return fmt.Errorf("failed to create OCF writer: %v", err)
 	}
 
 	// Append the new record
 	if err := writer.Append(record_list); err != nil {
-		log.Fatalf("Failed to append record: %v", err)
+		return fmt.Errorf("failed to append record: %v", err)
 	}
 
+	f.Close()
+
 	return nil
 }
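For reference, a minimal end-to-end example of the goavro/v2 OCF pattern the function relies on: build a codec, append records through an OCF writer (the writer derives its schema from the codec, which is why the explicit Schema field could be dropped from the config), then read the records back. Schema, path, and values here are illustrative, not the store's real ones:

package main

import (
	"fmt"
	"os"

	"github.com/linkedin/goavro/v2"
)

func main() {
	// Illustrative schema; the real one is produced by generateSchema.
	codec, err := goavro.NewCodec(`{
		"type": "record", "name": "node_data",
		"fields": [{"name": "cpu_load", "type": "double", "default": 0.0}]
	}`)
	if err != nil {
		panic(err)
	}

	f, err := os.Create("/tmp/60_1747229368.avro")
	if err != nil {
		panic(err)
	}

	// No Schema field: the writer takes it from the codec.
	writer, err := goavro.NewOCFWriter(goavro.OCFConfig{W: f, Codec: codec})
	if err != nil {
		panic(err)
	}
	if err := writer.Append([]map[string]interface{}{{"cpu_load": 0.42}}); err != nil {
		panic(err)
	}
	f.Close()

	// Read the records back, as toCheckpoint does for existing files.
	f, err = os.Open("/tmp/60_1747229368.avro")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	reader, err := goavro.NewOCFReader(f)
	if err != nil {
		panic(err)
	}
	for reader.Scan() {
		record, _ := reader.Read()
		fmt.Println(record) // map[cpu_load:0.42]
	}
}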
@@ -351,9 +381,6 @@ func compareSchema(schemaRead, schemaGen string) (bool, string, error) {
 		return false, "", fmt.Errorf("failed to marshal merged schema: %v", err)
 	}
 
-	fmt.Printf("Merged Schema: %s\n", string(mergedSchemaJson))
-	fmt.Printf("Read Schema: %s\n", schemaRead)
-
 	return true, string(mergedSchemaJson), nil
 }
@@ -370,9 +397,11 @@ func generateSchema(data map[string]util.Float) (string, error) {
 	for key := range data {
 		if _, exists := fieldTracker[key]; !exists {
+			key = correctKey(key)
+
 			field := map[string]interface{}{
 				"name": key,
-				"type": "double", // Allows null or float
+				"type": "double",
+				"default": 0.0,
 			}
 			schema["fields"] = append(schema["fields"].([]map[string]interface{}), field)
@@ -387,15 +416,27 @@ func generateSchema(data map[string]util.Float) (string, error) {
 	return string(schemaString), nil
 }
 
 func generateRecord(data map[string]util.Float) map[string]interface{} {
 	record := make(map[string]interface{})
 
 	// Iterate through each map in data
 	for key, value := range data {
+		key = correctKey(key)
+
 		// Set the value in the record
-		record[key] = value
+		record[key] = value.Double()
 	}
 
 	return record
}
 
+func correctKey(key string) string {
+	// Replace any invalid characters in the key
+	// For example, replace spaces with underscores
+	key = strings.ReplaceAll(key, ":", "___")
+	key = strings.ReplaceAll(key, ".", "__")
+
+	return key
+}
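Avro field names only allow letters, digits, and underscores (with a non-digit first character), so the new correctKey helper rewrites the separators that metric keys may contain: ':' becomes "___" and '.' becomes "__" (the in-line comment mentions spaces, but the code targets ':' and '.'). A quick illustration with a made-up key:

package main

import (
	"fmt"
	"strings"
)

func correctKey(key string) string {
	key = strings.ReplaceAll(key, ":", "___")
	key = strings.ReplaceAll(key, ".", "__")
	return key
}

func main() {
	fmt.Println(correctKey("cpu_load:core0.avg")) // cpu_load___core0__avg
}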