diff --git a/go.mod b/go.mod index e244062c..c561f627 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,6 @@ require ( github.com/influxdata/line-protocol/v2 v2.2.1 github.com/jmoiron/sqlx v1.4.0 github.com/joho/godotenv v1.5.1 - github.com/linkedin/goavro/v2 v2.15.0 github.com/mattn/go-sqlite3 v1.14.34 github.com/parquet-go/parquet-go v0.27.0 github.com/qustavo/sqlhooks/v2 v2.1.0 @@ -80,7 +79,6 @@ require ( github.com/go-openapi/swag/yamlutils v0.25.4 // indirect github.com/go-viper/mapstructure/v2 v2.5.0 // indirect github.com/goccy/go-yaml v1.19.2 // indirect - github.com/golang/snappy v1.0.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/securecookie v1.1.2 // indirect github.com/gorilla/websocket v1.5.3 // indirect diff --git a/go.sum b/go.sum index f2929454..5586b9c5 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,6 @@ github.com/99designs/gqlgen v0.17.86 h1:C8N3UTa5heXX6twl+b0AJyGkTwYL6dNmFrgZNLRc github.com/99designs/gqlgen v0.17.86/go.mod h1:KTrPl+vHA1IUzNlh4EYkl7+tcErL3MgKnhHrBcV74Fw= github.com/Azure/go-ntlmssp v0.1.0 h1:DjFo6YtWzNqNvQdrwEyr/e4nhU3vRiwenz5QX7sFz+A= github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk= -github.com/ClusterCockpit/cc-lib/v2 v2.5.1 h1:s6M9tyPDty+4zTdQGJYKpGJM9Nz7N6ITMdjPvNSLX5g= -github.com/ClusterCockpit/cc-lib/v2 v2.5.1/go.mod h1:DZ8OIHPUZJpWqErLITt0B8P6/Q7CBW2IQSQ5YiFFaG0= github.com/ClusterCockpit/cc-lib/v2 v2.6.0 h1:Q7zvRAVhfYA9PDB18pfY9A/6Ws4oWpnv8+P9MBRUDzg= github.com/ClusterCockpit/cc-lib/v2 v2.6.0/go.mod h1:DZ8OIHPUZJpWqErLITt0B8P6/Q7CBW2IQSQ5YiFFaG0= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= @@ -151,9 +149,6 @@ github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63Y github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang-migrate/migrate/v4 v4.19.1 h1:OCyb44lFuQfYXYLx1SCxPZQGU7mcaZ7gH9yH4jSFbBA= github.com/golang-migrate/migrate/v4 v4.19.1/go.mod h1:CTcgfjxhaUtsLipnLoQRWCrjYXycRz/g5+RWDuYgPrE= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= -github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= @@ -226,8 +221,6 @@ github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6Fm github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/linkedin/goavro/v2 v2.15.0 h1:pDj1UrjUOO62iXhgBiE7jQkpNIc5/tA5eZsgolMjgVI= -github.com/linkedin/goavro/v2 v2.15.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mattn/go-sqlite3 v1.14.34 h1:3NtcvcUnFBPsuRcno8pUtupspG/GM+9nZ88zgJcp6Zk= @@ -289,14 +282,11 @@ github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKk github.com/stmcginnis/gofish v0.21.1 h1:sutDvBhmLh4RDOZ1DN8GUyYRu7f1ggvKMMnSaiqhwn4= 
github.com/stmcginnis/gofish v0.21.1/go.mod h1:PzF5i8ecRG9A2ol8XT64npKUunyraJ+7t0kYMpQAtqU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/swaggo/files v1.0.1 h1:J1bVJ4XHZNq0I46UU90611i9/YzdrF7x92oX1ig5IdE= @@ -378,7 +368,6 @@ gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/metricstore/avroCheckpoint.go b/pkg/metricstore/avroCheckpoint.go deleted file mode 100644 index 14898186..00000000 --- a/pkg/metricstore/avroCheckpoint.go +++ /dev/null @@ -1,481 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. 
- -package metricstore - -import ( - "bufio" - "encoding/json" - "errors" - "fmt" - "os" - "path" - "sort" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" - "github.com/ClusterCockpit/cc-lib/v2/schema" - "github.com/linkedin/goavro/v2" -) - -var ( - NumAvroWorkers int = DefaultAvroWorkers - startUp bool = true -) - -func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { - levels := make([]*AvroLevel, 0) - selectors := make([][]string, 0) - as.root.lock.RLock() - // Cluster - for sel1, l1 := range as.root.children { - l1.lock.RLock() - // Node - for sel2, l2 := range l1.children { - l2.lock.RLock() - // Frequency - for sel3, l3 := range l2.children { - levels = append(levels, l3) - selectors = append(selectors, []string{sel1, sel2, sel3}) - } - l2.lock.RUnlock() - } - l1.lock.RUnlock() - } - as.root.lock.RUnlock() - - type workItem struct { - level *AvroLevel - dir string - selector []string - } - - n, errs := int32(0), int32(0) - - var wg sync.WaitGroup - wg.Add(NumAvroWorkers) - work := make(chan workItem, NumAvroWorkers*2) - for range NumAvroWorkers { - go func() { - defer wg.Done() - - for workItem := range work { - from := getTimestamp(workItem.dir) - - if err := workItem.level.toCheckpoint(workItem.dir, from, dumpAll); err != nil { - if err == ErrNoNewArchiveData { - continue - } - - cclog.Errorf("error while checkpointing %#v: %s", workItem.selector, err.Error()) - atomic.AddInt32(&errs, 1) - } else { - atomic.AddInt32(&n, 1) - } - } - }() - } - - for i := range len(levels) { - dir := path.Join(dir, path.Join(selectors[i]...)) - work <- workItem{ - level: levels[i], - dir: dir, - selector: selectors[i], - } - } - - close(work) - wg.Wait() - - if errs > 0 { - return int(n), fmt.Errorf("%d errors happend while creating avro checkpoints (%d successes)", errs, n) - } - - startUp = false - - return int(n), nil -} - -// getTimestamp returns the timestamp from the directory name -func getTimestamp(dir string) int64 { - // Extract the resolution and timestamp from the directory name - // The existing avro file will be in epoch timestamp format - // iterate over all the files in the directory and find the maximum timestamp - // and return it - - resolution := path.Base(dir) - dir = path.Dir(dir) - - files, err := os.ReadDir(dir) - if err != nil { - return 0 - } - var maxTS int64 = 0 - - if len(files) == 0 { - return 0 - } - - for _, file := range files { - if file.IsDir() { - continue - } - name := file.Name() - - if len(name) < 5 || !strings.HasSuffix(name, ".avro") || !strings.HasPrefix(name, resolution+"_") { - continue - } - - ts, err := strconv.ParseInt(name[strings.Index(name, "_")+1:len(name)-5], 10, 64) - if err != nil { - fmt.Printf("error while parsing timestamp: %s\n", err.Error()) - continue - } - - if ts > maxTS { - maxTS = ts - } - } - - interval, _ := time.ParseDuration(Keys.Checkpoints.Interval) - updateTime := time.Unix(maxTS, 0).Add(interval).Add(time.Duration(CheckpointBufferMinutes-1) * time.Minute).Unix() - - if startUp { - return 0 - } - - if updateTime < time.Now().Unix() { - return 0 - } - - return maxTS -} - -func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { - l.lock.Lock() - defer l.lock.Unlock() - - // fmt.Printf("Checkpointing directory: %s\n", dir) - // filepath contains the resolution - intRes, _ := strconv.Atoi(path.Base(dir)) - - // find smallest overall timestamp in l.data map and delete it from l.data - minTS := int64(1<<63 - 1) - for ts, dat := 
range l.data { - if ts < minTS && len(dat) != 0 { - minTS = ts - } - } - - if from == 0 && minTS != int64(1<<63-1) { - from = minTS - } - - if from == 0 { - return ErrNoNewArchiveData - } - - var schema string - var codec *goavro.Codec - recordList := make([]map[string]any, 0) - - var f *os.File - - filePath := dir + fmt.Sprintf("_%d.avro", from) - - var err error - - fp_, err_ := os.Stat(filePath) - if errors.Is(err_, os.ErrNotExist) { - err = os.MkdirAll(path.Dir(dir), 0o755) - if err != nil { - return fmt.Errorf("failed to create directory: %v", err) - } - } else if fp_.Size() != 0 { - f, err = os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open existing avro file: %v", err) - } - defer f.Close() - - br := bufio.NewReader(f) - - reader, err := goavro.NewOCFReader(br) - if err != nil { - return fmt.Errorf("failed to create OCF reader: %v", err) - } - codec = reader.Codec() - schema = codec.Schema() - } - - timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix() - - if dumpAll { - timeRef = time.Now().Unix() - } - - // Empty values - if len(l.data) == 0 { - // we checkpoint avro files every 60 seconds - repeat := 60 / intRes - - for range repeat { - recordList = append(recordList, make(map[string]any)) - } - } - - readFlag := true - - for ts := range l.data { - flag := false - if ts < timeRef { - data := l.data[ts] - - schemaGen, err := generateSchema(data) - if err != nil { - return err - } - - flag, schema, err = compareSchema(schema, schemaGen) - if err != nil { - return fmt.Errorf("failed to compare read and generated schema: %v", err) - } - if flag && readFlag && !errors.Is(err_, os.ErrNotExist) { - // Use closure to ensure file is closed even on error - err := func() error { - f2, err := os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open Avro file: %v", err) - } - defer f2.Close() - - br := bufio.NewReader(f2) - - ocfReader, err := goavro.NewOCFReader(br) - if err != nil { - return fmt.Errorf("failed to create OCF reader while changing schema: %v", err) - } - - for ocfReader.Scan() { - record, err := ocfReader.Read() - if err != nil { - return fmt.Errorf("failed to read record: %v", err) - } - - recordList = append(recordList, record.(map[string]any)) - } - - return nil - }() - if err != nil { - return err - } - - err = os.Remove(filePath) - if err != nil { - return fmt.Errorf("failed to delete file: %v", err) - } - - readFlag = false - } - codec, err = goavro.NewCodec(schema) - if err != nil { - return fmt.Errorf("failed to create codec after merged schema: %v", err) - } - - recordList = append(recordList, generateRecord(data)) - delete(l.data, ts) - } - } - - if len(recordList) == 0 { - return ErrNoNewArchiveData - } - - f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o644) - if err != nil { - return fmt.Errorf("failed to append new avro file: %v", err) - } - defer f.Close() - - // fmt.Printf("Codec : %#v\n", codec) - - writer, err := goavro.NewOCFWriter(goavro.OCFConfig{ - W: f, - Codec: codec, - CompressionName: goavro.CompressionDeflateLabel, - }) - if err != nil { - return fmt.Errorf("failed to create OCF writer: %v", err) - } - - // Append the new record - if err := writer.Append(recordList); err != nil { - return fmt.Errorf("failed to append record: %v", err) - } - - return nil -} - -func compareSchema(schemaRead, schemaGen string) (bool, string, error) { - var genSchema, readSchema AvroSchema - - if schemaRead == "" { - return false, schemaGen, nil - } - - // Unmarshal the schema 
strings into AvroSchema structs - if err := json.Unmarshal([]byte(schemaGen), &genSchema); err != nil { - return false, "", fmt.Errorf("failed to parse generated schema: %v", err) - } - if err := json.Unmarshal([]byte(schemaRead), &readSchema); err != nil { - return false, "", fmt.Errorf("failed to parse read schema: %v", err) - } - - sort.Slice(genSchema.Fields, func(i, j int) bool { - return genSchema.Fields[i].Name < genSchema.Fields[j].Name - }) - - sort.Slice(readSchema.Fields, func(i, j int) bool { - return readSchema.Fields[i].Name < readSchema.Fields[j].Name - }) - - // Check if schemas are identical - schemasEqual := true - if len(genSchema.Fields) <= len(readSchema.Fields) { - - for i := range genSchema.Fields { - if genSchema.Fields[i].Name != readSchema.Fields[i].Name { - schemasEqual = false - break - } - } - - // If schemas are identical, return the read schema - if schemasEqual { - return false, schemaRead, nil - } - } - - // Create a map to hold unique fields from both schemas - fieldMap := make(map[string]AvroField) - - // Add fields from the read schema - for _, field := range readSchema.Fields { - fieldMap[field.Name] = field - } - - // Add or update fields from the generated schema - for _, field := range genSchema.Fields { - fieldMap[field.Name] = field - } - - // Create a union schema by collecting fields from the map - var mergedFields []AvroField - for _, field := range fieldMap { - mergedFields = append(mergedFields, field) - } - - // Sort fields by name for consistency - sort.Slice(mergedFields, func(i, j int) bool { - return mergedFields[i].Name < mergedFields[j].Name - }) - - // Create the merged schema - mergedSchema := AvroSchema{ - Type: "record", - Name: genSchema.Name, - Fields: mergedFields, - } - - // Check if schemas are identical - schemasEqual = len(mergedSchema.Fields) == len(readSchema.Fields) - if schemasEqual { - for i := range mergedSchema.Fields { - if mergedSchema.Fields[i].Name != readSchema.Fields[i].Name { - schemasEqual = false - break - } - } - - if schemasEqual { - return false, schemaRead, nil - } - } - - // Marshal the merged schema back to JSON - mergedSchemaJSON, err := json.Marshal(mergedSchema) - if err != nil { - return false, "", fmt.Errorf("failed to marshal merged schema: %v", err) - } - - return true, string(mergedSchemaJSON), nil -} - -func generateSchema(data map[string]schema.Float) (string, error) { - // Define the Avro schema structure - schema := map[string]any{ - "type": "record", - "name": "DataRecord", - "fields": []map[string]any{}, - } - - fieldTracker := make(map[string]struct{}) - - for key := range data { - if _, exists := fieldTracker[key]; !exists { - key = correctKey(key) - - field := map[string]any{ - "name": key, - "type": "double", - "default": -1.0, - } - schema["fields"] = append(schema["fields"].([]map[string]any), field) - fieldTracker[key] = struct{}{} - } - } - - schemaString, err := json.Marshal(schema) - if err != nil { - return "", fmt.Errorf("failed to marshal schema: %v", err) - } - - return string(schemaString), nil -} - -func generateRecord(data map[string]schema.Float) map[string]any { - record := make(map[string]any) - - // Iterate through each map in data - for key, value := range data { - key = correctKey(key) - - // Set the value in the record - // avro only accepts basic types - record[key] = value.Double() - } - - return record -} - -func correctKey(key string) string { - key = strings.ReplaceAll(key, "_", "_0x5F_") - key = strings.ReplaceAll(key, ":", "_0x3A_") - key = 
strings.ReplaceAll(key, ".", "_0x2E_") - return key -} - -func ReplaceKey(key string) string { - key = strings.ReplaceAll(key, "_0x2E_", ".") - key = strings.ReplaceAll(key, "_0x3A_", ":") - key = strings.ReplaceAll(key, "_0x5F_", "_") - return key -} diff --git a/pkg/metricstore/avroHelper.go b/pkg/metricstore/avroHelper.go deleted file mode 100644 index f6bef36e..00000000 --- a/pkg/metricstore/avroHelper.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package metricstore - -import ( - "context" - "slices" - "strconv" - "strings" - "sync" - - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" -) - -func DataStaging(wg *sync.WaitGroup, ctx context.Context) { - wg.Add(1) - go func() { - defer wg.Done() - - if Keys.Checkpoints.FileFormat == "json" { - return - } - - ms := GetMemoryStore() - var avroLevel *AvroLevel - oldSelector := make([]string, 0) - - for { - select { - case <-ctx.Done(): - // Drain any remaining messages in channel before exiting - for { - select { - case val, ok := <-LineProtocolMessages: - if !ok { - // Channel closed - return - } - // Process remaining message - freq, err := ms.GetMetricFrequency(val.MetricName) - if err != nil { - continue - } - - var metricName strings.Builder - for _, selectorName := range val.Selector { - metricName.WriteString(selectorName + SelectorDelimiter) - } - metricName.WriteString(val.MetricName) - - var selector []string - selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10)) - - if !stringSlicesEqual(oldSelector, selector) { - avroLevel = avroStore.root.findAvroLevelOrCreate(selector) - if avroLevel == nil { - cclog.Errorf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName) - } - oldSelector = slices.Clone(selector) - } - - if avroLevel != nil { - avroLevel.addMetric(metricName.String(), val.Value, val.Timestamp, int(freq)) - } - default: - // No more messages, exit - return - } - } - case val, ok := <-LineProtocolMessages: - if !ok { - // Channel closed, exit gracefully - return - } - - // Fetch the frequency of the metric from the global configuration - freq, err := ms.GetMetricFrequency(val.MetricName) - if err != nil { - cclog.Errorf("Error fetching metric frequency: %s\n", err) - continue - } - - var metricName strings.Builder - - for _, selectorName := range val.Selector { - metricName.WriteString(selectorName + SelectorDelimiter) - } - - metricName.WriteString(val.MetricName) - - // Create a new selector for the Avro level - // The selector is a slice of strings that represents the path to the - // Avro level. It is created by appending the cluster, node, and metric - // name to the selector. 
- var selector []string - selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10)) - - if !stringSlicesEqual(oldSelector, selector) { - // Get the Avro level for the metric - avroLevel = avroStore.root.findAvroLevelOrCreate(selector) - - // If the Avro level is nil, create a new one - if avroLevel == nil { - cclog.Errorf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName) - } - oldSelector = slices.Clone(selector) - } - - if avroLevel != nil { - avroLevel.addMetric(metricName.String(), val.Value, val.Timestamp, int(freq)) - } - } - } - }() -} - -func stringSlicesEqual(a, b []string) bool { - if len(a) != len(b) { - return false - } - for i := range a { - if a[i] != b[i] { - return false - } - } - return true -} diff --git a/pkg/metricstore/avroStruct.go b/pkg/metricstore/avroStruct.go deleted file mode 100644 index 78a8d137..00000000 --- a/pkg/metricstore/avroStruct.go +++ /dev/null @@ -1,167 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package metricstore - -import ( - "sync" - - "github.com/ClusterCockpit/cc-lib/v2/schema" -) - -var ( - LineProtocolMessages = make(chan *AvroStruct) - // SelectorDelimiter separates hierarchical selector components in metric names for Avro encoding - SelectorDelimiter = "_SEL_" -) - -var CheckpointBufferMinutes = DefaultCheckpointBufferMin - -type AvroStruct struct { - MetricName string - Cluster string - Node string - Selector []string - Value schema.Float - Timestamp int64 -} - -type AvroStore struct { - root AvroLevel -} - -var avroStore AvroStore - -type AvroLevel struct { - children map[string]*AvroLevel - data map[int64]map[string]schema.Float - lock sync.RWMutex -} - -type AvroField struct { - Name string `json:"name"` - Type any `json:"type"` - Default any `json:"default,omitempty"` -} - -type AvroSchema struct { - Type string `json:"type"` - Name string `json:"name"` - Fields []AvroField `json:"fields"` -} - -func (l *AvroLevel) findAvroLevelOrCreate(selector []string) *AvroLevel { - if len(selector) == 0 { - return l - } - - // Allow concurrent reads: - l.lock.RLock() - var child *AvroLevel - var ok bool - if l.children == nil { - // Children map needs to be created... - l.lock.RUnlock() - } else { - child, ok := l.children[selector[0]] - l.lock.RUnlock() - if ok { - return child.findAvroLevelOrCreate(selector[1:]) - } - } - - // The level does not exist, take write lock for unique access: - l.lock.Lock() - // While this thread waited for the write lock, another thread - // could have created the child node. 
- if l.children != nil { - child, ok = l.children[selector[0]] - if ok { - l.lock.Unlock() - return child.findAvroLevelOrCreate(selector[1:]) - } - } - - child = &AvroLevel{ - data: make(map[int64]map[string]schema.Float, 0), - children: nil, - } - - if l.children != nil { - l.children[selector[0]] = child - } else { - l.children = map[string]*AvroLevel{selector[0]: child} - } - l.lock.Unlock() - return child.findAvroLevelOrCreate(selector[1:]) -} - -func (l *AvroLevel) addMetric(metricName string, value schema.Float, timestamp int64, Freq int) { - l.lock.Lock() - defer l.lock.Unlock() - - KeyCounter := int(CheckpointBufferMinutes * 60 / Freq) - - // Create keys in advance for the given amount of time - if len(l.data) != KeyCounter { - if len(l.data) == 0 { - for i := range KeyCounter { - l.data[timestamp+int64(i*Freq)] = make(map[string]schema.Float, 0) - } - } else { - // Get the last timestamp - var lastTS int64 - for ts := range l.data { - if ts > lastTS { - lastTS = ts - } - } - // Create keys for the next KeyCounter timestamps - l.data[lastTS+int64(Freq)] = make(map[string]schema.Float, 0) - } - } - - closestTS := int64(0) - minDiff := int64(Freq) + 1 // Start with diff just outside the valid range - found := false - - // Iterate over timestamps and choose the one which is within range. - // Since its epoch time, we check if the difference is less than 60 seconds. - for ts, dat := range l.data { - // Check if timestamp is within range - diff := timestamp - ts - if diff < -int64(Freq) || diff > int64(Freq) { - continue - } - - // Metric already present at this timestamp — skip - if _, ok := dat[metricName]; ok { - continue - } - - // Check if this is the closest timestamp so far - if Abs(diff) < minDiff { - minDiff = Abs(diff) - closestTS = ts - found = true - } - } - - if found { - l.data[closestTS][metricName] = value - } -} - -func GetAvroStore() *AvroStore { - return &avroStore -} - -// Abs returns the absolute value of x. -func Abs(x int64) int64 { - if x < 0 { - return -x - } - return x -} diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index b4097ff2..590197e3 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -6,15 +6,15 @@ // This file implements checkpoint persistence for the in-memory metric store. // // Checkpoints enable graceful restarts by periodically saving in-memory metric -// data to disk in either JSON or Avro format. The checkpoint system: +// data to disk in JSON or binary format. 
The checkpoint system: // // Key Features: // - Periodic background checkpointing via the Checkpointing() worker -// - Two formats: JSON (human-readable) and Avro (compact, efficient) +// - Two format families: JSON (human-readable) and WAL+binary (compact, crash-safe) // - Parallel checkpoint creation and loading using worker pools -// - Hierarchical file organization: checkpoint_dir/cluster/host/timestamp.{json|avro} +// - Hierarchical file organization: checkpoint_dir/cluster/host/timestamp.{json|bin} +// - WAL file: checkpoint_dir/cluster/host/current.wal (append-only, per-entry) // - Only saves unarchived data (archived data is already persisted elsewhere) -// - Automatic format detection and fallback during loading // - GC optimization during loading to prevent excessive heap growth // // Checkpoint Workflow: @@ -27,8 +27,9 @@ // checkpoints/ // cluster1/ // host001/ -// 1234567890.json (timestamp = checkpoint start time) -// 1234567950.json +// 1234567890.json (JSON format: full subtree snapshot) +// 1234567890.bin (binary format: full subtree snapshot) +// current.wal (WAL format: append-only per-entry log) // host002/ // ... package metricstore @@ -52,7 +53,6 @@ import ( cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" - "github.com/linkedin/goavro/v2" ) const ( @@ -86,47 +86,58 @@ var ( // Checkpointing starts a background worker that periodically saves metric data to disk. // -// The behavior depends on the configured file format: -// - JSON: Periodic checkpointing based on Keys.Checkpoints.Interval -// - Avro: Initial delay + periodic checkpointing at DefaultAvroCheckpointInterval -// -// The worker respects context cancellation and signals completion via the WaitGroup. +// Format behaviour: +// - "json": Periodic checkpointing based on Keys.Checkpoints.Interval +// - "wal": Periodic binary snapshots + WAL rotation at Keys.Checkpoints.Interval func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { lastCheckpointMu.Lock() lastCheckpoint = time.Now() lastCheckpointMu.Unlock() - if Keys.Checkpoints.FileFormat == "json" { - ms := GetMemoryStore() + ms := GetMemoryStore() - wg.Add(1) - go func() { - defer wg.Done() - d, err := time.ParseDuration(Keys.Checkpoints.Interval) - if err != nil { - cclog.Fatalf("[METRICSTORE]> invalid checkpoint interval '%s': %s", Keys.Checkpoints.Interval, err.Error()) - } - if d <= 0 { - cclog.Warnf("[METRICSTORE]> checkpoint interval is zero or negative (%s), checkpointing disabled", d) + wg.Add(1) + go func() { + defer wg.Done() + + d, err := time.ParseDuration(Keys.Checkpoints.Interval) + if err != nil { + cclog.Fatalf("[METRICSTORE]> invalid checkpoint interval '%s': %s", Keys.Checkpoints.Interval, err.Error()) + } + if d <= 0 { + cclog.Warnf("[METRICSTORE]> checkpoint interval is zero or negative (%s), checkpointing disabled", d) + return + } + + ticker := time.NewTicker(d) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): return - } + case <-ticker.C: + lastCheckpointMu.Lock() + from := lastCheckpoint + lastCheckpointMu.Unlock() - ticker := time.NewTicker(d) - defer ticker.Stop() + now := time.Now() + cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339)) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - lastCheckpointMu.Lock() - from := lastCheckpoint - lastCheckpointMu.Unlock() - - cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339)) - now := time.Now() - n, err := 
ms.ToCheckpoint(Keys.Checkpoints.RootDir, - from.Unix(), now.Unix()) + if Keys.Checkpoints.FileFormat == "wal" { + n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) + if err != nil { + cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error()) + } else { + cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n) + lastCheckpointMu.Lock() + lastCheckpoint = now + lastCheckpointMu.Unlock() + // Rotate WAL files for successfully checkpointed hosts. + RotateWALFiles(hostDirs) + } + } else { + n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) if err != nil { cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error()) } else { @@ -137,32 +148,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { } } } - }() - } else { - wg.Add(1) - go func() { - defer wg.Done() - - select { - case <-ctx.Done(): - return - case <-time.After(time.Duration(CheckpointBufferMinutes) * time.Minute): - GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false) - } - - ticker := time.NewTicker(DefaultAvroCheckpointInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false) - } - } - }() - } + } + }() } // MarshalJSON provides optimized JSON encoding for CheckpointMetrics. @@ -190,7 +177,7 @@ func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) { return buf, nil } -// ToCheckpoint writes metric data to checkpoint files in parallel. +// ToCheckpoint writes metric data to checkpoint files in parallel (JSON format). // // Metrics at root and cluster levels are skipped. One file per host is created. // Uses worker pool (Keys.NumWorkers) for parallel processing. Only locks one host @@ -378,7 +365,6 @@ func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { return err } - gcCounter := 0 for _, clusterDir := range clustersDir { if !clusterDir.IsDir() { return errors.New("[METRICSTORE]> expected only directories at first level of checkpoints/ directory") @@ -394,16 +380,6 @@ func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { return errors.New("[METRICSTORE]> expected only directories at second level of checkpoints/ directory") } - gcCounter++ - // if gcCounter%GCTriggerInterval == 0 { - // Forcing garbage collection runs here regulary during the loading of checkpoints - // will decrease the total heap size after loading everything back to memory is done. - // While loading data, the heap will grow fast, so the GC target size will double - // almost always. By forcing GCs here, we can keep it growing more slowly so that - // at the end, less memory is wasted. - // runtime.GC() - // } - work <- [2]string{clusterDir.Name(), hostDir.Name()} } } @@ -413,8 +389,8 @@ func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { // FromCheckpoint loads checkpoint files from disk into memory in parallel. // -// Uses worker pool to load cluster/host combinations. Periodically triggers GC -// to prevent excessive heap growth. Returns number of files loaded and any errors. +// Uses worker pool to load cluster/host combinations. Returns number of files +// loaded and any errors. 
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { var wg sync.WaitGroup work := make(chan [2]string, Keys.NumWorkers*4) @@ -452,13 +428,11 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { // FromCheckpointFiles is the main entry point for loading checkpoints at startup. // -// Automatically detects checkpoint format (JSON vs Avro) and falls back if needed. // Creates checkpoint directory if it doesn't exist. This function must be called // before any writes or reads, and can only be called once. func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { if _, err := os.Stat(dir); os.IsNotExist(err) { - // The directory does not exist, so create it using os.MkdirAll() - err := os.MkdirAll(dir, CheckpointDirPerms) // CheckpointDirPerms sets the permissions for the directory + err := os.MkdirAll(dir, CheckpointDirPerms) if err != nil { cclog.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err) } @@ -468,146 +442,6 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { return m.FromCheckpoint(dir, from) } -func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { - br := bufio.NewReader(f) - - fileName := f.Name()[strings.LastIndex(f.Name(), "/")+1:] - resolution, err := strconv.ParseInt(fileName[0:strings.Index(fileName, "_")], 10, 64) - if err != nil { - return fmt.Errorf("[METRICSTORE]> error while reading avro file (resolution parsing) : %s", err) - } - - fromTimestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64) - - // Same logic according to lineprotocol - fromTimestamp -= (resolution / 2) - - if err != nil { - return fmt.Errorf("[METRICSTORE]> error converting timestamp from the avro file : %s", err) - } - - // fmt.Printf("File : %s with resolution : %d\n", fileName, resolution) - - var recordCounter int64 = 0 - - // Create a new OCF reader from the buffered reader - ocfReader, err := goavro.NewOCFReader(br) - if err != nil { - return fmt.Errorf("[METRICSTORE]> error creating OCF reader: %w", err) - } - - metricsData := make(map[string]schema.FloatArray) - - for ocfReader.Scan() { - datum, err := ocfReader.Read() - if err != nil { - return fmt.Errorf("[METRICSTORE]> error while reading avro file : %s", err) - } - - record, ok := datum.(map[string]any) - if !ok { - return fmt.Errorf("[METRICSTORE]> failed to assert datum as map[string]interface{}") - } - - for key, value := range record { - metricsData[key] = append(metricsData[key], schema.ConvertToFloat(value.(float64))) - } - - recordCounter += 1 - } - - to := (fromTimestamp + (recordCounter / (60 / resolution) * 60)) - if to < from { - return nil - } - - for key, floatArray := range metricsData { - metricName := ReplaceKey(key) - - if strings.Contains(metricName, SelectorDelimiter) { - subString := strings.Split(metricName, SelectorDelimiter) - - lvl := l - - for i := 0; i < len(subString)-1; i++ { - - sel := subString[i] - - if lvl.children == nil { - lvl.children = make(map[string]*Level) - } - - child, ok := lvl.children[sel] - if !ok { - child = &Level{ - metrics: make([]*buffer, len(m.Metrics)), - children: nil, - } - lvl.children[sel] = child - } - lvl = child - } - - leafMetricName := subString[len(subString)-1] - err = lvl.createBuffer(m, leafMetricName, floatArray, fromTimestamp, resolution) - if err != nil { - return fmt.Errorf("[METRICSTORE]> error while creating buffers from avroReader : %s", err) - } - } else { - err = l.createBuffer(m, 
metricName, floatArray, fromTimestamp, resolution) - if err != nil { - return fmt.Errorf("[METRICSTORE]> error while creating buffers from avroReader : %s", err) - } - } - - } - - return nil -} - -func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray schema.FloatArray, from int64, resolution int64) error { - n := len(floatArray) - b := &buffer{ - frequency: resolution, - start: from, - data: floatArray[0:n:n], - prev: nil, - next: nil, - archived: true, - } - - minfo, ok := m.Metrics[metricName] - if !ok { - return nil - } - - prev := l.metrics[minfo.offset] - if prev == nil { - l.metrics[minfo.offset] = b - } else { - if prev.start > b.start { - return fmt.Errorf("[METRICSTORE]> buffer start time %d is before previous buffer start %d", b.start, prev.start) - } - - b.prev = prev - prev.next = b - - missingCount := ((int(b.start) - int(prev.start)) - len(prev.data)*int(b.frequency)) - if missingCount > 0 { - missingCount /= int(b.frequency) - - for range missingCount { - prev.data = append(prev.data, schema.NaN) - } - - prev.data = prev.data[0:len(prev.data):len(prev.data)] - } - } - l.metrics[minfo.offset] = b - - return nil -} - func (l *Level) loadJSONFile(m *MemoryStore, f *os.File, from int64) error { br := bufio.NewReader(f) cf := &CheckpointFile{} @@ -679,37 +513,37 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { return nil } +// fromCheckpoint loads all checkpoint files (JSON, binary snapshot, WAL) for a +// single host directory. Snapshot files are loaded first (sorted by timestamp), +// then current.wal is replayed on top. func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, error) { direntries, err := os.ReadDir(dir) if err != nil { if os.IsNotExist(err) { return 0, nil } - return 0, err } allFiles := make([]fs.DirEntry, 0) + var walEntry fs.DirEntry filesLoaded := 0 + for _, e := range direntries { if e.IsDir() { - child := &Level{ - metrics: make([]*buffer, len(m.Metrics)), - children: make(map[string]*Level), - } - - files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from) - filesLoaded += files - if err != nil { - return filesLoaded, err - } - - l.children[e.Name()] = child - } else if strings.HasSuffix(e.Name(), ".json") || strings.HasSuffix(e.Name(), ".avro") { - allFiles = append(allFiles, e) - } else { + // Legacy: skip subdirectories (only used by old Avro format). + // These are ignored; their data is not loaded. + cclog.Debugf("[METRICSTORE]> skipping subdirectory %s in checkpoint dir %s", e.Name(), dir) continue } + + name := e.Name() + if strings.HasSuffix(name, ".json") || strings.HasSuffix(name, ".bin") { + allFiles = append(allFiles, e) + } else if name == "current.wal" { + walEntry = e + } + // Silently ignore other files (e.g., .tmp, .bin.tmp from interrupted writes). 
 	}
 
 	files, err := findFiles(allFiles, from, true)
@@ -719,54 +553,81 @@
 	loaders := map[string]func(*MemoryStore, *os.File, int64) error{
 		".json": l.loadJSONFile,
-		".avro": l.loadAvroFile,
+		".bin":  l.loadBinaryFile,
 	}
 
 	for _, filename := range files {
 		ext := filepath.Ext(filename)
 		loader := loaders[ext]
 		if loader == nil {
-			cclog.Warnf("Unknown extension for file %s", filename)
+			cclog.Warnf("[METRICSTORE]> unknown extension for checkpoint file %s", filename)
 			continue
 		}
 
-		// Use a closure to ensure file is closed immediately after use
 		err := func() error {
 			f, err := os.Open(path.Join(dir, filename))
 			if err != nil {
 				return err
 			}
			defer f.Close()
-
 			return loader(m, f, from)
 		}()
 		if err != nil {
 			return filesLoaded, err
 		}
+		filesLoaded++
+	}
 
-		filesLoaded += 1
+	// Replay WAL after all snapshot files so it fills in data since the last snapshot.
+	if walEntry != nil {
+		err := func() error {
+			f, err := os.Open(path.Join(dir, walEntry.Name()))
+			if err != nil {
+				return err
+			}
+			defer f.Close()
+			return l.loadWALFile(m, f, from)
+		}()
+		if err != nil {
+			// WAL errors are non-fatal: the snapshot already loaded the bulk of data.
+			cclog.Warnf("[METRICSTORE]> WAL replay error for %s: %v (data since last snapshot may be missing)", dir, err)
+		} else {
+			filesLoaded++
+		}
 	}
 
 	return filesLoaded, nil
 }
 
-// This will probably get very slow over time!
-// A solution could be some sort of an index file in which all other files
-// and the timespan they contain is listed.
-// NOTE: This now assumes that you have distinct timestamps for json and avro files
-// Also, it assumes that the timestamps are not overlapping/self-modified.
+// parseTimestampFromFilename extracts a Unix timestamp from a checkpoint filename.
+// Supports ".json" (format: "<timestamp>.json") and ".bin" (format: "<timestamp>.bin").
+func parseTimestampFromFilename(name string) (int64, error) {
+	switch {
+	case strings.HasSuffix(name, ".json"):
+		return strconv.ParseInt(name[:len(name)-5], 10, 64)
+	case strings.HasSuffix(name, ".bin"):
+		return strconv.ParseInt(name[:len(name)-4], 10, 64)
+	default:
+		return 0, fmt.Errorf("unknown checkpoint extension for file %q", name)
+	}
+}
+
+// findFiles returns filenames from direntries whose timestamps satisfy the filter.
+// If findMoreRecentFiles is true, returns files with timestamps >= t (plus the
+// last file before t if t falls between two files).
func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) { nums := map[string]int64{} for _, e := range direntries { - if !strings.HasSuffix(e.Name(), ".json") && !strings.HasSuffix(e.Name(), ".avro") { + name := e.Name() + if !strings.HasSuffix(name, ".json") && !strings.HasSuffix(name, ".bin") { continue } - ts, err := strconv.ParseInt(e.Name()[strings.Index(e.Name(), "_")+1:len(e.Name())-5], 10, 64) + ts, err := parseTimestampFromFilename(name) if err != nil { return nil, err } - nums[e.Name()] = ts + nums[name] = ts } sort.Slice(direntries, func(i, j int) bool { @@ -783,16 +644,12 @@ func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]s for i, e := range direntries { ts1 := nums[e.Name()] - // Logic to look for files in forward or direction - // If logic: All files greater than or after - // the given timestamp will be selected - // Else If logic: All files less than or before - // the given timestamp will be selected if findMoreRecentFiles && t <= ts1 { filenames = append(filenames, e.Name()) } else if !findMoreRecentFiles && ts1 <= t && ts1 != 0 { filenames = append(filenames, e.Name()) } + if i == len(direntries)-1 { continue } diff --git a/pkg/metricstore/config.go b/pkg/metricstore/config.go index 1efee61a..53716967 100644 --- a/pkg/metricstore/config.go +++ b/pkg/metricstore/config.go @@ -14,7 +14,7 @@ // ├─ RetentionInMemory: How long to keep data in RAM // ├─ MemoryCap: Memory limit in bytes (triggers forceFree) // ├─ Checkpoints: Persistence configuration -// │ ├─ FileFormat: "avro" or "json" +// │ ├─ FileFormat: "json" or "wal" // │ ├─ Interval: How often to save (e.g., "1h") // │ └─ RootDir: Checkpoint storage path // ├─ Cleanup: Long-term storage configuration @@ -55,16 +55,13 @@ const ( DefaultMaxWorkers = 10 DefaultBufferCapacity = 512 DefaultGCTriggerInterval = 100 - DefaultAvroWorkers = 4 - DefaultCheckpointBufferMin = 3 - DefaultAvroCheckpointInterval = time.Minute DefaultMemoryUsageTrackerInterval = 1 * time.Hour ) // Checkpoints configures periodic persistence of in-memory metric data. // // Fields: -// - FileFormat: "avro" (default, binary, compact) or "json" (human-readable, slower) +// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe) // - Interval: Duration string (e.g., "1h", "30m") between checkpoint saves // - RootDir: Filesystem path for checkpoint files (created if missing) type Checkpoints struct { diff --git a/pkg/metricstore/configSchema.go b/pkg/metricstore/configSchema.go index 6a748be0..67f30976 100644 --- a/pkg/metricstore/configSchema.go +++ b/pkg/metricstore/configSchema.go @@ -18,7 +18,7 @@ const configSchema = `{ "type": "object", "properties": { "file-format": { - "description": "Specify the format for checkpoint files. There are 2 variants: 'avro' and 'json'. If nothing is specified, 'avro' is default.", + "description": "Specify the format for checkpoint files. Two variants: 'json' (human-readable, periodic) and 'wal' (binary snapshot + Write-Ahead Log, crash-safe). 
Default is 'json'.", "type": "string" }, "interval": { diff --git a/pkg/metricstore/lineprotocol.go b/pkg/metricstore/lineprotocol.go index bfbbef2d..1e04bba0 100644 --- a/pkg/metricstore/lineprotocol.go +++ b/pkg/metricstore/lineprotocol.go @@ -244,8 +244,8 @@ func DecodeLine(dec *lineprotocol.Decoder, time := t.Unix() - if Keys.Checkpoints.FileFormat != "json" { - LineProtocolMessages <- &AvroStruct{ + if Keys.Checkpoints.FileFormat == "wal" { + WALMessages <- &WALMessage{ MetricName: string(metricBuf), Cluster: cluster, Node: host, diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 789c6d07..3fe64d55 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -172,7 +172,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W Retention(wg, ctx) Checkpointing(wg, ctx) CleanUp(wg, ctx) - DataStaging(wg, ctx) + WALStaging(wg, ctx) MemoryUsageTracker(wg, ctx) // Note: Signal handling has been removed from this function. @@ -264,7 +264,7 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) { // // The function will: // 1. Cancel the context to stop all background workers -// 2. Close NATS message channels if using Avro format +// 2. Close the WAL messages channel if using WAL format // 3. Write a final checkpoint to preserve in-memory data // 4. Log any errors encountered during shutdown // @@ -276,8 +276,8 @@ func Shutdown() { shutdownFunc() } - if Keys.Checkpoints.FileFormat != "json" { - close(LineProtocolMessages) + if Keys.Checkpoints.FileFormat == "wal" { + close(WALMessages) } cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) @@ -286,10 +286,18 @@ func Shutdown() { ms := GetMemoryStore() - if Keys.Checkpoints.FileFormat == "json" { - files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) + lastCheckpointMu.Lock() + from := lastCheckpoint + lastCheckpointMu.Unlock() + + if Keys.Checkpoints.FileFormat == "wal" { + var hostDirs []string + files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) + if err == nil { + RotateWALFiles(hostDirs) + } } else { - files, err = GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, true) + files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) } if err != nil { diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go new file mode 100644 index 00000000..e8a71ce2 --- /dev/null +++ b/pkg/metricstore/walCheckpoint.go @@ -0,0 +1,787 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// Package metricstore provides walCheckpoint.go: WAL-based checkpoint implementation. +// +// This replaces the Avro shadow tree with an append-only Write-Ahead Log (WAL) +// per host, eliminating the extra memory overhead of the AvroStore and providing +// truly continuous (per-write) crash safety. +// +// # Architecture +// +// Metric write (DecodeLine) +// │ +// ├─► WriteToLevel() → main MemoryStore (unchanged) +// │ +// └─► WALMessages channel +// │ +// ▼ +// WALStaging goroutine +// │ +// ▼ +// checkpoints/cluster/host/current.wal (append-only, binary) +// +// Periodic checkpoint (Checkpointing goroutine): +// 1. Write .bin snapshot (column-oriented, from main tree) +// 2. 
Signal WALStaging to truncate current.wal per host +// +// On restart (FromCheckpoint): +// 1. Load most recent .bin snapshot +// 2. Replay current.wal (overwrite-safe: buffer.write handles duplicate timestamps) +// +// # WAL Record Format +// +// [4B magic 0xCC1DA7A1][4B payload_len][payload][4B CRC32] +// +// payload: +// [8B timestamp int64] +// [2B metric_name_len uint16][N metric name bytes] +// [1B selector_count uint8] +// per selector: [1B selector_len uint8][M selector bytes] +// [4B value float32 bits] +// +// # Binary Snapshot Format +// +// [4B magic 0xCC5B0001][8B from int64][8B to int64] +// Level tree (recursive): +// [4B num_metrics uint32] +// per metric: +// [2B name_len uint16][N name bytes] +// [8B frequency int64][8B start int64] +// [4B num_values uint32][num_values × 4B float32] +// [4B num_children uint32] +// per child: [2B name_len uint16][N name bytes] + Level (recursive) +package metricstore + +import ( + "bufio" + "context" + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "math" + "os" + "path" + "sync" + "sync/atomic" + + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +// Magic numbers for binary formats. +const ( + walFileMagic = uint32(0xCC1DA701) // WAL file header magic + walRecordMagic = uint32(0xCC1DA7A1) // WAL record magic + snapFileMagic = uint32(0xCC5B0001) // Binary snapshot magic +) + +// WALMessages is the channel for sending metric writes to the WAL staging goroutine. +// Buffered to allow burst writes without blocking the metric ingestion path. +var WALMessages = make(chan *WALMessage, 4096) + +// walRotateCh is used by the checkpoint goroutine to request WAL file rotation +// (close, delete, reopen) after a binary snapshot has been written. +var walRotateCh = make(chan walRotateReq, 256) + +// WALMessage represents a single metric write to be appended to the WAL. +// Cluster and Node are NOT stored in the WAL record (inferred from file path). +type WALMessage struct { + MetricName string + Cluster string + Node string + Selector []string + Value schema.Float + Timestamp int64 +} + +// walRotateReq requests WAL file rotation for a specific host directory. +// The done channel is closed by the WAL goroutine when rotation is complete. +type walRotateReq struct { + hostDir string + done chan struct{} +} + +// walFileState holds an open WAL file handle for one host directory. +type walFileState struct { + f *os.File +} + +// WALStaging starts a background goroutine that receives WALMessage items +// and appends binary WAL records to per-host current.wal files. +// Also handles WAL rotation requests from the checkpoint goroutine. +func WALStaging(wg *sync.WaitGroup, ctx context.Context) { + wg.Add(1) + go func() { + defer wg.Done() + + if Keys.Checkpoints.FileFormat == "json" { + return + } + + hostFiles := make(map[string]*walFileState) + + defer func() { + for _, ws := range hostFiles { + if ws.f != nil { + ws.f.Close() + } + } + }() + + getOrOpenWAL := func(hostDir string) *os.File { + ws, ok := hostFiles[hostDir] + if ok { + return ws.f + } + + if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil { + cclog.Errorf("[METRICSTORE]> WAL: mkdir %s: %v", hostDir, err) + return nil + } + + walPath := path.Join(hostDir, "current.wal") + f, err := os.OpenFile(walPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, CheckpointFilePerms) + if err != nil { + cclog.Errorf("[METRICSTORE]> WAL: open %s: %v", walPath, err) + return nil + } + + // Write file header magic if file is new (empty). 
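+			// The header magic lets loadWALFile reject files that were not
+			// written as WAL files before attempting record replay.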
+			info, err := f.Stat()
+			if err == nil && info.Size() == 0 {
+				var hdr [4]byte
+				binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
+				if _, err := f.Write(hdr[:]); err != nil {
+					cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err)
+					f.Close()
+					return nil
+				}
+			}
+
+			hostFiles[hostDir] = &walFileState{f: f}
+			return f
+		}
+
+		processMsg := func(msg *WALMessage) {
+			hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node)
+			f := getOrOpenWAL(hostDir)
+			if f == nil {
+				return
+			}
+			if err := writeWALRecord(f, msg); err != nil {
+				cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
+			}
+		}
+
+		processRotate := func(req walRotateReq) {
+			ws, ok := hostFiles[req.hostDir]
+			if ok && ws.f != nil {
+				ws.f.Close()
+				walPath := path.Join(req.hostDir, "current.wal")
+				if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
+					cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
+				}
+				delete(hostFiles, req.hostDir)
+			}
+			close(req.done)
+		}
+
+		drain := func() {
+			for {
+				select {
+				case msg, ok := <-WALMessages:
+					if !ok {
+						return
+					}
+					processMsg(msg)
+				case req := <-walRotateCh:
+					processRotate(req)
+				default:
+					return
+				}
+			}
+		}
+
+		for {
+			select {
+			case <-ctx.Done():
+				drain()
+				close(walStagingDone)
+				return
+			case msg, ok := <-WALMessages:
+				if !ok {
+					// Channel closed during shutdown: stop accepting rotation
+					// requests before exiting.
+					close(walStagingDone)
+					return
+				}
+				processMsg(msg)
+			case req := <-walRotateCh:
+				processRotate(req)
+			}
+		}
+	}()
+}
+
+// walStagingDone is closed when the WALStaging goroutine stops servicing
+// walRotateCh, so that RotateWALFiles cannot block forever on rotation
+// requests issued during shutdown, after the goroutine has exited.
+var walStagingDone = make(chan struct{})
+
+// RotateWALFiles sends rotation requests for the given host directories and
+// blocks until all rotations complete. If the WALStaging goroutine has already
+// exited (shutdown path), the WAL files are removed directly instead; the
+// goroutine's deferred cleanup has closed the file handles by then.
+func RotateWALFiles(hostDirs []string) {
+	dones := make([]chan struct{}, len(hostDirs))
+	for i, dir := range hostDirs {
+		done := make(chan struct{})
+		select {
+		case walRotateCh <- walRotateReq{hostDir: dir, done: done}:
+			dones[i] = done
+		case <-walStagingDone:
+			removeWALFile(dir)
+		}
+	}
+	for i, done := range dones {
+		if done == nil {
+			continue
+		}
+		select {
+		case <-done:
+		case <-walStagingDone:
+			// The request will never be processed anymore; remove directly.
+			removeWALFile(hostDirs[i])
+		}
+	}
+}
+
+// removeWALFile deletes a host's current.wal, tolerating a missing file.
+func removeWALFile(hostDir string) {
+	walPath := path.Join(hostDir, "current.wal")
+	if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
+		cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
+	}
+}
+
+// buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC).
+func buildWALPayload(msg *WALMessage) []byte {
+	size := 8 + 2 + len(msg.MetricName) + 1 + 4
+	for _, s := range msg.Selector {
+		size += 1 + len(s)
+	}
+
+	buf := make([]byte, 0, size)
+
+	// Timestamp (8 bytes, little-endian int64)
+	var ts [8]byte
+	binary.LittleEndian.PutUint64(ts[:], uint64(msg.Timestamp))
+	buf = append(buf, ts[:]...)
+
+	// Metric name (2-byte length prefix + bytes)
+	var mLen [2]byte
+	binary.LittleEndian.PutUint16(mLen[:], uint16(len(msg.MetricName)))
+	buf = append(buf, mLen[:]...)
+	buf = append(buf, msg.MetricName...)
+
+	// Selector count (1 byte)
+	buf = append(buf, byte(len(msg.Selector)))
+
+	// Selectors (1-byte length prefix + bytes each)
+	for _, sel := range msg.Selector {
+		buf = append(buf, byte(len(sel)))
+		buf = append(buf, sel...)
+	}
+
+	// Value (4 bytes, float32 bit representation)
+	var val [4]byte
+	binary.LittleEndian.PutUint32(val[:], math.Float32bits(float32(msg.Value)))
+	buf = append(buf, val[:]...)
+
+	return buf
+}
+
+// writeWALRecord appends a binary WAL record to the file.
+// Format: [4B magic][4B payload_len][payload][4B CRC32]
+func writeWALRecord(f *os.File, msg *WALMessage) error {
+	payload := buildWALPayload(msg)
+	crc := crc32.ChecksumIEEE(payload)
+
+	record := make([]byte, 0, 4+4+len(payload)+4)
+
+	var magic [4]byte
+	binary.LittleEndian.PutUint32(magic[:], walRecordMagic)
+	record = append(record, magic[:]...)
+
+	var pLen [4]byte
+	binary.LittleEndian.PutUint32(pLen[:], uint32(len(payload)))
+	record = append(record, pLen[:]...)
+
+	record = append(record, payload...)
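+	// The trailing CRC32 lets readWALRecord detect a torn final record after a
+	// crash and stop replay cleanly instead of ingesting a corrupt tail.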
+ + var crcBytes [4]byte + binary.LittleEndian.PutUint32(crcBytes[:], crc) + record = append(record, crcBytes[:]...) + + _, err := f.Write(record) + return err +} + +// readWALRecord reads one WAL record from the reader. +// Returns (nil, nil) on clean EOF. Returns error on data corruption. +// A CRC mismatch indicates a truncated trailing record (expected on crash). +func readWALRecord(r io.Reader) (*WALMessage, error) { + var magic uint32 + if err := binary.Read(r, binary.LittleEndian, &magic); err != nil { + if err == io.EOF { + return nil, nil // Clean EOF + } + return nil, fmt.Errorf("read record magic: %w", err) + } + + if magic != walRecordMagic { + return nil, fmt.Errorf("invalid record magic 0x%08X (expected 0x%08X)", magic, walRecordMagic) + } + + var payloadLen uint32 + if err := binary.Read(r, binary.LittleEndian, &payloadLen); err != nil { + return nil, fmt.Errorf("read payload length: %w", err) + } + + if payloadLen > 1<<20 { // 1 MB sanity limit + return nil, fmt.Errorf("record payload too large: %d bytes", payloadLen) + } + + payload := make([]byte, payloadLen) + if _, err := io.ReadFull(r, payload); err != nil { + return nil, fmt.Errorf("read payload: %w", err) + } + + var storedCRC uint32 + if err := binary.Read(r, binary.LittleEndian, &storedCRC); err != nil { + return nil, fmt.Errorf("read CRC: %w", err) + } + + if crc32.ChecksumIEEE(payload) != storedCRC { + return nil, fmt.Errorf("CRC mismatch (truncated write or corruption)") + } + + return parseWALPayload(payload) +} + +// parseWALPayload decodes a binary payload into a WALMessage. +func parseWALPayload(payload []byte) (*WALMessage, error) { + if len(payload) < 8+2+1+4 { + return nil, fmt.Errorf("payload too short: %d bytes", len(payload)) + } + + offset := 0 + + // Timestamp (8 bytes) + ts := int64(binary.LittleEndian.Uint64(payload[offset : offset+8])) + offset += 8 + + // Metric name (2-byte length + bytes) + if offset+2 > len(payload) { + return nil, fmt.Errorf("metric name length overflows payload") + } + mLen := int(binary.LittleEndian.Uint16(payload[offset : offset+2])) + offset += 2 + + if offset+mLen > len(payload) { + return nil, fmt.Errorf("metric name overflows payload") + } + metricName := string(payload[offset : offset+mLen]) + offset += mLen + + // Selector count (1 byte) + if offset >= len(payload) { + return nil, fmt.Errorf("selector count overflows payload") + } + selCount := int(payload[offset]) + offset++ + + selectors := make([]string, selCount) + for i := range selCount { + if offset >= len(payload) { + return nil, fmt.Errorf("selector[%d] length overflows payload", i) + } + sLen := int(payload[offset]) + offset++ + + if offset+sLen > len(payload) { + return nil, fmt.Errorf("selector[%d] data overflows payload", i) + } + selectors[i] = string(payload[offset : offset+sLen]) + offset += sLen + } + + // Value (4 bytes, float32 bits) + if offset+4 > len(payload) { + return nil, fmt.Errorf("value overflows payload") + } + bits := binary.LittleEndian.Uint32(payload[offset : offset+4]) + value := schema.Float(math.Float32frombits(bits)) + + return &WALMessage{ + MetricName: metricName, + Timestamp: ts, + Selector: selectors, + Value: value, + }, nil +} + +// loadWALFile reads a WAL file and replays all valid records into the Level tree. +// l is the host-level node. Corrupt or partial trailing records are silently skipped +// (expected on crash). Records older than 'from' are skipped. 
+func (l *Level) loadWALFile(m *MemoryStore, f *os.File, from int64) error { + br := bufio.NewReader(f) + + // Verify file header magic. + var fileMagic uint32 + if err := binary.Read(br, binary.LittleEndian, &fileMagic); err != nil { + if err == io.EOF { + return nil // Empty file, no data + } + return fmt.Errorf("[METRICSTORE]> WAL: read file header: %w", err) + } + + if fileMagic != walFileMagic { + return fmt.Errorf("[METRICSTORE]> WAL: invalid file magic 0x%08X (expected 0x%08X)", fileMagic, walFileMagic) + } + + // Cache level lookups to avoid repeated tree traversal. + lvlCache := make(map[string]*Level) + + for { + msg, err := readWALRecord(br) + if err != nil { + // Truncated trailing record is expected after a crash; stop replaying. + cclog.Debugf("[METRICSTORE]> WAL: stopping replay at corrupted/partial record: %v", err) + break + } + if msg == nil { + break // Clean EOF + } + + if msg.Timestamp < from { + continue // Older than retention window + } + + minfo, ok := m.Metrics[msg.MetricName] + if !ok { + continue // Unknown metric (config may have changed) + } + + // Cache key is the null-separated selector path. + cacheKey := joinSelector(msg.Selector) + lvl, ok := lvlCache[cacheKey] + if !ok { + lvl = l.findLevelOrCreate(msg.Selector, len(m.Metrics)) + lvlCache[cacheKey] = lvl + } + + // Write directly to the buffer, same as WriteToLevel but without the + // global level lookup (we already have the right level). + lvl.lock.Lock() + b := lvl.metrics[minfo.offset] + if b == nil { + b = newBuffer(msg.Timestamp, minfo.Frequency) + lvl.metrics[minfo.offset] = b + } + nb, writeErr := b.write(msg.Timestamp, msg.Value) + if writeErr == nil && b != nb { + lvl.metrics[minfo.offset] = nb + } + // Ignore write errors for timestamps before buffer start (can happen when + // replaying WAL entries that predate a loaded snapshot's start time). + lvl.lock.Unlock() + } + + return nil +} + +// joinSelector builds a cache key from a selector slice using null bytes as separators. +func joinSelector(sel []string) string { + if len(sel) == 0 { + return "" + } + result := sel[0] + for i := 1; i < len(sel); i++ { + result += "\x00" + sel[i] + } + return result +} + +// ToCheckpointWAL writes binary snapshot files for all hosts in parallel. +// Returns the number of files written, the list of host directories that were +// successfully checkpointed (for WAL rotation), and any errors. +func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string, error) { + // Collect all cluster/host pairs. 
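+ // Two passes under RLock: the first only counts hosts so that the slices
+ // below can be allocated once; the second collects level pointers and
+ // selectors. The count is only a capacity hint, so a host appearing between
+ // the passes is still handled correctly by append.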
+ m.root.lock.RLock() + totalHosts := 0 + for _, l1 := range m.root.children { + l1.lock.RLock() + totalHosts += len(l1.children) + l1.lock.RUnlock() + } + m.root.lock.RUnlock() + + levels := make([]*Level, 0, totalHosts) + selectors := make([][]string, 0, totalHosts) + + m.root.lock.RLock() + for sel1, l1 := range m.root.children { + l1.lock.RLock() + for sel2, l2 := range l1.children { + levels = append(levels, l2) + selectors = append(selectors, []string{sel1, sel2}) + } + l1.lock.RUnlock() + } + m.root.lock.RUnlock() + + type workItem struct { + level *Level + hostDir string + selector []string + } + + n, errs := int32(0), int32(0) + var successDirs []string + var successMu sync.Mutex + + var wg sync.WaitGroup + wg.Add(Keys.NumWorkers) + work := make(chan workItem, Keys.NumWorkers*2) + + for range Keys.NumWorkers { + go func() { + defer wg.Done() + for wi := range work { + err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m) + if err != nil { + if err == ErrNoNewArchiveData { + continue + } + cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err) + atomic.AddInt32(&errs, 1) + } else { + atomic.AddInt32(&n, 1) + successMu.Lock() + successDirs = append(successDirs, wi.hostDir) + successMu.Unlock() + } + } + }() + } + + for i := range levels { + hostDir := path.Join(dir, path.Join(selectors[i]...)) + work <- workItem{ + level: levels[i], + hostDir: hostDir, + selector: selectors[i], + } + } + close(work) + wg.Wait() + + if errs > 0 { + return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n) + } + return int(n), successDirs, nil +} + +// toCheckpointBinary writes a binary snapshot file for a single host-level node. +// Uses atomic rename (write to .tmp then rename) to avoid partial reads on crash. +func (l *Level) toCheckpointBinary(dir string, from, to int64, m *MemoryStore) error { + cf, err := l.toCheckpointFile(from, to, m) + if err != nil { + return err + } + if cf == nil { + return ErrNoNewArchiveData + } + + if err := os.MkdirAll(dir, CheckpointDirPerms); err != nil { + return fmt.Errorf("mkdir %s: %w", dir, err) + } + + // Write to a temp file first, then rename (atomic on POSIX). + tmpPath := path.Join(dir, fmt.Sprintf("%d.bin.tmp", from)) + finalPath := path.Join(dir, fmt.Sprintf("%d.bin", from)) + + f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) + if err != nil { + return fmt.Errorf("open binary snapshot %s: %w", tmpPath, err) + } + + bw := bufio.NewWriter(f) + if err := writeBinarySnapshotFile(bw, cf); err != nil { + f.Close() + os.Remove(tmpPath) + return fmt.Errorf("write binary snapshot: %w", err) + } + if err := bw.Flush(); err != nil { + f.Close() + os.Remove(tmpPath) + return err + } + f.Close() + + return os.Rename(tmpPath, finalPath) +} + +// writeBinarySnapshotFile writes the binary snapshot file header and level tree. +func writeBinarySnapshotFile(w io.Writer, cf *CheckpointFile) error { + if err := binary.Write(w, binary.LittleEndian, snapFileMagic); err != nil { + return err + } + if err := binary.Write(w, binary.LittleEndian, cf.From); err != nil { + return err + } + if err := binary.Write(w, binary.LittleEndian, cf.To); err != nil { + return err + } + return writeBinaryLevel(w, cf) +} + +// writeBinaryLevel recursively writes a CheckpointFile level in binary format. 
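+// On-disk layout per level (all integers little-endian), as produced below:
+//
+//	[4B metric_count]
+//	per metric: [2B name_len][name][8B frequency][8B start][4B value_count][value_count x 4B float32 bits]
+//	[4B child_count]
+//	per child:  [2B name_len][name][recursively encoded level]
+//
+// Go map iteration order is randomized, so two snapshots of the same data
+// need not be byte-identical; readers must not assume any entry order.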
+func writeBinaryLevel(w io.Writer, cf *CheckpointFile) error { + if err := binary.Write(w, binary.LittleEndian, uint32(len(cf.Metrics))); err != nil { + return err + } + + for name, metric := range cf.Metrics { + if err := writeString16(w, name); err != nil { + return err + } + if err := binary.Write(w, binary.LittleEndian, metric.Frequency); err != nil { + return err + } + if err := binary.Write(w, binary.LittleEndian, metric.Start); err != nil { + return err + } + if err := binary.Write(w, binary.LittleEndian, uint32(len(metric.Data))); err != nil { + return err + } + for _, v := range metric.Data { + if err := binary.Write(w, binary.LittleEndian, math.Float32bits(float32(v))); err != nil { + return err + } + } + } + + if err := binary.Write(w, binary.LittleEndian, uint32(len(cf.Children))); err != nil { + return err + } + + for name, child := range cf.Children { + if err := writeString16(w, name); err != nil { + return err + } + if err := writeBinaryLevel(w, child); err != nil { + return err + } + } + + return nil +} + +// writeString16 writes a 2-byte length-prefixed string to w. +func writeString16(w io.Writer, s string) error { + if err := binary.Write(w, binary.LittleEndian, uint16(len(s))); err != nil { + return err + } + _, err := io.WriteString(w, s) + return err +} + +// loadBinaryFile reads a binary snapshot file and loads data into the Level tree. +// The retention check (from) is applied to the file's 'to' timestamp. +func (l *Level) loadBinaryFile(m *MemoryStore, f *os.File, from int64) error { + br := bufio.NewReader(f) + + var magic uint32 + if err := binary.Read(br, binary.LittleEndian, &magic); err != nil { + return fmt.Errorf("[METRICSTORE]> binary snapshot: read magic: %w", err) + } + if magic != snapFileMagic { + return fmt.Errorf("[METRICSTORE]> binary snapshot: invalid magic 0x%08X (expected 0x%08X)", magic, snapFileMagic) + } + + var fileFrom, fileTo int64 + if err := binary.Read(br, binary.LittleEndian, &fileFrom); err != nil { + return fmt.Errorf("[METRICSTORE]> binary snapshot: read from: %w", err) + } + if err := binary.Read(br, binary.LittleEndian, &fileTo); err != nil { + return fmt.Errorf("[METRICSTORE]> binary snapshot: read to: %w", err) + } + + if fileTo != 0 && fileTo < from { + return nil // File is older than retention window, skip it + } + + cf, err := readBinaryLevel(br) + if err != nil { + return fmt.Errorf("[METRICSTORE]> binary snapshot: read level tree: %w", err) + } + cf.From = fileFrom + cf.To = fileTo + + return l.loadFile(cf, m) +} + +// readBinaryLevel recursively reads a level from the binary snapshot format. 
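+// It is the exact inverse of writeBinaryLevel and must be kept in sync with
+// it. Length fields read from the file drive allocations, so callers should
+// only pass data that has already passed the magic check in loadBinaryFile.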
+func readBinaryLevel(r io.Reader) (*CheckpointFile, error) {
+ cf := &CheckpointFile{
+ Metrics: make(map[string]*CheckpointMetrics),
+ Children: make(map[string]*CheckpointFile),
+ }
+
+ var numMetrics uint32
+ if err := binary.Read(r, binary.LittleEndian, &numMetrics); err != nil {
+ return nil, fmt.Errorf("read num_metrics: %w", err)
+ }
+
+ for range numMetrics {
+ name, err := readString16(r)
+ if err != nil {
+ return nil, fmt.Errorf("read metric name: %w", err)
+ }
+
+ var freq, start int64
+ if err := binary.Read(r, binary.LittleEndian, &freq); err != nil {
+ return nil, fmt.Errorf("read frequency for %s: %w", name, err)
+ }
+ if err := binary.Read(r, binary.LittleEndian, &start); err != nil {
+ return nil, fmt.Errorf("read start for %s: %w", name, err)
+ }
+
+ var numValues uint32
+ if err := binary.Read(r, binary.LittleEndian, &numValues); err != nil {
+ return nil, fmt.Errorf("read num_values for %s: %w", name, err)
+ }
+
+ if numValues > 1<<24 { // sanity limit, analogous to the WAL reader's payload cap
+ return nil, fmt.Errorf("num_values for %s too large: %d", name, numValues)
+ }
+
+ data := make([]schema.Float, numValues)
+ for i := range numValues {
+ var bits uint32
+ if err := binary.Read(r, binary.LittleEndian, &bits); err != nil {
+ return nil, fmt.Errorf("read value[%d] for %s: %w", i, name, err)
+ }
+ data[i] = schema.Float(math.Float32frombits(bits))
+ }
+
+ cf.Metrics[name] = &CheckpointMetrics{
+ Frequency: freq,
+ Start: start,
+ Data: data,
+ }
+ }
+
+ var numChildren uint32
+ if err := binary.Read(r, binary.LittleEndian, &numChildren); err != nil {
+ return nil, fmt.Errorf("read num_children: %w", err)
+ }
+
+ for range numChildren {
+ childName, err := readString16(r)
+ if err != nil {
+ return nil, fmt.Errorf("read child name: %w", err)
+ }
+
+ child, err := readBinaryLevel(r)
+ if err != nil {
+ return nil, fmt.Errorf("read child %s: %w", childName, err)
+ }
+ cf.Children[childName] = child
+ }
+
+ return cf, nil
+}
+
+// readString16 reads a 2-byte length-prefixed string from r.
+func readString16(r io.Reader) (string, error) {
+ var sLen uint16
+ if err := binary.Read(r, binary.LittleEndian, &sLen); err != nil {
+ return "", err
+ }
+ buf := make([]byte, sLen)
+ if _, err := io.ReadFull(r, buf); err != nil {
+ return "", err
+ }
+ return string(buf), nil
+}
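+
+// Illustrative sketch (an assumption, not part of this patch): a round-trip
+// check for the WAL record codec above, placed in a _test.go file with
+// imports io, os, slices, and testing. 42.5 is exactly representable as a
+// float32, so the value survives the float64 -> float32 -> float64 trip.
+// Cluster and Node are not part of the payload (they are implied by the WAL
+// file's directory), so only the encoded fields are compared.
+//
+//	func TestWALRecordRoundTrip(t *testing.T) {
+//		f, err := os.CreateTemp(t.TempDir(), "*.wal")
+//		if err != nil {
+//			t.Fatal(err)
+//		}
+//		msg := &WALMessage{
+//			MetricName: "flops_any",
+//			Timestamp:  1700000000,
+//			Selector:   []string{"node01", "cpu0"},
+//			Value:      42.5,
+//		}
+//		if err := writeWALRecord(f, msg); err != nil {
+//			t.Fatal(err)
+//		}
+//		if _, err := f.Seek(0, io.SeekStart); err != nil {
+//			t.Fatal(err)
+//		}
+//		got, err := readWALRecord(f)
+//		if err != nil || got == nil {
+//			t.Fatalf("read back: got=%v err=%v", got, err)
+//		}
+//		if got.MetricName != msg.MetricName || got.Timestamp != msg.Timestamp ||
+//			got.Value != msg.Value || !slices.Equal(got.Selector, msg.Selector) {
+//			t.Fatalf("round trip mismatch: %+v vs %+v", got, msg)
+//		}
+//	}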