Refactor and update dependencies

This commit is contained in:
2025-09-09 11:36:02 +02:00
parent d8e85cf75d
commit d00881de2e
5 changed files with 63 additions and 57 deletions

View File

@@ -65,7 +65,7 @@ func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) {
defer wg.Done()
for workItem := range work {
var from int64 = getTimestamp(workItem.dir)
from := getTimestamp(workItem.dir)
if err := workItem.level.toCheckpoint(workItem.dir, from, dumpAll); err != nil {
if err == ErrNoNewData {
@@ -159,7 +159,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
int_res, _ := strconv.Atoi(path.Base(dir))
// find smallest overall timestamp in l.data map and delete it from l.data
var minTs int64 = int64(1<<63 - 1)
minTs := int64(1<<63 - 1)
for ts, dat := range l.data {
if ts < minTs && len(dat) != 0 {
minTs = ts
@@ -176,7 +176,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
var schema string
var codec *goavro.Codec
record_list := make([]map[string]interface{}, 0)
record_list := make([]map[string]any, 0)
var f *os.File
@@ -220,7 +220,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
repeat := 60 / int_res
for range repeat {
record_list = append(record_list, make(map[string]interface{}))
record_list = append(record_list, make(map[string]any))
}
}
@@ -262,7 +262,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
return fmt.Errorf("failed to read record: %v", err)
}
record_list = append(record_list, record.(map[string]interface{}))
record_list = append(record_list, record.(map[string]any))
}
f.Close()
@@ -411,10 +411,10 @@ func compareSchema(schemaRead, schemaGen string) (bool, string, error) {
func generateSchema(data map[string]schema.Float) (string, error) {
// Define the Avro schema structure
schema := map[string]interface{}{
schema := map[string]any{
"type": "record",
"name": "DataRecord",
"fields": []map[string]interface{}{},
"fields": []map[string]any{},
}
fieldTracker := make(map[string]struct{})
@@ -423,12 +423,12 @@ func generateSchema(data map[string]schema.Float) (string, error) {
if _, exists := fieldTracker[key]; !exists {
key = correctKey(key)
field := map[string]interface{}{
field := map[string]any{
"name": key,
"type": "double",
"default": -1.0,
}
schema["fields"] = append(schema["fields"].([]map[string]interface{}), field)
schema["fields"] = append(schema["fields"].([]map[string]any), field)
fieldTracker[key] = struct{}{}
}
}
@@ -441,14 +441,15 @@ func generateSchema(data map[string]schema.Float) (string, error) {
return string(schemaString), nil
}
func generateRecord(data map[string]schema.Float) map[string]interface{} {
record := make(map[string]interface{})
func generateRecord(data map[string]schema.Float) map[string]any {
record := make(map[string]any)
// Iterate through each map in data
for key, value := range data {
key = correctKey(key)
// Set the value in the record
// avro only accepts basic types
record[key] = value.Double()
}

View File

@@ -1,8 +1,13 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package avro
import (
"context"
"fmt"
"slices"
"strconv"
"sync"
@@ -10,7 +15,6 @@ import (
)
func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
// AvroPool is a pool of Avro writers.
go func() {
if config.MetricStoreKeys.Checkpoints.FileFormat == "json" {
@@ -28,7 +32,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
case <-ctx.Done():
return
case val := <-LineProtocolMessages:
//Fetch the frequency of the metric from the global configuration
// Fetch the frequency of the metric from the global configuration
freq, err := config.GetMetricFrequency(val.MetricName)
if err != nil {
fmt.Printf("Error fetching metric frequency: %s\n", err)
@@ -58,7 +62,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
if avroLevel == nil {
fmt.Printf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
}
oldSelector = append([]string{}, selector...)
oldSelector = slices.Clone(selector)
}
avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))

View File

@@ -1,3 +1,7 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package avro
import (
@@ -37,9 +41,9 @@ type AvroLevel struct {
}
type AvroField struct {
Name string `json:"name"`
Type interface{} `json:"type"`
Default interface{} `json:"default,omitempty"`
Name string `json:"name"`
Type any `json:"type"`
Default any `json:"default,omitempty"`
}
type AvroSchema struct {