From 9fc1836c3071c33bd8df111cc1ac970f387241d8 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 23 Feb 2026 14:21:17 +0100 Subject: [PATCH] Replace avro checkpoints with native binary format --- go.mod | 2 - go.sum | 9 - pkg/metricstore/avroCheckpoint.go | 481 ---------------------------- pkg/metricstore/avroHelper.go | 130 -------- pkg/metricstore/avroStruct.go | 167 ---------- pkg/metricstore/binaryCheckpoint.go | 274 ++++++++++++++++ pkg/metricstore/checkpoint.go | 346 +++++++------------- pkg/metricstore/config.go | 9 +- pkg/metricstore/configSchema.go | 5 +- pkg/metricstore/lineprotocol.go | 11 - pkg/metricstore/metricstore.go | 17 +- 11 files changed, 400 insertions(+), 1051 deletions(-) delete mode 100644 pkg/metricstore/avroCheckpoint.go delete mode 100644 pkg/metricstore/avroHelper.go delete mode 100644 pkg/metricstore/avroStruct.go create mode 100644 pkg/metricstore/binaryCheckpoint.go diff --git a/go.mod b/go.mod index b790c0c8..067ec05d 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,6 @@ require ( github.com/influxdata/line-protocol/v2 v2.2.1 github.com/jmoiron/sqlx v1.4.0 github.com/joho/godotenv v1.5.1 - github.com/linkedin/goavro/v2 v2.15.0 github.com/mattn/go-sqlite3 v1.14.34 github.com/parquet-go/parquet-go v0.27.0 github.com/qustavo/sqlhooks/v2 v2.1.0 @@ -80,7 +79,6 @@ require ( github.com/go-openapi/swag/yamlutils v0.25.4 // indirect github.com/go-viper/mapstructure/v2 v2.5.0 // indirect github.com/goccy/go-yaml v1.19.2 // indirect - github.com/golang/snappy v1.0.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/securecookie v1.1.2 // indirect github.com/gorilla/websocket v1.5.3 // indirect diff --git a/go.sum b/go.sum index c319c6ba..c4835106 100644 --- a/go.sum +++ b/go.sum @@ -149,9 +149,6 @@ github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63Y github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang-migrate/migrate/v4 v4.19.1 
h1:OCyb44lFuQfYXYLx1SCxPZQGU7mcaZ7gH9yH4jSFbBA= github.com/golang-migrate/migrate/v4 v4.19.1/go.mod h1:CTcgfjxhaUtsLipnLoQRWCrjYXycRz/g5+RWDuYgPrE= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= -github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= @@ -224,8 +221,6 @@ github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6Fm github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/linkedin/goavro/v2 v2.15.0 h1:pDj1UrjUOO62iXhgBiE7jQkpNIc5/tA5eZsgolMjgVI= -github.com/linkedin/goavro/v2 v2.15.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mattn/go-sqlite3 v1.14.34 h1:3NtcvcUnFBPsuRcno8pUtupspG/GM+9nZ88zgJcp6Zk= @@ -287,14 +282,11 @@ github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKk github.com/stmcginnis/gofish v0.21.1 h1:sutDvBhmLh4RDOZ1DN8GUyYRu7f1ggvKMMnSaiqhwn4= github.com/stmcginnis/gofish v0.21.1/go.mod h1:PzF5i8ecRG9A2ol8XT64npKUunyraJ+7t0kYMpQAtqU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= 
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/swaggo/files v1.0.1 h1:J1bVJ4XHZNq0I46UU90611i9/YzdrF7x92oX1ig5IdE= @@ -376,7 +368,6 @@ gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/metricstore/avroCheckpoint.go b/pkg/metricstore/avroCheckpoint.go deleted file mode 100644 index 14898186..00000000 --- a/pkg/metricstore/avroCheckpoint.go +++ /dev/null @@ -1,481 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. 
- -package metricstore - -import ( - "bufio" - "encoding/json" - "errors" - "fmt" - "os" - "path" - "sort" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" - "github.com/ClusterCockpit/cc-lib/v2/schema" - "github.com/linkedin/goavro/v2" -) - -var ( - NumAvroWorkers int = DefaultAvroWorkers - startUp bool = true -) - -func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { - levels := make([]*AvroLevel, 0) - selectors := make([][]string, 0) - as.root.lock.RLock() - // Cluster - for sel1, l1 := range as.root.children { - l1.lock.RLock() - // Node - for sel2, l2 := range l1.children { - l2.lock.RLock() - // Frequency - for sel3, l3 := range l2.children { - levels = append(levels, l3) - selectors = append(selectors, []string{sel1, sel2, sel3}) - } - l2.lock.RUnlock() - } - l1.lock.RUnlock() - } - as.root.lock.RUnlock() - - type workItem struct { - level *AvroLevel - dir string - selector []string - } - - n, errs := int32(0), int32(0) - - var wg sync.WaitGroup - wg.Add(NumAvroWorkers) - work := make(chan workItem, NumAvroWorkers*2) - for range NumAvroWorkers { - go func() { - defer wg.Done() - - for workItem := range work { - from := getTimestamp(workItem.dir) - - if err := workItem.level.toCheckpoint(workItem.dir, from, dumpAll); err != nil { - if err == ErrNoNewArchiveData { - continue - } - - cclog.Errorf("error while checkpointing %#v: %s", workItem.selector, err.Error()) - atomic.AddInt32(&errs, 1) - } else { - atomic.AddInt32(&n, 1) - } - } - }() - } - - for i := range len(levels) { - dir := path.Join(dir, path.Join(selectors[i]...)) - work <- workItem{ - level: levels[i], - dir: dir, - selector: selectors[i], - } - } - - close(work) - wg.Wait() - - if errs > 0 { - return int(n), fmt.Errorf("%d errors happend while creating avro checkpoints (%d successes)", errs, n) - } - - startUp = false - - return int(n), nil -} - -// getTimestamp returns the timestamp from the directory name 
-func getTimestamp(dir string) int64 { - // Extract the resolution and timestamp from the directory name - // The existing avro file will be in epoch timestamp format - // iterate over all the files in the directory and find the maximum timestamp - // and return it - - resolution := path.Base(dir) - dir = path.Dir(dir) - - files, err := os.ReadDir(dir) - if err != nil { - return 0 - } - var maxTS int64 = 0 - - if len(files) == 0 { - return 0 - } - - for _, file := range files { - if file.IsDir() { - continue - } - name := file.Name() - - if len(name) < 5 || !strings.HasSuffix(name, ".avro") || !strings.HasPrefix(name, resolution+"_") { - continue - } - - ts, err := strconv.ParseInt(name[strings.Index(name, "_")+1:len(name)-5], 10, 64) - if err != nil { - fmt.Printf("error while parsing timestamp: %s\n", err.Error()) - continue - } - - if ts > maxTS { - maxTS = ts - } - } - - interval, _ := time.ParseDuration(Keys.Checkpoints.Interval) - updateTime := time.Unix(maxTS, 0).Add(interval).Add(time.Duration(CheckpointBufferMinutes-1) * time.Minute).Unix() - - if startUp { - return 0 - } - - if updateTime < time.Now().Unix() { - return 0 - } - - return maxTS -} - -func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { - l.lock.Lock() - defer l.lock.Unlock() - - // fmt.Printf("Checkpointing directory: %s\n", dir) - // filepath contains the resolution - intRes, _ := strconv.Atoi(path.Base(dir)) - - // find smallest overall timestamp in l.data map and delete it from l.data - minTS := int64(1<<63 - 1) - for ts, dat := range l.data { - if ts < minTS && len(dat) != 0 { - minTS = ts - } - } - - if from == 0 && minTS != int64(1<<63-1) { - from = minTS - } - - if from == 0 { - return ErrNoNewArchiveData - } - - var schema string - var codec *goavro.Codec - recordList := make([]map[string]any, 0) - - var f *os.File - - filePath := dir + fmt.Sprintf("_%d.avro", from) - - var err error - - fp_, err_ := os.Stat(filePath) - if errors.Is(err_, os.ErrNotExist) { - 
err = os.MkdirAll(path.Dir(dir), 0o755) - if err != nil { - return fmt.Errorf("failed to create directory: %v", err) - } - } else if fp_.Size() != 0 { - f, err = os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open existing avro file: %v", err) - } - defer f.Close() - - br := bufio.NewReader(f) - - reader, err := goavro.NewOCFReader(br) - if err != nil { - return fmt.Errorf("failed to create OCF reader: %v", err) - } - codec = reader.Codec() - schema = codec.Schema() - } - - timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix() - - if dumpAll { - timeRef = time.Now().Unix() - } - - // Empty values - if len(l.data) == 0 { - // we checkpoint avro files every 60 seconds - repeat := 60 / intRes - - for range repeat { - recordList = append(recordList, make(map[string]any)) - } - } - - readFlag := true - - for ts := range l.data { - flag := false - if ts < timeRef { - data := l.data[ts] - - schemaGen, err := generateSchema(data) - if err != nil { - return err - } - - flag, schema, err = compareSchema(schema, schemaGen) - if err != nil { - return fmt.Errorf("failed to compare read and generated schema: %v", err) - } - if flag && readFlag && !errors.Is(err_, os.ErrNotExist) { - // Use closure to ensure file is closed even on error - err := func() error { - f2, err := os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open Avro file: %v", err) - } - defer f2.Close() - - br := bufio.NewReader(f2) - - ocfReader, err := goavro.NewOCFReader(br) - if err != nil { - return fmt.Errorf("failed to create OCF reader while changing schema: %v", err) - } - - for ocfReader.Scan() { - record, err := ocfReader.Read() - if err != nil { - return fmt.Errorf("failed to read record: %v", err) - } - - recordList = append(recordList, record.(map[string]any)) - } - - return nil - }() - if err != nil { - return err - } - - err = os.Remove(filePath) - if err != nil { - return fmt.Errorf("failed to delete file: %v", err) - } - - 
readFlag = false - } - codec, err = goavro.NewCodec(schema) - if err != nil { - return fmt.Errorf("failed to create codec after merged schema: %v", err) - } - - recordList = append(recordList, generateRecord(data)) - delete(l.data, ts) - } - } - - if len(recordList) == 0 { - return ErrNoNewArchiveData - } - - f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o644) - if err != nil { - return fmt.Errorf("failed to append new avro file: %v", err) - } - defer f.Close() - - // fmt.Printf("Codec : %#v\n", codec) - - writer, err := goavro.NewOCFWriter(goavro.OCFConfig{ - W: f, - Codec: codec, - CompressionName: goavro.CompressionDeflateLabel, - }) - if err != nil { - return fmt.Errorf("failed to create OCF writer: %v", err) - } - - // Append the new record - if err := writer.Append(recordList); err != nil { - return fmt.Errorf("failed to append record: %v", err) - } - - return nil -} - -func compareSchema(schemaRead, schemaGen string) (bool, string, error) { - var genSchema, readSchema AvroSchema - - if schemaRead == "" { - return false, schemaGen, nil - } - - // Unmarshal the schema strings into AvroSchema structs - if err := json.Unmarshal([]byte(schemaGen), &genSchema); err != nil { - return false, "", fmt.Errorf("failed to parse generated schema: %v", err) - } - if err := json.Unmarshal([]byte(schemaRead), &readSchema); err != nil { - return false, "", fmt.Errorf("failed to parse read schema: %v", err) - } - - sort.Slice(genSchema.Fields, func(i, j int) bool { - return genSchema.Fields[i].Name < genSchema.Fields[j].Name - }) - - sort.Slice(readSchema.Fields, func(i, j int) bool { - return readSchema.Fields[i].Name < readSchema.Fields[j].Name - }) - - // Check if schemas are identical - schemasEqual := true - if len(genSchema.Fields) <= len(readSchema.Fields) { - - for i := range genSchema.Fields { - if genSchema.Fields[i].Name != readSchema.Fields[i].Name { - schemasEqual = false - break - } - } - - // If schemas are identical, return the read schema 
- if schemasEqual { - return false, schemaRead, nil - } - } - - // Create a map to hold unique fields from both schemas - fieldMap := make(map[string]AvroField) - - // Add fields from the read schema - for _, field := range readSchema.Fields { - fieldMap[field.Name] = field - } - - // Add or update fields from the generated schema - for _, field := range genSchema.Fields { - fieldMap[field.Name] = field - } - - // Create a union schema by collecting fields from the map - var mergedFields []AvroField - for _, field := range fieldMap { - mergedFields = append(mergedFields, field) - } - - // Sort fields by name for consistency - sort.Slice(mergedFields, func(i, j int) bool { - return mergedFields[i].Name < mergedFields[j].Name - }) - - // Create the merged schema - mergedSchema := AvroSchema{ - Type: "record", - Name: genSchema.Name, - Fields: mergedFields, - } - - // Check if schemas are identical - schemasEqual = len(mergedSchema.Fields) == len(readSchema.Fields) - if schemasEqual { - for i := range mergedSchema.Fields { - if mergedSchema.Fields[i].Name != readSchema.Fields[i].Name { - schemasEqual = false - break - } - } - - if schemasEqual { - return false, schemaRead, nil - } - } - - // Marshal the merged schema back to JSON - mergedSchemaJSON, err := json.Marshal(mergedSchema) - if err != nil { - return false, "", fmt.Errorf("failed to marshal merged schema: %v", err) - } - - return true, string(mergedSchemaJSON), nil -} - -func generateSchema(data map[string]schema.Float) (string, error) { - // Define the Avro schema structure - schema := map[string]any{ - "type": "record", - "name": "DataRecord", - "fields": []map[string]any{}, - } - - fieldTracker := make(map[string]struct{}) - - for key := range data { - if _, exists := fieldTracker[key]; !exists { - key = correctKey(key) - - field := map[string]any{ - "name": key, - "type": "double", - "default": -1.0, - } - schema["fields"] = append(schema["fields"].([]map[string]any), field) - fieldTracker[key] = 
struct{}{} - } - } - - schemaString, err := json.Marshal(schema) - if err != nil { - return "", fmt.Errorf("failed to marshal schema: %v", err) - } - - return string(schemaString), nil -} - -func generateRecord(data map[string]schema.Float) map[string]any { - record := make(map[string]any) - - // Iterate through each map in data - for key, value := range data { - key = correctKey(key) - - // Set the value in the record - // avro only accepts basic types - record[key] = value.Double() - } - - return record -} - -func correctKey(key string) string { - key = strings.ReplaceAll(key, "_", "_0x5F_") - key = strings.ReplaceAll(key, ":", "_0x3A_") - key = strings.ReplaceAll(key, ".", "_0x2E_") - return key -} - -func ReplaceKey(key string) string { - key = strings.ReplaceAll(key, "_0x2E_", ".") - key = strings.ReplaceAll(key, "_0x3A_", ":") - key = strings.ReplaceAll(key, "_0x5F_", "_") - return key -} diff --git a/pkg/metricstore/avroHelper.go b/pkg/metricstore/avroHelper.go deleted file mode 100644 index f6bef36e..00000000 --- a/pkg/metricstore/avroHelper.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. 
- -package metricstore - -import ( - "context" - "slices" - "strconv" - "strings" - "sync" - - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" -) - -func DataStaging(wg *sync.WaitGroup, ctx context.Context) { - wg.Add(1) - go func() { - defer wg.Done() - - if Keys.Checkpoints.FileFormat == "json" { - return - } - - ms := GetMemoryStore() - var avroLevel *AvroLevel - oldSelector := make([]string, 0) - - for { - select { - case <-ctx.Done(): - // Drain any remaining messages in channel before exiting - for { - select { - case val, ok := <-LineProtocolMessages: - if !ok { - // Channel closed - return - } - // Process remaining message - freq, err := ms.GetMetricFrequency(val.MetricName) - if err != nil { - continue - } - - var metricName strings.Builder - for _, selectorName := range val.Selector { - metricName.WriteString(selectorName + SelectorDelimiter) - } - metricName.WriteString(val.MetricName) - - var selector []string - selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10)) - - if !stringSlicesEqual(oldSelector, selector) { - avroLevel = avroStore.root.findAvroLevelOrCreate(selector) - if avroLevel == nil { - cclog.Errorf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName) - } - oldSelector = slices.Clone(selector) - } - - if avroLevel != nil { - avroLevel.addMetric(metricName.String(), val.Value, val.Timestamp, int(freq)) - } - default: - // No more messages, exit - return - } - } - case val, ok := <-LineProtocolMessages: - if !ok { - // Channel closed, exit gracefully - return - } - - // Fetch the frequency of the metric from the global configuration - freq, err := ms.GetMetricFrequency(val.MetricName) - if err != nil { - cclog.Errorf("Error fetching metric frequency: %s\n", err) - continue - } - - var metricName strings.Builder - - for _, selectorName := range val.Selector { - metricName.WriteString(selectorName + SelectorDelimiter) - } - - 
metricName.WriteString(val.MetricName) - - // Create a new selector for the Avro level - // The selector is a slice of strings that represents the path to the - // Avro level. It is created by appending the cluster, node, and metric - // name to the selector. - var selector []string - selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10)) - - if !stringSlicesEqual(oldSelector, selector) { - // Get the Avro level for the metric - avroLevel = avroStore.root.findAvroLevelOrCreate(selector) - - // If the Avro level is nil, create a new one - if avroLevel == nil { - cclog.Errorf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName) - } - oldSelector = slices.Clone(selector) - } - - if avroLevel != nil { - avroLevel.addMetric(metricName.String(), val.Value, val.Timestamp, int(freq)) - } - } - } - }() -} - -func stringSlicesEqual(a, b []string) bool { - if len(a) != len(b) { - return false - } - for i := range a { - if a[i] != b[i] { - return false - } - } - return true -} diff --git a/pkg/metricstore/avroStruct.go b/pkg/metricstore/avroStruct.go deleted file mode 100644 index 78a8d137..00000000 --- a/pkg/metricstore/avroStruct.go +++ /dev/null @@ -1,167 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. 
- -package metricstore - -import ( - "sync" - - "github.com/ClusterCockpit/cc-lib/v2/schema" -) - -var ( - LineProtocolMessages = make(chan *AvroStruct) - // SelectorDelimiter separates hierarchical selector components in metric names for Avro encoding - SelectorDelimiter = "_SEL_" -) - -var CheckpointBufferMinutes = DefaultCheckpointBufferMin - -type AvroStruct struct { - MetricName string - Cluster string - Node string - Selector []string - Value schema.Float - Timestamp int64 -} - -type AvroStore struct { - root AvroLevel -} - -var avroStore AvroStore - -type AvroLevel struct { - children map[string]*AvroLevel - data map[int64]map[string]schema.Float - lock sync.RWMutex -} - -type AvroField struct { - Name string `json:"name"` - Type any `json:"type"` - Default any `json:"default,omitempty"` -} - -type AvroSchema struct { - Type string `json:"type"` - Name string `json:"name"` - Fields []AvroField `json:"fields"` -} - -func (l *AvroLevel) findAvroLevelOrCreate(selector []string) *AvroLevel { - if len(selector) == 0 { - return l - } - - // Allow concurrent reads: - l.lock.RLock() - var child *AvroLevel - var ok bool - if l.children == nil { - // Children map needs to be created... - l.lock.RUnlock() - } else { - child, ok := l.children[selector[0]] - l.lock.RUnlock() - if ok { - return child.findAvroLevelOrCreate(selector[1:]) - } - } - - // The level does not exist, take write lock for unique access: - l.lock.Lock() - // While this thread waited for the write lock, another thread - // could have created the child node. 
- if l.children != nil { - child, ok = l.children[selector[0]] - if ok { - l.lock.Unlock() - return child.findAvroLevelOrCreate(selector[1:]) - } - } - - child = &AvroLevel{ - data: make(map[int64]map[string]schema.Float, 0), - children: nil, - } - - if l.children != nil { - l.children[selector[0]] = child - } else { - l.children = map[string]*AvroLevel{selector[0]: child} - } - l.lock.Unlock() - return child.findAvroLevelOrCreate(selector[1:]) -} - -func (l *AvroLevel) addMetric(metricName string, value schema.Float, timestamp int64, Freq int) { - l.lock.Lock() - defer l.lock.Unlock() - - KeyCounter := int(CheckpointBufferMinutes * 60 / Freq) - - // Create keys in advance for the given amount of time - if len(l.data) != KeyCounter { - if len(l.data) == 0 { - for i := range KeyCounter { - l.data[timestamp+int64(i*Freq)] = make(map[string]schema.Float, 0) - } - } else { - // Get the last timestamp - var lastTS int64 - for ts := range l.data { - if ts > lastTS { - lastTS = ts - } - } - // Create keys for the next KeyCounter timestamps - l.data[lastTS+int64(Freq)] = make(map[string]schema.Float, 0) - } - } - - closestTS := int64(0) - minDiff := int64(Freq) + 1 // Start with diff just outside the valid range - found := false - - // Iterate over timestamps and choose the one which is within range. - // Since its epoch time, we check if the difference is less than 60 seconds. - for ts, dat := range l.data { - // Check if timestamp is within range - diff := timestamp - ts - if diff < -int64(Freq) || diff > int64(Freq) { - continue - } - - // Metric already present at this timestamp — skip - if _, ok := dat[metricName]; ok { - continue - } - - // Check if this is the closest timestamp so far - if Abs(diff) < minDiff { - minDiff = Abs(diff) - closestTS = ts - found = true - } - } - - if found { - l.data[closestTS][metricName] = value - } -} - -func GetAvroStore() *AvroStore { - return &avroStore -} - -// Abs returns the absolute value of x. 
-func Abs(x int64) int64 { - if x < 0 { - return -x - } - return x -} diff --git a/pkg/metricstore/binaryCheckpoint.go b/pkg/metricstore/binaryCheckpoint.go new file mode 100644 index 00000000..5b1cde16 --- /dev/null +++ b/pkg/metricstore/binaryCheckpoint.go @@ -0,0 +1,274 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// This file implements the binary checkpoint format for fast loading. +// +// The binary format stores metric data in column-oriented layout (per-metric +// float64 arrays) for maximum load speed. Float64 arrays are read/written +// as raw bytes, avoiding per-element parsing overhead. +// +// File format: +// +// Header (24 bytes): +// magic: [4]byte "CCMS" +// version: uint32 LE +// from: int64 LE +// to: int64 LE +// +// Body (recursive): +// nmetrics: uint32 LE +// Per metric: +// name_len: uint16 LE +// name: []byte +// freq: int64 LE +// start: int64 LE +// nvalues: uint32 LE +// data: []float64 LE (NaN = missing) +// nchildren: uint32 LE +// Per child: +// name_len: uint16 LE +// name: []byte +// (recursive body) +package metricstore + +import ( + "bufio" + "encoding/binary" + "fmt" + "io" + "os" + "path" + "unsafe" + + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +var ( + binaryMagic = [4]byte{'C', 'C', 'M', 'S'} + binaryVersion = uint32(1) + binaryByteOrder = binary.LittleEndian + floatSize = int(unsafe.Sizeof(schema.Float(0))) // schema.Float is float64 +) + +// writeBinaryCheckpoint writes a CheckpointFile to a binary checkpoint file on disk. 
+func writeBinaryCheckpoint(filePath string, cf *CheckpointFile) error { + f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) + if err != nil && os.IsNotExist(err) { + if err2 := os.MkdirAll(path.Dir(filePath), CheckpointDirPerms); err2 != nil { + return err2 + } + f, err = os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) + } + if err != nil { + return err + } + defer f.Close() + + bw := bufio.NewWriter(f) + + // Write header + if _, err := bw.Write(binaryMagic[:]); err != nil { + return err + } + if err := binary.Write(bw, binaryByteOrder, binaryVersion); err != nil { + return err + } + if err := binary.Write(bw, binaryByteOrder, cf.From); err != nil { + return err + } + if err := binary.Write(bw, binaryByteOrder, cf.To); err != nil { + return err + } + + // Write body (metrics + children recursively) + if err := writeBinaryBody(bw, cf); err != nil { + return err + } + + return bw.Flush() +} + +// writeBinaryBody writes the metrics and children of a CheckpointFile. 
+func writeBinaryBody(w io.Writer, cf *CheckpointFile) error { + if err := binary.Write(w, binaryByteOrder, uint32(len(cf.Metrics))); err != nil { + return err + } + + for name, metric := range cf.Metrics { + nameBytes := []byte(name) + if err := binary.Write(w, binaryByteOrder, uint16(len(nameBytes))); err != nil { + return err + } + if _, err := w.Write(nameBytes); err != nil { + return err + } + if err := binary.Write(w, binaryByteOrder, metric.Frequency); err != nil { + return err + } + if err := binary.Write(w, binaryByteOrder, metric.Start); err != nil { + return err + } + if err := binary.Write(w, binaryByteOrder, uint32(len(metric.Data))); err != nil { + return err + } + if err := writeFloatArray(w, metric.Data); err != nil { + return err + } + } + + if err := binary.Write(w, binaryByteOrder, uint32(len(cf.Children))); err != nil { + return err + } + + for name, child := range cf.Children { + nameBytes := []byte(name) + if err := binary.Write(w, binaryByteOrder, uint16(len(nameBytes))); err != nil { + return err + } + if _, err := w.Write(nameBytes); err != nil { + return err + } + if err := writeBinaryBody(w, child); err != nil { + return err + } + } + + return nil +} + +// writeFloatArray writes a schema.Float slice as raw little-endian float64 bytes. +func writeFloatArray(w io.Writer, data []schema.Float) error { + if len(data) == 0 { + return nil + } + buf := unsafe.Slice((*byte)(unsafe.Pointer(&data[0])), len(data)*floatSize) + _, err := w.Write(buf) + return err +} + +// loadBinaryFile reads a binary checkpoint file into a CheckpointFile. 
+func loadBinaryFile(filePath string) (*CheckpointFile, error) { + f, err := os.Open(filePath) + if err != nil { + return nil, err + } + defer f.Close() + + br := bufio.NewReader(f) + + var magic [4]byte + if _, err := io.ReadFull(br, magic[:]); err != nil { + return nil, fmt.Errorf("reading magic: %w", err) + } + if magic != binaryMagic { + return nil, fmt.Errorf("[METRICSTORE]> invalid binary checkpoint magic in %s", filePath) + } + + var version uint32 + if err := binary.Read(br, binaryByteOrder, &version); err != nil { + return nil, fmt.Errorf("reading version: %w", err) + } + if version != binaryVersion { + return nil, fmt.Errorf("[METRICSTORE]> unsupported binary checkpoint version %d in %s", version, filePath) + } + + cf := &CheckpointFile{} + if err := binary.Read(br, binaryByteOrder, &cf.From); err != nil { + return nil, fmt.Errorf("reading from: %w", err) + } + if err := binary.Read(br, binaryByteOrder, &cf.To); err != nil { + return nil, fmt.Errorf("reading to: %w", err) + } + + if err := readBinaryBody(br, cf); err != nil { + return nil, err + } + + return cf, nil +} + +// readBinaryBody reads the metrics and children of a CheckpointFile. 
+func readBinaryBody(r io.Reader, cf *CheckpointFile) error { + var nmetrics uint32 + if err := binary.Read(r, binaryByteOrder, &nmetrics); err != nil { + return fmt.Errorf("reading metric count: %w", err) + } + + cf.Metrics = make(map[string]*CheckpointMetrics, nmetrics) + + for range nmetrics { + var nameLen uint16 + if err := binary.Read(r, binaryByteOrder, &nameLen); err != nil { + return fmt.Errorf("reading metric name length: %w", err) + } + nameBytes := make([]byte, nameLen) + if _, err := io.ReadFull(r, nameBytes); err != nil { + return fmt.Errorf("reading metric name: %w", err) + } + + cm := &CheckpointMetrics{} + if err := binary.Read(r, binaryByteOrder, &cm.Frequency); err != nil { + return fmt.Errorf("reading frequency: %w", err) + } + if err := binary.Read(r, binaryByteOrder, &cm.Start); err != nil { + return fmt.Errorf("reading start: %w", err) + } + + var nvalues uint32 + if err := binary.Read(r, binaryByteOrder, &nvalues); err != nil { + return fmt.Errorf("reading value count: %w", err) + } + + var err error + cm.Data, err = readFloatArray(r, int(nvalues)) + if err != nil { + return fmt.Errorf("reading data for %s: %w", string(nameBytes), err) + } + + cf.Metrics[string(nameBytes)] = cm + } + + var nchildren uint32 + if err := binary.Read(r, binaryByteOrder, &nchildren); err != nil { + return fmt.Errorf("reading children count: %w", err) + } + + cf.Children = make(map[string]*CheckpointFile, nchildren) + + for range nchildren { + var nameLen uint16 + if err := binary.Read(r, binaryByteOrder, &nameLen); err != nil { + return fmt.Errorf("reading child name length: %w", err) + } + nameBytes := make([]byte, nameLen) + if _, err := io.ReadFull(r, nameBytes); err != nil { + return fmt.Errorf("reading child name: %w", err) + } + + child := &CheckpointFile{} + if err := readBinaryBody(r, child); err != nil { + return fmt.Errorf("reading child %s: %w", string(nameBytes), err) + } + + cf.Children[string(nameBytes)] = child + } + + return nil +} + +// 
readFloatArray reads n float64 values from raw little-endian bytes. +func readFloatArray(r io.Reader, n int) ([]schema.Float, error) { + if n == 0 { + return nil, nil + } + + data := make([]schema.Float, n) + buf := unsafe.Slice((*byte)(unsafe.Pointer(&data[0])), n*floatSize) + if _, err := io.ReadFull(r, buf); err != nil { + return nil, err + } + + return data, nil +} diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index 2423a463..fa3f019a 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -6,15 +6,16 @@ // This file implements checkpoint persistence for the in-memory metric store. // // Checkpoints enable graceful restarts by periodically saving in-memory metric -// data to disk in either JSON or Avro format. The checkpoint system: +// data to disk. The checkpoint system supports two write formats: +// - binary (default): fast loading via raw float64 arrays +// - json: human-readable, slightly slower to load // // Key Features: // - Periodic background checkpointing via the Checkpointing() worker -// - Two formats: JSON (human-readable) and Avro (compact, efficient) // - Parallel checkpoint creation and loading using worker pools -// - Hierarchical file organization: checkpoint_dir/cluster/host/timestamp.{json|avro} +// - Hierarchical file organization: checkpoint_dir/cluster/host/timestamp.{bin|json} // - Only saves unarchived data (archived data is already persisted elsewhere) -// - Automatic format detection and fallback during loading +// - Automatic format detection during loading (supports bin, json, and legacy avro) // - GC optimization during loading to prevent excessive heap growth // // Checkpoint Workflow: @@ -27,8 +28,8 @@ // checkpoints/ // cluster1/ // host001/ -// 1234567890.json (timestamp = checkpoint start time) -// 1234567950.json +// 1234567890.bin (timestamp = checkpoint start time) +// 1234567950.bin +// host002/ // ...
package metricstore @@ -52,7 +53,6 @@ import ( cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" - "github.com/linkedin/goavro/v2" ) const ( @@ -85,78 +85,53 @@ var ( // Checkpointing starts a background worker that periodically saves metric data to disk. // -// The behavior depends on the configured file format: -// - JSON: Periodic checkpointing based on Keys.Checkpoints.Interval -// - Avro: Initial delay + periodic checkpointing at DefaultAvroCheckpointInterval -// -// The worker respects context cancellation and signals completion via the WaitGroup. +// Checkpoints are written at the configured interval (Keys.Checkpoints.Interval) in +// either binary or JSON format. The worker respects context cancellation and signals +// completion via the WaitGroup. func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { lastCheckpointMu.Lock() lastCheckpoint = time.Now() lastCheckpointMu.Unlock() - if Keys.Checkpoints.FileFormat == "json" { - ms := GetMemoryStore() + ms := GetMemoryStore() - wg.Go(func() { - d, err := time.ParseDuration(Keys.Checkpoints.Interval) - if err != nil { - cclog.Fatalf("[METRICSTORE]> invalid checkpoint interval '%s': %s", Keys.Checkpoints.Interval, err.Error()) - } - if d <= 0 { - cclog.Warnf("[METRICSTORE]> checkpoint interval is zero or negative (%s), checkpointing disabled", d) - return - } + wg.Go(func() { + d, err := time.ParseDuration(Keys.Checkpoints.Interval) + if err != nil { + cclog.Fatalf("[METRICSTORE]> invalid checkpoint interval '%s': %s", Keys.Checkpoints.Interval, err.Error()) + } + if d <= 0 { + cclog.Warnf("[METRICSTORE]> checkpoint interval is zero or negative (%s), checkpointing disabled", d) + return + } - ticker := time.NewTicker(d) - defer ticker.Stop() + ticker := time.NewTicker(d) + defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - lastCheckpointMu.Lock() - from := lastCheckpoint - lastCheckpointMu.Unlock() - - 
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339)) - now := time.Now() - n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, - from.Unix(), now.Unix()) - if err != nil { - cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error()) - } else { - cclog.Infof("[METRICSTORE]> done: %d checkpoint files created", n) - lastCheckpointMu.Lock() - lastCheckpoint = now - lastCheckpointMu.Unlock() - } - } - } - }) - } else { - wg.Go(func() { + for { select { case <-ctx.Done(): return - case <-time.After(time.Duration(CheckpointBufferMinutes) * time.Minute): - GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false) - } + case <-ticker.C: + lastCheckpointMu.Lock() + from := lastCheckpoint + lastCheckpointMu.Unlock() - ticker := time.NewTicker(DefaultAvroCheckpointInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false) + cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339)) + now := time.Now() + n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, + from.Unix(), now.Unix()) + if err != nil { + cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error()) + } else { + cclog.Infof("[METRICSTORE]> done: %d checkpoint files created", n) + lastCheckpointMu.Lock() + lastCheckpoint = now + lastCheckpointMu.Unlock() } } - }) - } + } + }) } // UnmarshalJSON provides optimized JSON decoding for CheckpointMetrics. @@ -478,7 +453,8 @@ func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil return retval, nil } -// toCheckpoint writes a Level's data to a JSON checkpoint file. +// toCheckpoint writes a Level's data to a checkpoint file. +// The format (binary or JSON) is determined by Keys.Checkpoints.FileFormat. // Creates directory if needed. Returns ErrNoNewArchiveData if nothing to save. 
func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { cf, err := l.toCheckpointFile(from, to, m) @@ -490,12 +466,23 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { return ErrNoNewArchiveData } - filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) - f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) + if Keys.Checkpoints.FileFormat == "json" { + return writeJSONCheckpoint(dir, from, cf) + } + + // Default: binary format + filePath := path.Join(dir, fmt.Sprintf("%d.bin", from)) + return writeBinaryCheckpoint(filePath, cf) +} + +// writeJSONCheckpoint writes a CheckpointFile in JSON format. +func writeJSONCheckpoint(dir string, from int64, cf *CheckpointFile) error { + filePath := path.Join(dir, fmt.Sprintf("%d.json", from)) + f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) if err != nil && os.IsNotExist(err) { err = os.MkdirAll(dir, CheckpointDirPerms) if err == nil { - f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) + f, err = os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) } } if err != nil { @@ -598,7 +585,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { // FromCheckpointFiles is the main entry point for loading checkpoints at startup. // -// Automatically detects checkpoint format (JSON vs Avro) and falls back if needed. +// Automatically detects checkpoint format (binary or JSON). // Creates checkpoint directory if it doesn't exist. This function must be called // before any writes or reads, and can only be called once.
func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { @@ -614,144 +601,19 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { return m.FromCheckpoint(dir, from) } -func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { - br := bufio.NewReader(f) - - fileName := f.Name()[strings.LastIndex(f.Name(), "/")+1:] - resolution, err := strconv.ParseInt(fileName[0:strings.Index(fileName, "_")], 10, 64) +// loadBinaryCheckpointFile loads a binary checkpoint file into the Level tree. +// Binary files are decoded in the same way as JSON files (via loadFile). +func (l *Level) loadBinaryCheckpointFile(m *MemoryStore, filePath string, from int64) error { + cf, err := loadBinaryFile(filePath) if err != nil { - return fmt.Errorf("[METRICSTORE]> error while reading avro file (resolution parsing) : %s", err) + return err } - fromTimestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64) - - // Same logic according to lineprotocol - fromTimestamp -= (resolution / 2) - - if err != nil { - return fmt.Errorf("[METRICSTORE]> error converting timestamp from the avro file : %s", err) - } - - // fmt.Printf("File : %s with resolution : %d\n", fileName, resolution) - - var recordCounter int64 = 0 - - // Create a new OCF reader from the buffered reader - ocfReader, err := goavro.NewOCFReader(br) - if err != nil { - return fmt.Errorf("[METRICSTORE]> error creating OCF reader: %w", err) - } - - metricsData := make(map[string]schema.FloatArray) - - for ocfReader.Scan() { - datum, err := ocfReader.Read() - if err != nil { - return fmt.Errorf("[METRICSTORE]> error while reading avro file : %s", err) - } - - record, ok := datum.(map[string]any) - if !ok { - return fmt.Errorf("[METRICSTORE]> failed to assert datum as map[string]interface{}") - } - - for key, value := range record { - metricsData[key] = append(metricsData[key], schema.ConvertToFloat(value.(float64))) - } - - 
recordCounter += 1 - } - - to := (fromTimestamp + (recordCounter / (60 / resolution) * 60)) - if to < from { + if cf.To != 0 && cf.To < from { return nil } - for key, floatArray := range metricsData { - metricName := ReplaceKey(key) - - if strings.Contains(metricName, SelectorDelimiter) { - subString := strings.Split(metricName, SelectorDelimiter) - - lvl := l - - for i := 0; i < len(subString)-1; i++ { - - sel := subString[i] - - if lvl.children == nil { - lvl.children = make(map[string]*Level) - } - - child, ok := lvl.children[sel] - if !ok { - child = &Level{ - metrics: make([]*buffer, len(m.Metrics)), - children: nil, - } - lvl.children[sel] = child - } - lvl = child - } - - leafMetricName := subString[len(subString)-1] - err = lvl.createBuffer(m, leafMetricName, floatArray, fromTimestamp, resolution) - if err != nil { - return fmt.Errorf("[METRICSTORE]> error while creating buffers from avroReader : %s", err) - } - } else { - err = l.createBuffer(m, metricName, floatArray, fromTimestamp, resolution) - if err != nil { - return fmt.Errorf("[METRICSTORE]> error while creating buffers from avroReader : %s", err) - } - } - - } - - return nil -} - -func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray schema.FloatArray, from int64, resolution int64) error { - n := len(floatArray) - b := &buffer{ - frequency: resolution, - start: from, - data: floatArray[0:n:n], - prev: nil, - next: nil, - archived: true, - } - - minfo, ok := m.Metrics[metricName] - if !ok { - return nil - } - - prev := l.metrics[minfo.offset] - if prev == nil { - l.metrics[minfo.offset] = b - } else { - if prev.start > b.start { - return fmt.Errorf("[METRICSTORE]> buffer start time %d is before previous buffer start %d", b.start, prev.start) - } - - b.prev = prev - prev.next = b - - missingCount := ((int(b.start) - int(prev.start)) - len(prev.data)*int(b.frequency)) - if missingCount > 0 { - missingCount /= int(b.frequency) - - for range missingCount { - prev.data = 
append(prev.data, schema.NaN) - } - - prev.data = prev.data[0:len(prev.data):len(prev.data)] - } - } - l.metrics[minfo.offset] = b - - return nil + return l.loadFile(cf, m) } func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { @@ -821,11 +683,9 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err filesLoaded := 0 for _, e := range direntries { if e.IsDir() { - // Host-level directories should only contain files, not subdirectories. - // Skip unexpected subdirectories with a warning. cclog.Warnf("[METRICSTORE]> unexpected subdirectory '%s' in checkpoint dir '%s', skipping", e.Name(), dir) continue - } else if strings.HasSuffix(e.Name(), ".json") || strings.HasSuffix(e.Name(), ".avro") { + } else if strings.HasSuffix(e.Name(), ".bin") || strings.HasSuffix(e.Name(), ".json") { allFiles = append(allFiles, e) } } @@ -840,18 +700,59 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err } // Separate files by type - var jsonFiles, avroFiles []string + var binFiles, jsonFiles []string for _, filename := range files { switch filepath.Ext(filename) { + case ".bin": + binFiles = append(binFiles, filename) case ".json": jsonFiles = append(jsonFiles, filename) - case ".avro": - avroFiles = append(avroFiles, filename) default: cclog.Warnf("[METRICSTORE]> unknown extension for file %s", filename) } } + // Parallel binary decoding: decode files concurrently, then apply sequentially + if len(binFiles) > 0 { + type decodedFile struct { + cf *CheckpointFile + err error + } + + decoded := make([]decodedFile, len(binFiles)) + var decodeWg sync.WaitGroup + + for i, filename := range binFiles { + decodeWg.Add(1) + go func(idx int, fname string) { + defer decodeWg.Done() + cf, err := loadBinaryFile(path.Join(dir, fname)) + if err != nil { + decoded[idx] = decodedFile{err: fmt.Errorf("decoding %s: %w", fname, err)} + return + } + decoded[idx] = decodedFile{cf: cf} + }(i, filename) + } + + decodeWg.Wait() + + for 
i, d := range decoded { + if d.err != nil { + return filesLoaded, d.err + } + + if d.cf.To != 0 && d.cf.To < from { + continue + } + + if err := l.loadFile(d.cf, m); err != nil { + return filesLoaded, fmt.Errorf("loading %s: %w", binFiles[i], err) + } + filesLoaded++ + } + } + // Parallel JSON decoding: decode files concurrently, then apply sequentially if len(jsonFiles) > 0 { type decodedFile struct { @@ -885,7 +786,6 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err decodeWg.Wait() - // Apply decoded files sequentially to maintain buffer ordering for i, d := range decoded { if d.err != nil { return filesLoaded, d.err @@ -902,23 +802,6 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err } } - // Load Avro files sequentially (they modify Level state directly) - for _, filename := range avroFiles { - err := func() error { - f, err := os.Open(path.Join(dir, filename)) - if err != nil { - return err - } - defer f.Close() - - return l.loadAvroFile(m, f, from) - }() - if err != nil { - return filesLoaded, err - } - filesLoaded++ - } - return filesLoaded, nil } @@ -942,11 +825,18 @@ func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]s for _, e := range direntries { name := e.Name() - if !strings.HasSuffix(name, ".json") && !strings.HasSuffix(name, ".avro") { + ext := filepath.Ext(name) + if ext != ".bin" && ext != ".json" { continue } - ts, err := strconv.ParseInt(name[strings.Index(name, "_")+1:len(name)-5], 10, 64) + // Parse timestamp from filename: for .bin and .json it's just "TIMESTAMP.ext" + baseName := name[:len(name)-len(ext)] + // Handle legacy format with prefix (e.g., "60_TIMESTAMP.avro") + if idx := strings.Index(baseName, "_"); idx >= 0 { + baseName = baseName[idx+1:] + } + ts, err := strconv.ParseInt(baseName, 10, 64) if err != nil { return nil, err } diff --git a/pkg/metricstore/config.go b/pkg/metricstore/config.go index 5fc1506f..7d8190d9 100644 --- 
a/pkg/metricstore/config.go +++ b/pkg/metricstore/config.go @@ -14,7 +14,7 @@ // ├─ RetentionInMemory: How long to keep data in RAM // ├─ MemoryCap: Memory limit in bytes (triggers forceFree) // ├─ Checkpoints: Persistence configuration -// │ ├─ FileFormat: "avro" or "json" +// │ ├─ FileFormat: "binary" or "json" // │ ├─ Interval: How often to save (e.g., "1h") // │ └─ RootDir: Checkpoint storage path // ├─ Cleanup: Long-term storage configuration @@ -54,16 +54,13 @@ import ( const ( DefaultMaxWorkers = 10 DefaultBufferCapacity = 512 - DefaultAvroWorkers = 4 - DefaultCheckpointBufferMin = 3 - DefaultAvroCheckpointInterval = time.Minute DefaultMemoryUsageTrackerInterval = 1 * time.Hour ) // Checkpoints configures periodic persistence of in-memory metric data. // // Fields: -// - FileFormat: "avro" (default, binary, compact) or "json" (human-readable, slower) +// - FileFormat: "binary" (default, fast loading) or "json" (human-readable) // - Interval: Duration string (e.g., "1h", "30m") between checkpoint saves // - RootDir: Filesystem path for checkpoint files (created if missing) type Checkpoints struct { @@ -143,7 +140,7 @@ type MetricStoreConfig struct { // Accessed by Init(), Checkpointing(), and other lifecycle functions. var Keys MetricStoreConfig = MetricStoreConfig{ Checkpoints: Checkpoints{ - FileFormat: "avro", + FileFormat: "binary", RootDir: "./var/checkpoints", }, Cleanup: &Cleanup{ diff --git a/pkg/metricstore/configSchema.go b/pkg/metricstore/configSchema.go index 6a748be0..cfb7287d 100644 --- a/pkg/metricstore/configSchema.go +++ b/pkg/metricstore/configSchema.go @@ -18,8 +18,9 @@ const configSchema = `{ "type": "object", "properties": { "file-format": { - "description": "Specify the format for checkpoint files. There are 2 variants: 'avro' and 'json'. 
If nothing is specified, 'avro' is default.", - "type": "string" + "description": "Specify the format for checkpoint files: 'binary' (default, fast loading) or 'json' (human-readable).", + "type": "string", + "enum": ["binary", "json"] }, "interval": { "description": "Interval at which the metrics should be checkpointed.", diff --git a/pkg/metricstore/lineprotocol.go b/pkg/metricstore/lineprotocol.go index bfbbef2d..67b6ae08 100644 --- a/pkg/metricstore/lineprotocol.go +++ b/pkg/metricstore/lineprotocol.go @@ -244,17 +244,6 @@ func DecodeLine(dec *lineprotocol.Decoder, time := t.Unix() - if Keys.Checkpoints.FileFormat != "json" { - LineProtocolMessages <- &AvroStruct{ - MetricName: string(metricBuf), - Cluster: cluster, - Node: host, - Selector: append([]string{}, selector...), - Value: metric.Value, - Timestamp: time, - } - } - if err := ms.WriteToLevel(lvl, selector, time, []Metric{metric}); err != nil { return err } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 10bfae8a..9f6024e1 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -8,7 +8,7 @@ // // The package organizes metrics in a tree structure (cluster → host → component) and // provides concurrent read/write access to metric data with configurable aggregation strategies. -// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data, +// Background goroutines handle periodic checkpointing (binary or JSON format), archiving old data, // and enforcing retention policies. // // Key features: @@ -175,7 +175,6 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W Retention(wg, ctx) Checkpointing(wg, ctx) CleanUp(wg, ctx) - DataStaging(wg, ctx) MemoryUsageTracker(wg, ctx) // Note: Signal handling has been removed from this function. 
@@ -279,22 +278,10 @@ func Shutdown() { shutdownFunc() } - if Keys.Checkpoints.FileFormat != "json" { - close(LineProtocolMessages) - } - cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) - var files int - var err error - ms := GetMemoryStore() - if Keys.Checkpoints.FileFormat == "json" { - files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) - } else { - files, err = GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, true) - } - + files, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) if err != nil { cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error()) }