From 01ec70baa83b973e6e2636dc6845c1d2268d285c Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 20 Mar 2026 11:39:34 +0100 Subject: [PATCH 1/4] Iterate over subCluster MetricConfig directly so that removed metrics are not included Entire-Checkpoint: efb6f0a96069 --- pkg/archive/clusterConfig.go | 41 +++++++++---------------------- pkg/archive/clusterConfig_test.go | 24 ++++++++++++++++++ 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/pkg/archive/clusterConfig.go b/pkg/archive/clusterConfig.go index 3e27e415..48fc5e48 100644 --- a/pkg/archive/clusterConfig.go +++ b/pkg/archive/clusterConfig.go @@ -198,36 +198,19 @@ func GetSubCluster(cluster, subcluster string) (*schema.SubCluster, error) { func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Metric { metrics := make(map[string]*schema.Metric) - for _, c := range Clusters { - if c.Name == cluster { - for _, m := range c.MetricConfig { - for _, s := range m.SubClusters { - if s.Name == subcluster { - metrics[m.Name] = &schema.Metric{ - Name: m.Name, - Unit: s.Unit, - Peak: s.Peak, - Normal: s.Normal, - Caution: s.Caution, - Alert: s.Alert, - } - break - } - } + sc, err := GetSubCluster(cluster, subcluster) + if err != nil { + return metrics + } - _, ok := metrics[m.Name] - if !ok { - metrics[m.Name] = &schema.Metric{ - Name: m.Name, - Unit: m.Unit, - Peak: m.Peak, - Normal: m.Normal, - Caution: m.Caution, - Alert: m.Alert, - } - } - } - break + for _, m := range sc.MetricConfig { + metrics[m.Name] = &schema.Metric{ + Name: m.Name, + Unit: m.Unit, + Peak: m.Peak, + Normal: m.Normal, + Caution: m.Caution, + Alert: m.Alert, } } diff --git a/pkg/archive/clusterConfig_test.go b/pkg/archive/clusterConfig_test.go index 510c1747..7c3d8bea 100644 --- a/pkg/archive/clusterConfig_test.go +++ b/pkg/archive/clusterConfig_test.go @@ -37,3 +37,27 @@ func TestClusterConfig(t *testing.T) { // spew.Dump(archive.GlobalMetricList) // t.Fail() } + +func 
TestGetMetricConfigSubClusterRespectsRemovedMetrics(t *testing.T) { + if err := archive.Init(json.RawMessage(`{"kind": "file","path": "testdata/archive"}`)); err != nil { + t.Fatal(err) + } + + sc, err := archive.GetSubCluster("fritz", "spr2tb") + if err != nil { + t.Fatal(err) + } + + metrics := archive.GetMetricConfigSubCluster("fritz", "spr2tb") + if len(metrics) != len(sc.MetricConfig) { + t.Fatalf("GetMetricConfigSubCluster() returned %d metrics, want %d", len(metrics), len(sc.MetricConfig)) + } + + if _, ok := metrics["flops_any"]; ok { + t.Fatalf("GetMetricConfigSubCluster() returned removed metric flops_any for subcluster spr2tb") + } + + if _, ok := metrics["cpu_power"]; !ok { + t.Fatalf("GetMetricConfigSubCluster() missing active metric cpu_power for subcluster spr2tb") + } +} From 586c902044161182617584be7bd0f1a31824853b Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 23 Mar 2026 06:32:24 +0100 Subject: [PATCH 2/4] Restructure metricstore cleanup archiving to stay within 32k parquet-go limit Entire-Checkpoint: 1660b8cf2571 --- pkg/metricstore/archive.go | 12 +++++++++--- pkg/metricstore/parquetArchive.go | 9 +++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 7eb1b72f..2246b0f3 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -22,6 +22,7 @@ import ( func CleanUp(wg *sync.WaitGroup, ctx context.Context) { if Keys.Cleanup.Mode == "archive" { + cclog.Info("[METRICSTORE]> enable archive cleanup to parquet") // Run as Archiver cleanUpWorker(wg, ctx, Keys.RetentionInMemory, @@ -43,7 +44,6 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) { // cleanUpWorker takes simple values to configure what it does func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { wg.Go(func() { - d, err := time.ParseDuration(interval) if err != nil { cclog.Fatalf("[METRICSTORE]> error parsing 
%s interval duration: %v\n", mode, err) @@ -99,8 +99,8 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) { } type workItem struct { - dir string - cluster, host string + dir string + cluster, host string } var wg sync.WaitGroup @@ -275,6 +275,12 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err break } } + // Flush once per host to keep row group count within parquet limits. + if writeErr == nil { + if err := writer.FlushRowGroup(); err != nil { + writeErr = err + } + } } // Always track files for deletion (even if write failed, we still drain) toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files}) diff --git a/pkg/metricstore/parquetArchive.go b/pkg/metricstore/parquetArchive.go index 260bd8dd..81c8be5a 100644 --- a/pkg/metricstore/parquetArchive.go +++ b/pkg/metricstore/parquetArchive.go @@ -99,7 +99,7 @@ func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) { // WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows, // writing metrics in sorted order without materializing all rows in memory. -// Produces one row group per call (typically one host's data). +// Call FlushRowGroup() after writing all checkpoint files for a host. func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error { w.writeLevel(cf, cluster, hostname, scope, scopeID) @@ -112,10 +112,15 @@ func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, w.batch = w.batch[:0] } + return nil +} + +// FlushRowGroup flushes the current row group to the Parquet file. +// Should be called once per host after all checkpoint files for that host are written. 
+func (w *parquetArchiveWriter) FlushRowGroup() error { if err := w.writer.Flush(); err != nil { return fmt.Errorf("flushing parquet row group: %w", err) } - return nil } From e41d1251ba52de53e18fdf7c8920d0f7b28209cf Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 23 Mar 2026 06:37:24 +0100 Subject: [PATCH 3/4] fix: Continue on error Entire-Checkpoint: 6000eb5a5bb8 --- pkg/metricstore/archive.go | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 2246b0f3..3b92a3e0 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -11,6 +11,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "sync" "sync/atomic" "time" @@ -181,6 +182,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err } totalFiles := 0 + var clusterErrors []string for _, clusterEntry := range clusterEntries { if !clusterEntry.IsDir() { @@ -190,7 +192,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err cluster := clusterEntry.Name() hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster)) if err != nil { - return totalFiles, err + cclog.Errorf("[METRICSTORE]> error reading host entries for cluster %s: %s", cluster, err.Error()) + clusterErrors = append(clusterErrors, cluster) + continue } // Workers load checkpoint files from disk; main thread writes to parquet. 
@@ -255,7 +259,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err // Drain results channel to unblock workers for range results { } - return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err) + cclog.Errorf("[METRICSTORE]> error creating parquet writer for cluster %s: %s", cluster, err.Error()) + clusterErrors = append(clusterErrors, cluster) + continue } type deleteItem struct { @@ -291,7 +297,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err } if errs > 0 { - return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) + cclog.Errorf("[METRICSTORE]> %d errors reading checkpoints for cluster %s", errs, cluster) + clusterErrors = append(clusterErrors, cluster) + os.Remove(parquetFile) + continue } if writer.count == 0 { @@ -302,7 +311,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err if writeErr != nil { os.Remove(parquetFile) - return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr) + cclog.Errorf("[METRICSTORE]> error writing parquet archive for cluster %s: %s", cluster, writeErr.Error()) + clusterErrors = append(clusterErrors, cluster) + continue } // Delete archived checkpoint files @@ -322,5 +333,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err } cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles) + + if len(clusterErrors) > 0 { + return totalFiles, fmt.Errorf("archiving failed for clusters: %s", strings.Join(clusterErrors, ", ")) + } + return totalFiles, nil } From 192c94a78d97ade717ada97d1a2bb781e41c49a5 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 23 Mar 2026 07:12:13 +0100 Subject: [PATCH 4/4] fix: Prevent interruption of body lineprotocol parsing on locks Entire-Checkpoint: ccda3b2ff4cb --- cmd/cc-backend/server.go 
| 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index bdbb8a7e..4e6120e9 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -344,18 +344,18 @@ func (s *Server) init() error { // Server timeout defaults (in seconds) const ( - defaultReadTimeout = 20 - defaultWriteTimeout = 20 + defaultReadHeaderTimeout = 20 + defaultWriteTimeout = 20 ) func (s *Server) Start(ctx context.Context) error { // Use configurable timeouts with defaults - readTimeout := time.Duration(defaultReadTimeout) * time.Second + readHeaderTimeout := time.Duration(defaultReadHeaderTimeout) * time.Second writeTimeout := time.Duration(defaultWriteTimeout) * time.Second s.server = &http.Server{ - ReadTimeout: readTimeout, - WriteTimeout: writeTimeout, + ReadHeaderTimeout: readHeaderTimeout, + WriteTimeout: writeTimeout, Handler: s.router, Addr: config.Keys.Addr, }