Files
cc-backend/pkg/metricstore/archive.go
Jan Eitzinger 09501df3c2 fix: reduce memory usage in parquet checkpoint archiver
Stream CheckpointFile trees directly to parquet rows instead of
materializing all rows in a giant intermediate slice. This eliminates
~1.9GB per host of redundant allocations (repeated string headers)
and removes the expensive sort on millions of 104-byte structs.

Key changes:
- Replace flattenCheckpointFile + sortParquetRows + WriteHostRows with
  streaming WriteCheckpointFile that walks the tree with sorted keys
- Reduce results channel buffer from len(hostEntries) to 2 for
  back-pressure (at most NumWorkers+2 results in flight)
- Workers send CheckpointFile trees instead of []ParquetMetricRow
- Write rows in small 1024-element batches via reusable buffer

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Entire-Checkpoint: f31dc1847539
2026-03-18 17:32:16 +01:00

321 lines
8.4 KiB
Go

// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricstore
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
)
// CleanUp starts the background checkpoint cleanup worker and registers it on wg.
//
// When Keys.Cleanup.Mode is "archive", old checkpoints are archived to Parquet
// below Keys.Cleanup.RootDir; for any other mode they are deleted. In both
// cases Keys.RetentionInMemory is used as the run interval and as the age
// threshold. The worker returns once ctx is cancelled.
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
	// Default: run as deleter.
	label, targetDir, removeOnly := "deleting", "", true
	if Keys.Cleanup.Mode == "archive" {
		// Run as archiver.
		label, targetDir, removeOnly = "archiving", Keys.Cleanup.RootDir, false
	}
	cleanUpWorker(wg, ctx, Keys.RetentionInMemory, label, targetDir, removeOnly)
}
// cleanUpWorker starts a goroutine on wg that periodically archives or deletes
// old checkpoints.
//
// interval is parsed with time.ParseDuration; a parse failure is reported via
// cclog.Fatalf, and a non-positive duration disables the worker entirely. On
// every tick, checkpoints older than (now - interval) below
// Keys.Checkpoints.RootDir are passed to CleanupCheckpoints. mode is only used
// in log messages, cleanupDir is the archive destination, and delete selects
// deleting instead of archiving. The goroutine exits when ctx is cancelled.
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
	wg.Go(func() {
		period, parseErr := time.ParseDuration(interval)
		if parseErr != nil {
			cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, parseErr)
		}
		if period <= 0 {
			return
		}

		tick := time.NewTicker(period)
		defer tick.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
			}

			cutoff := time.Now().Add(-period)
			cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, cutoff.Format(time.RFC3339))

			count, runErr := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, cutoff.Unix(), delete)
			switch {
			case runErr != nil:
				cclog.Errorf("[METRICSTORE]> %s failed: %s", mode, runErr.Error())
			case delete:
				cclog.Infof("[METRICSTORE]> done: %d checkpoints deleted", count)
			default:
				cclog.Infof("[METRICSTORE]> done: %d checkpoint files archived to parquet", count)
			}
		}
	})
}
// ErrNoNewArchiveData is a sentinel error reporting that there is nothing new
// to archive because all data has already been archived. Compare against it
// with errors.Is.
var ErrNoNewArchiveData = errors.New("all data already archived")
// CleanupCheckpoints deletes or archives all checkpoint files older than `from`.
// When archiving, consolidates all hosts per cluster into a single Parquet file.
func CleanupCheckpoints(checkpointsDir, cleanupDir string, from int64, deleteInstead bool) (int, error) {
if deleteInstead {
return deleteCheckpoints(checkpointsDir, from)
}
return archiveCheckpoints(checkpointsDir, cleanupDir, from)
}
// deleteCheckpoints removes checkpoint files older than `from` from every
// <checkpointsDir>/<cluster>/<host> directory.
//
// Host directories are processed by Keys.NumWorkers goroutines (at least one).
// Non-directory entries at the cluster and host level are skipped, matching
// archiveCheckpoints.
//
// It returns the number of files removed. Failures to list cluster directories
// are joined and returned. Per-host listing and removal failures are logged
// and summarised in the returned error.
func deleteCheckpoints(checkpointsDir string, from int64) (int, error) {
	clusterEntries, err := os.ReadDir(checkpointsDir)
	if err != nil {
		return 0, err
	}

	type workItem struct {
		dir           string
		cluster, host string
	}

	// With zero workers nobody would drain the work channel and the producer
	// loop below would block forever.
	numWorkers := max(1, Keys.NumWorkers)

	var wg sync.WaitGroup
	var n, errs atomic.Int32
	work := make(chan workItem, numWorkers)

	for range numWorkers {
		wg.Go(func() {
			for item := range work {
				entries, err := os.ReadDir(item.dir)
				if err != nil {
					cclog.Errorf("error reading %s/%s: %s", item.cluster, item.host, err.Error())
					errs.Add(1)
					continue
				}
				files, err := findFiles(entries, from, false)
				if err != nil {
					cclog.Errorf("error finding files in %s/%s: %s", item.cluster, item.host, err.Error())
					errs.Add(1)
					continue
				}
				for _, checkpoint := range files {
					if err := os.Remove(filepath.Join(item.dir, checkpoint)); err != nil {
						cclog.Errorf("error deleting %s/%s/%s: %s", item.cluster, item.host, checkpoint, err.Error())
						errs.Add(1)
					} else {
						n.Add(1)
					}
				}
			}
		})
	}

	var listErrs []error
	for _, clusterEntry := range clusterEntries {
		if !clusterEntry.IsDir() {
			continue
		}
		cluster := clusterEntry.Name()
		hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster))
		if err != nil {
			listErrs = append(listErrs, err)
			continue
		}
		for _, hostEntry := range hostEntries {
			if !hostEntry.IsDir() {
				continue
			}
			work <- workItem{
				dir:     filepath.Join(checkpointsDir, cluster, hostEntry.Name()),
				cluster: cluster,
				host:    hostEntry.Name(),
			}
		}
	}
	close(work)
	wg.Wait()

	deleted := int(n.Load())
	if listErr := errors.Join(listErrs...); listErr != nil {
		return deleted, listErr
	}
	if failed := errs.Load(); failed > 0 {
		return deleted, fmt.Errorf("%d errors happened while deleting (%d successes)", failed, deleted)
	}
	return deleted, nil
}
// archiveCheckpoints archives checkpoint files older than `from` to Parquet.
// It produces one Parquet file per cluster: <cleanupDir>/<cluster>/<from>.parquet.
//
// For each cluster directory below checkpointsDir, Keys.NumWorkers goroutines
// (at least one) load the checkpoint files of every host directory. They send
// the resulting CheckpointFile trees over a channel with a small buffer, which
// provides back-pressure. The calling goroutine streams each tree directly into
// the Parquet writer, so a cluster's rows are never materialized at once.
//
// A cluster's checkpoint files are removed only after its Parquet file has been
// written and closed without error. If any host fails to load, or writing
// fails, the partial Parquet file is removed and the checkpoints are kept. This
// prevents the same data from being archived twice on the next run. An error
// is then returned without processing further clusters.
//
// It returns the number of checkpoint files removed after being archived.
func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) {
	cclog.Info("[METRICSTORE]> start archiving checkpoints to parquet")
	startTime := time.Now()

	clusterEntries, err := os.ReadDir(checkpointsDir)
	if err != nil {
		return 0, err
	}

	// With zero workers nobody would drain the work channel and the feeder
	// goroutine (and thus the results loop) would block forever.
	numWorkers := max(1, Keys.NumWorkers)

	totalFiles := 0
	for _, clusterEntry := range clusterEntries {
		if !clusterEntry.IsDir() {
			continue
		}
		cluster := clusterEntry.Name()
		hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster))
		if err != nil {
			return totalFiles, err
		}

		// Workers load checkpoint files from disk; this goroutine writes parquet.
		type hostResult struct {
			checkpoints []*CheckpointFile
			hostname    string
			files       []string // checkpoint filenames to delete after successful write
			dir         string   // checkpoint directory for this host
		}
		type workItem struct {
			dir, host string
		}

		// Small buffer provides back-pressure: at most numWorkers+2 results in flight.
		results := make(chan hostResult, 2)
		work := make(chan workItem, numWorkers)

		var wg sync.WaitGroup
		var errs atomic.Int32
		for range numWorkers {
			wg.Go(func() {
				for item := range work {
					checkpoints, files, err := loadCheckpointFiles(item.dir, from)
					if err != nil {
						cclog.Errorf("[METRICSTORE]> error reading checkpoints for %s/%s: %s", cluster, item.host, err.Error())
						errs.Add(1)
						continue
					}
					if len(checkpoints) > 0 {
						results <- hostResult{
							checkpoints: checkpoints,
							hostname:    item.host,
							files:       files,
							dir:         item.dir,
						}
					}
				}
			})
		}

		// Feed host directories to the workers. Close results once every
		// worker has finished, which ends the consumer loop below.
		go func() {
			for _, hostEntry := range hostEntries {
				if !hostEntry.IsDir() {
					continue
				}
				work <- workItem{
					dir:  filepath.Join(checkpointsDir, cluster, hostEntry.Name()),
					host: hostEntry.Name(),
				}
			}
			close(work)
			wg.Wait()
			close(results)
		}()

		parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from))
		writer, err := newParquetArchiveWriter(parquetFile)
		if err != nil {
			// Drain results channel to unblock workers.
			for range results {
			}
			return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err)
		}

		type deleteItem struct {
			dir   string
			files []string
		}
		var toDelete []deleteItem
		var writeErr error
		for r := range results {
			if writeErr == nil {
				// Stream each checkpoint file directly to parquet rows;
				// each tree is processed and released before the next.
				for _, cf := range r.checkpoints {
					if err := writer.WriteCheckpointFile(cf, cluster, r.hostname, "node", ""); err != nil {
						writeErr = err
						break
					}
				}
			}
			// Keep draining after a write error so the workers can finish.
			toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files})
		}
		if err := writer.Close(); err != nil && writeErr == nil {
			writeErr = err
		}

		// Failure paths remove the partial file (best effort) and keep the
		// checkpoints, so the next run can archive this cluster without
		// duplicating the rows already written here.
		if failed := errs.Load(); failed > 0 {
			_ = os.Remove(parquetFile)
			return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", failed, cluster)
		}
		// Checked before the empty-file shortcut so a failed write/close that
		// produced no rows is reported instead of silently skipped.
		if writeErr != nil {
			_ = os.Remove(parquetFile)
			return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr)
		}
		if writer.count == 0 {
			// No data written — remove empty file.
			_ = os.Remove(parquetFile)
			continue
		}

		// Delete archived checkpoint files.
		clusterFiles := 0
		for _, item := range toDelete {
			for _, file := range item.files {
				filename := filepath.Join(item.dir, file)
				if err := os.Remove(filename); err != nil {
					cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err)
					continue
				}
				clusterFiles++
			}
		}
		totalFiles += clusterFiles

		cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s",
			writer.count, clusterFiles, cluster, parquetFile)
	}

	cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles)
	return totalFiles, nil
}