mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2025-07-22 04:41:40 +02:00
Initial avro reader complete
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package avro
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -149,7 +150,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
fmt.Printf("Checkpointing directory: %s\n", dir)
|
||||
// fmt.Printf("Checkpointing directory: %s\n", dir)
|
||||
|
||||
// find smallest overall timestamp in l.data map and delete it from l.data
|
||||
var minTs int64 = int64(1<<63 - 1)
|
||||
@@ -175,17 +176,21 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
||||
|
||||
filePath := dir + fmt.Sprintf("_%d.avro", from)
|
||||
|
||||
if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) {
|
||||
fp_, err := os.Stat(filePath)
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
err = os.MkdirAll(path.Dir(dir), 0o755)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create directory: %v", err)
|
||||
}
|
||||
} else if fp_, err := os.Stat(filePath); fp_.Size() != 0 || errors.Is(err, os.ErrNotExist) {
|
||||
} else if fp_.Size() != 0 {
|
||||
f, err = os.Open(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open existing avro file: %v", err)
|
||||
}
|
||||
reader, err := goavro.NewOCFReader(f)
|
||||
|
||||
br := bufio.NewReader(f)
|
||||
|
||||
reader, err := goavro.NewOCFReader(br)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create OCF reader: %v", err)
|
||||
}
|
||||
@@ -194,11 +199,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
||||
|
||||
f.Close()
|
||||
}
|
||||
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to append new avro file: %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
time_ref := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix()
|
||||
|
||||
@@ -218,7 +218,10 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
readFlag := true
|
||||
|
||||
for ts := range l.data {
|
||||
flag := false
|
||||
if ts < time_ref {
|
||||
data := l.data[ts]
|
||||
|
||||
@@ -228,12 +231,12 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
flag, schema, err := compareSchema(schema, schema_gen)
|
||||
flag, schema, err = compareSchema(schema, schema_gen)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to compare read and generated schema: %v", err)
|
||||
}
|
||||
if flag {
|
||||
if flag && readFlag {
|
||||
|
||||
f.Close()
|
||||
|
||||
@@ -242,9 +245,11 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
||||
return fmt.Errorf("failed to open Avro file: %v", err)
|
||||
}
|
||||
|
||||
ocfReader, err := goavro.NewOCFReader(f)
|
||||
br := bufio.NewReader(f)
|
||||
|
||||
ocfReader, err := goavro.NewOCFReader(br)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create OCF reader: %v", err)
|
||||
return fmt.Errorf("failed to create OCF reader while changing schema: %v", err)
|
||||
}
|
||||
|
||||
for ocfReader.Scan() {
|
||||
@@ -263,10 +268,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
||||
return fmt.Errorf("failed to delete file: %v", err)
|
||||
}
|
||||
|
||||
f, err = os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0o644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create file after deleting : %v", err)
|
||||
}
|
||||
readFlag = false
|
||||
}
|
||||
codec, err = goavro.NewCodec(schema)
|
||||
if err != nil {
|
||||
@@ -282,6 +284,11 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
||||
return ErrNoNewData
|
||||
}
|
||||
|
||||
f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to append new avro file: %v", err)
|
||||
}
|
||||
|
||||
// fmt.Printf("Codec : %#v\n", codec)
|
||||
|
||||
writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
|
||||
@@ -415,7 +422,7 @@ func generateSchema(data map[string]util.Float) (string, error) {
|
||||
field := map[string]interface{}{
|
||||
"name": key,
|
||||
"type": "double",
|
||||
"default": 0.0,
|
||||
"default": -1.0,
|
||||
}
|
||||
schema["fields"] = append(schema["fields"].([]map[string]interface{}), field)
|
||||
fieldTracker[key] = struct{}{}
|
||||
@@ -453,3 +460,12 @@ func correctKey(key string) string {
|
||||
|
||||
return key
|
||||
}
|
||||
|
||||
func ReplaceKey(key string) string {
|
||||
// Replace any invalid characters in the key
|
||||
// For example, replace spaces with underscores
|
||||
key = strings.ReplaceAll(key, "___", ":")
|
||||
key = strings.ReplaceAll(key, "__", ".")
|
||||
|
||||
return key
|
||||
}
|
||||
|
Reference in New Issue
Block a user