Merge pull request #533 from ClusterCockpit/hotfix

metricstore archive fixes
This commit is contained in:
Jan Eitzinger
2026-03-23 07:20:15 +01:00
committed by GitHub
5 changed files with 77 additions and 43 deletions

View File

@@ -344,17 +344,17 @@ func (s *Server) init() error {
// Server timeout defaults (in seconds) // Server timeout defaults (in seconds)
const ( const (
defaultReadTimeout = 20 defaultReadHeaderTimeout = 20
defaultWriteTimeout = 20 defaultWriteTimeout = 20
) )
func (s *Server) Start(ctx context.Context) error { func (s *Server) Start(ctx context.Context) error {
// Use configurable timeouts with defaults // Use configurable timeouts with defaults
readTimeout := time.Duration(defaultReadTimeout) * time.Second readHeaderTimeout := time.Duration(defaultReadHeaderTimeout) * time.Second
writeTimeout := time.Duration(defaultWriteTimeout) * time.Second writeTimeout := time.Duration(defaultWriteTimeout) * time.Second
s.server = &http.Server{ s.server = &http.Server{
ReadTimeout: readTimeout, ReadHeaderTimeout: readHeaderTimeout,
WriteTimeout: writeTimeout, WriteTimeout: writeTimeout,
Handler: s.router, Handler: s.router,
Addr: config.Keys.Addr, Addr: config.Keys.Addr,

View File

@@ -198,25 +198,12 @@ func GetSubCluster(cluster, subcluster string) (*schema.SubCluster, error) {
func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Metric { func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Metric {
metrics := make(map[string]*schema.Metric) metrics := make(map[string]*schema.Metric)
for _, c := range Clusters { sc, err := GetSubCluster(cluster, subcluster)
if c.Name == cluster { if err != nil {
for _, m := range c.MetricConfig { return metrics
for _, s := range m.SubClusters {
if s.Name == subcluster {
metrics[m.Name] = &schema.Metric{
Name: m.Name,
Unit: s.Unit,
Peak: s.Peak,
Normal: s.Normal,
Caution: s.Caution,
Alert: s.Alert,
}
break
}
} }
_, ok := metrics[m.Name] for _, m := range sc.MetricConfig {
if !ok {
metrics[m.Name] = &schema.Metric{ metrics[m.Name] = &schema.Metric{
Name: m.Name, Name: m.Name,
Unit: m.Unit, Unit: m.Unit,
@@ -226,10 +213,6 @@ func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Me
Alert: m.Alert, Alert: m.Alert,
} }
} }
}
break
}
}
return metrics return metrics
} }

View File

@@ -37,3 +37,27 @@ func TestClusterConfig(t *testing.T) {
// spew.Dump(archive.GlobalMetricList) // spew.Dump(archive.GlobalMetricList)
// t.Fail() // t.Fail()
} }
// TestGetMetricConfigSubClusterRespectsRemovedMetrics initializes the archive
// from testdata/archive and checks the metric map returned by
// GetMetricConfigSubCluster for cluster "fritz", subcluster "spr2tb":
//   - it has as many entries as the subcluster's own MetricConfig,
//   - it does not contain "flops_any",
//   - it does contain "cpu_power".
func TestGetMetricConfigSubClusterRespectsRemovedMetrics(t *testing.T) {
	archiveCfg := json.RawMessage(`{"kind": "file","path": "testdata/archive"}`)
	if initErr := archive.Init(archiveCfg); initErr != nil {
		t.Fatal(initErr)
	}

	// The subcluster's own metric list is the reference for the expected size.
	subCluster, err := archive.GetSubCluster("fritz", "spr2tb")
	if err != nil {
		t.Fatal(err)
	}

	got := archive.GetMetricConfigSubCluster("fritz", "spr2tb")
	if want := len(subCluster.MetricConfig); len(got) != want {
		t.Fatalf("GetMetricConfigSubCluster() returned %d metrics, want %d", len(got), want)
	}

	_, hasFlopsAny := got["flops_any"]
	if hasFlopsAny {
		t.Fatalf("GetMetricConfigSubCluster() returned removed metric flops_any for subcluster spr2tb")
	}

	_, hasCPUPower := got["cpu_power"]
	if !hasCPUPower {
		t.Fatalf("GetMetricConfigSubCluster() missing active metric cpu_power for subcluster spr2tb")
	}
}

View File

@@ -11,6 +11,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -22,6 +23,7 @@ import (
func CleanUp(wg *sync.WaitGroup, ctx context.Context) { func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Cleanup.Mode == "archive" { if Keys.Cleanup.Mode == "archive" {
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
// Run as Archiver // Run as Archiver
cleanUpWorker(wg, ctx, cleanUpWorker(wg, ctx,
Keys.RetentionInMemory, Keys.RetentionInMemory,
@@ -43,7 +45,6 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
// cleanUpWorker takes simple values to configure what it does // cleanUpWorker takes simple values to configure what it does
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
wg.Go(func() { wg.Go(func() {
d, err := time.ParseDuration(interval) d, err := time.ParseDuration(interval)
if err != nil { if err != nil {
cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err) cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err)
@@ -181,6 +182,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
} }
totalFiles := 0 totalFiles := 0
var clusterErrors []string
for _, clusterEntry := range clusterEntries { for _, clusterEntry := range clusterEntries {
if !clusterEntry.IsDir() { if !clusterEntry.IsDir() {
@@ -190,7 +192,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
cluster := clusterEntry.Name() cluster := clusterEntry.Name()
hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster)) hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster))
if err != nil { if err != nil {
return totalFiles, err cclog.Errorf("[METRICSTORE]> error reading host entries for cluster %s: %s", cluster, err.Error())
clusterErrors = append(clusterErrors, cluster)
continue
} }
// Workers load checkpoint files from disk; main thread writes to parquet. // Workers load checkpoint files from disk; main thread writes to parquet.
@@ -255,7 +259,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
// Drain results channel to unblock workers // Drain results channel to unblock workers
for range results { for range results {
} }
return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err) cclog.Errorf("[METRICSTORE]> error creating parquet writer for cluster %s: %s", cluster, err.Error())
clusterErrors = append(clusterErrors, cluster)
continue
} }
type deleteItem struct { type deleteItem struct {
@@ -275,6 +281,12 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
break break
} }
} }
// Flush once per host to keep row group count within parquet limits.
if writeErr == nil {
if err := writer.FlushRowGroup(); err != nil {
writeErr = err
}
}
} }
// Always track files for deletion (even if write failed, we still drain) // Always track files for deletion (even if write failed, we still drain)
toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files}) toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files})
@@ -285,7 +297,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
} }
if errs > 0 { if errs > 0 {
return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) cclog.Errorf("[METRICSTORE]> %d errors reading checkpoints for cluster %s", errs, cluster)
clusterErrors = append(clusterErrors, cluster)
os.Remove(parquetFile)
continue
} }
if writer.count == 0 { if writer.count == 0 {
@@ -296,7 +311,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
if writeErr != nil { if writeErr != nil {
os.Remove(parquetFile) os.Remove(parquetFile)
return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr) cclog.Errorf("[METRICSTORE]> error writing parquet archive for cluster %s: %s", cluster, writeErr.Error())
clusterErrors = append(clusterErrors, cluster)
continue
} }
// Delete archived checkpoint files // Delete archived checkpoint files
@@ -316,5 +333,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
} }
cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles) cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles)
if len(clusterErrors) > 0 {
return totalFiles, fmt.Errorf("archiving failed for clusters: %s", strings.Join(clusterErrors, ", "))
}
return totalFiles, nil return totalFiles, nil
} }

View File

@@ -99,7 +99,7 @@ func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) {
// WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows, // WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows,
// writing metrics in sorted order without materializing all rows in memory. // writing metrics in sorted order without materializing all rows in memory.
// Produces one row group per call (typically one host's data). // Call FlushRowGroup() after writing all checkpoint files for a host.
func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error { func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error {
w.writeLevel(cf, cluster, hostname, scope, scopeID) w.writeLevel(cf, cluster, hostname, scope, scopeID)
@@ -112,10 +112,15 @@ func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster,
w.batch = w.batch[:0] w.batch = w.batch[:0]
} }
return nil
}
// FlushRowGroup flushes the current row group to the Parquet file by calling
// Flush on the underlying writer.
// Should be called once per host after all checkpoint files for that host are written.
// Returns nil on success; a flush failure is returned wrapped (via %w) with the
// prefix "flushing parquet row group".
func (w *parquetArchiveWriter) FlushRowGroup() error {
if err := w.writer.Flush(); err != nil { if err := w.writer.Flush(); err != nil {
return fmt.Errorf("flushing parquet row group: %w", err) return fmt.Errorf("flushing parquet row group: %w", err)
} }
return nil return nil
} }