Replace avro checkpoints with native binary format

This commit is contained in:
2026-02-23 14:21:17 +01:00
parent b6c574c7ec
commit 9fc1836c30
11 changed files with 400 additions and 1051 deletions

View File

@@ -6,15 +6,16 @@
// This file implements checkpoint persistence for the in-memory metric store.
//
// Checkpoints enable graceful restarts by periodically saving in-memory metric
// data to disk in either JSON or Avro format. The checkpoint system:
// data to disk. The checkpoint system supports two write formats:
// - binary (default): fast loading via raw float32 arrays
// - json: human-readable, slightly slower to load
//
// Key Features:
// - Periodic background checkpointing via the Checkpointing() worker
// - Two formats: JSON (human-readable) and Avro (compact, efficient)
// - Parallel checkpoint creation and loading using worker pools
// - Hierarchical file organization: checkpoint_dir/cluster/host/timestamp.{json|avro}
// - Hierarchical file organization: checkpoint_dir/cluster/host/timestamp.{bin|json}
// - Only saves unarchived data (archived data is already persisted elsewhere)
// - Automatic format detection and fallback during loading
// - Automatic format detection during loading (supports bin, json, and legacy avro)
// - GC optimization during loading to prevent excessive heap growth
//
// Checkpoint Workflow:
@@ -27,8 +28,8 @@
// checkpoints/
// cluster1/
// host001/
// 1234567890.json (timestamp = checkpoint start time)
// 1234567950.json
// 1234567890.bin (timestamp = checkpoint start time)
// 1234567950.bin
// host002/
// ...
package metricstore
@@ -52,7 +53,6 @@ import (
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
"github.com/linkedin/goavro/v2"
)
const (
@@ -85,78 +85,53 @@ var (
// Checkpointing starts a background worker that periodically saves metric data to disk.
//
// The behavior depends on the configured file format:
// - JSON: Periodic checkpointing based on Keys.Checkpoints.Interval
// - Avro: Initial delay + periodic checkpointing at DefaultAvroCheckpointInterval
//
// The worker respects context cancellation and signals completion via the WaitGroup.
// Checkpoints are written at the configured interval (Keys.Checkpoints.Interval) in
// either binary or JSON format. The worker respects context cancellation and signals
// completion via the WaitGroup.
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
lastCheckpointMu.Lock()
lastCheckpoint = time.Now()
lastCheckpointMu.Unlock()
if Keys.Checkpoints.FileFormat == "json" {
ms := GetMemoryStore()
ms := GetMemoryStore()
wg.Go(func() {
d, err := time.ParseDuration(Keys.Checkpoints.Interval)
if err != nil {
cclog.Fatalf("[METRICSTORE]> invalid checkpoint interval '%s': %s", Keys.Checkpoints.Interval, err.Error())
}
if d <= 0 {
cclog.Warnf("[METRICSTORE]> checkpoint interval is zero or negative (%s), checkpointing disabled", d)
return
}
wg.Go(func() {
d, err := time.ParseDuration(Keys.Checkpoints.Interval)
if err != nil {
cclog.Fatalf("[METRICSTORE]> invalid checkpoint interval '%s': %s", Keys.Checkpoints.Interval, err.Error())
}
if d <= 0 {
cclog.Warnf("[METRICSTORE]> checkpoint interval is zero or negative (%s), checkpointing disabled", d)
return
}
ticker := time.NewTicker(d)
defer ticker.Stop()
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
lastCheckpointMu.Lock()
from := lastCheckpoint
lastCheckpointMu.Unlock()
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
now := time.Now()
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir,
from.Unix(), now.Unix())
if err != nil {
cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error())
} else {
cclog.Infof("[METRICSTORE]> done: %d checkpoint files created", n)
lastCheckpointMu.Lock()
lastCheckpoint = now
lastCheckpointMu.Unlock()
}
}
}
})
} else {
wg.Go(func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(CheckpointBufferMinutes) * time.Minute):
GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false)
}
case <-ticker.C:
lastCheckpointMu.Lock()
from := lastCheckpoint
lastCheckpointMu.Unlock()
ticker := time.NewTicker(DefaultAvroCheckpointInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false)
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
now := time.Now()
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir,
from.Unix(), now.Unix())
if err != nil {
cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error())
} else {
cclog.Infof("[METRICSTORE]> done: %d checkpoint files created", n)
lastCheckpointMu.Lock()
lastCheckpoint = now
lastCheckpointMu.Unlock()
}
}
})
}
}
})
}
// UnmarshalJSON provides optimized JSON decoding for CheckpointMetrics.
@@ -478,7 +453,8 @@ func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil
return retval, nil
}
// toCheckpoint writes a Level's data to a JSON checkpoint file.
// toCheckpoint writes a Level's data to a checkpoint file.
// The format (binary or JSON) is determined by Keys.Checkpoints.FileFormat.
// Creates directory if needed. Returns ErrNoNewArchiveData if nothing to save.
func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
cf, err := l.toCheckpointFile(from, to, m)
@@ -490,12 +466,23 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
return ErrNoNewArchiveData
}
filepath := path.Join(dir, fmt.Sprintf("%d.json", from))
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
if Keys.Checkpoints.FileFormat == "json" {
return writeJSONCheckpoint(dir, from, cf)
}
// Default: binary format
filePath := path.Join(dir, fmt.Sprintf("%d.bin", from))
return writeBinaryCheckpoint(filePath, cf)
}
// writeJSONCheckpoint writes a CheckpointFile in JSON format.
func writeJSONCheckpoint(dir string, from int64, cf *CheckpointFile) error {
filePath := path.Join(dir, fmt.Sprintf("%d.json", from))
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
if err != nil && os.IsNotExist(err) {
err = os.MkdirAll(dir, CheckpointDirPerms)
if err == nil {
f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
f, err = os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
}
}
if err != nil {
@@ -598,7 +585,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
// FromCheckpointFiles is the main entry point for loading checkpoints at startup.
//
// Automatically detects checkpoint format (JSON vs Avro) and falls back if needed.
// Automatically detects checkpoint format (binary, JSON, or legacy Avro).
// Creates checkpoint directory if it doesn't exist. This function must be called
// before any writes or reads, and can only be called once.
func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
@@ -614,144 +601,19 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
return m.FromCheckpoint(dir, from)
}
func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
br := bufio.NewReader(f)
fileName := f.Name()[strings.LastIndex(f.Name(), "/")+1:]
resolution, err := strconv.ParseInt(fileName[0:strings.Index(fileName, "_")], 10, 64)
// loadBinaryCheckpointFile loads a binary checkpoint file into the Level tree.
// Binary files are decoded in the same way as JSON files (via loadFile).
func (l *Level) loadBinaryCheckpointFile(m *MemoryStore, filePath string, from int64) error {
cf, err := loadBinaryFile(filePath)
if err != nil {
return fmt.Errorf("[METRICSTORE]> error while reading avro file (resolution parsing) : %s", err)
return err
}
fromTimestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64)
// Same logic according to lineprotocol
fromTimestamp -= (resolution / 2)
if err != nil {
return fmt.Errorf("[METRICSTORE]> error converting timestamp from the avro file : %s", err)
}
// fmt.Printf("File : %s with resolution : %d\n", fileName, resolution)
var recordCounter int64 = 0
// Create a new OCF reader from the buffered reader
ocfReader, err := goavro.NewOCFReader(br)
if err != nil {
return fmt.Errorf("[METRICSTORE]> error creating OCF reader: %w", err)
}
metricsData := make(map[string]schema.FloatArray)
for ocfReader.Scan() {
datum, err := ocfReader.Read()
if err != nil {
return fmt.Errorf("[METRICSTORE]> error while reading avro file : %s", err)
}
record, ok := datum.(map[string]any)
if !ok {
return fmt.Errorf("[METRICSTORE]> failed to assert datum as map[string]interface{}")
}
for key, value := range record {
metricsData[key] = append(metricsData[key], schema.ConvertToFloat(value.(float64)))
}
recordCounter += 1
}
to := (fromTimestamp + (recordCounter / (60 / resolution) * 60))
if to < from {
if cf.To != 0 && cf.To < from {
return nil
}
for key, floatArray := range metricsData {
metricName := ReplaceKey(key)
if strings.Contains(metricName, SelectorDelimiter) {
subString := strings.Split(metricName, SelectorDelimiter)
lvl := l
for i := 0; i < len(subString)-1; i++ {
sel := subString[i]
if lvl.children == nil {
lvl.children = make(map[string]*Level)
}
child, ok := lvl.children[sel]
if !ok {
child = &Level{
metrics: make([]*buffer, len(m.Metrics)),
children: nil,
}
lvl.children[sel] = child
}
lvl = child
}
leafMetricName := subString[len(subString)-1]
err = lvl.createBuffer(m, leafMetricName, floatArray, fromTimestamp, resolution)
if err != nil {
return fmt.Errorf("[METRICSTORE]> error while creating buffers from avroReader : %s", err)
}
} else {
err = l.createBuffer(m, metricName, floatArray, fromTimestamp, resolution)
if err != nil {
return fmt.Errorf("[METRICSTORE]> error while creating buffers from avroReader : %s", err)
}
}
}
return nil
}
func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray schema.FloatArray, from int64, resolution int64) error {
n := len(floatArray)
b := &buffer{
frequency: resolution,
start: from,
data: floatArray[0:n:n],
prev: nil,
next: nil,
archived: true,
}
minfo, ok := m.Metrics[metricName]
if !ok {
return nil
}
prev := l.metrics[minfo.offset]
if prev == nil {
l.metrics[minfo.offset] = b
} else {
if prev.start > b.start {
return fmt.Errorf("[METRICSTORE]> buffer start time %d is before previous buffer start %d", b.start, prev.start)
}
b.prev = prev
prev.next = b
missingCount := ((int(b.start) - int(prev.start)) - len(prev.data)*int(b.frequency))
if missingCount > 0 {
missingCount /= int(b.frequency)
for range missingCount {
prev.data = append(prev.data, schema.NaN)
}
prev.data = prev.data[0:len(prev.data):len(prev.data)]
}
}
l.metrics[minfo.offset] = b
return nil
return l.loadFile(cf, m)
}
func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
@@ -821,11 +683,9 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err
filesLoaded := 0
for _, e := range direntries {
if e.IsDir() {
// Host-level directories should only contain files, not subdirectories.
// Skip unexpected subdirectories with a warning.
cclog.Warnf("[METRICSTORE]> unexpected subdirectory '%s' in checkpoint dir '%s', skipping", e.Name(), dir)
continue
} else if strings.HasSuffix(e.Name(), ".json") || strings.HasSuffix(e.Name(), ".avro") {
} else if strings.HasSuffix(e.Name(), ".bin") || strings.HasSuffix(e.Name(), ".json") {
allFiles = append(allFiles, e)
}
}
@@ -840,18 +700,59 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err
}
// Separate files by type
var jsonFiles, avroFiles []string
var binFiles, jsonFiles []string
for _, filename := range files {
switch filepath.Ext(filename) {
case ".bin":
binFiles = append(binFiles, filename)
case ".json":
jsonFiles = append(jsonFiles, filename)
case ".avro":
avroFiles = append(avroFiles, filename)
default:
cclog.Warnf("[METRICSTORE]> unknown extension for file %s", filename)
}
}
// Parallel binary decoding: decode files concurrently, then apply sequentially
if len(binFiles) > 0 {
type decodedFile struct {
cf *CheckpointFile
err error
}
decoded := make([]decodedFile, len(binFiles))
var decodeWg sync.WaitGroup
for i, filename := range binFiles {
decodeWg.Add(1)
go func(idx int, fname string) {
defer decodeWg.Done()
cf, err := loadBinaryFile(path.Join(dir, fname))
if err != nil {
decoded[idx] = decodedFile{err: fmt.Errorf("decoding %s: %w", fname, err)}
return
}
decoded[idx] = decodedFile{cf: cf}
}(i, filename)
}
decodeWg.Wait()
for i, d := range decoded {
if d.err != nil {
return filesLoaded, d.err
}
if d.cf.To != 0 && d.cf.To < from {
continue
}
if err := l.loadFile(d.cf, m); err != nil {
return filesLoaded, fmt.Errorf("loading %s: %w", binFiles[i], err)
}
filesLoaded++
}
}
// Parallel JSON decoding: decode files concurrently, then apply sequentially
if len(jsonFiles) > 0 {
type decodedFile struct {
@@ -885,7 +786,6 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err
decodeWg.Wait()
// Apply decoded files sequentially to maintain buffer ordering
for i, d := range decoded {
if d.err != nil {
return filesLoaded, d.err
@@ -902,23 +802,6 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, err
}
}
// Load Avro files sequentially (they modify Level state directly)
for _, filename := range avroFiles {
err := func() error {
f, err := os.Open(path.Join(dir, filename))
if err != nil {
return err
}
defer f.Close()
return l.loadAvroFile(m, f, from)
}()
if err != nil {
return filesLoaded, err
}
filesLoaded++
}
return filesLoaded, nil
}
@@ -942,11 +825,18 @@ func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]s
for _, e := range direntries {
name := e.Name()
if !strings.HasSuffix(name, ".json") && !strings.HasSuffix(name, ".avro") {
ext := filepath.Ext(name)
if ext != ".bin" && ext != ".json" {
continue
}
ts, err := strconv.ParseInt(name[strings.Index(name, "_")+1:len(name)-5], 10, 64)
// Parse timestamp from filename: for .bin and .json it's just "TIMESTAMP.ext"
baseName := name[:len(name)-len(ext)]
// Handle legacy format with prefix (e.g., "60_TIMESTAMP.avro")
if idx := strings.Index(baseName, "_"); idx >= 0 {
baseName = baseName[idx+1:]
}
ts, err := strconv.ParseInt(baseName, 10, 64)
if err != nil {
return nil, err
}