From 25c8fca56136eb04cbfe14d5f18a67082512bc64 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 13 Jan 2026 14:42:24 +0100 Subject: [PATCH] Revert retention config in metricstore --- internal/metricstore/config.go | 68 ------------------ internal/metricstore/configSchema.go | 38 +--------- internal/metricstore/metricstore.go | 103 ++++----------------------- 3 files changed, 13 insertions(+), 196 deletions(-) diff --git a/internal/metricstore/config.go b/internal/metricstore/config.go index 06ae774d..c789f11c 100644 --- a/internal/metricstore/config.go +++ b/internal/metricstore/config.go @@ -33,18 +33,7 @@ type MetricStoreConfig struct { DumpToFile string `json:"dump-to-file"` EnableGops bool `json:"gops"` } `json:"debug"` - // Global default retention duration - RetentionInMemory string `json:"retention-in-memory"` - // Per-cluster retention overrides - Clusters []struct { - Cluster string `json:"cluster"` RetentionInMemory string `json:"retention-in-memory"` - // Per-subcluster retention overrides within this cluster - SubClusters []struct { - SubCluster string `json:"subcluster"` - RetentionInMemory string `json:"retention-in-memory"` - } `json:"subclusters,omitempty"` - } `json:"clusters,omitempty"` Archive struct { Interval string `json:"interval"` RootDir string `json:"directory"` @@ -61,14 +50,6 @@ type MetricStoreConfig struct { var Keys MetricStoreConfig -type retentionConfig struct { - global time.Duration - clusterMap map[string]time.Duration - subClusterMap map[string]map[string]time.Duration -} - -var retentionLookup *retentionConfig - // AggregationStrategy for aggregation over multiple values at different cpus/sockets/..., not time! type AggregationStrategy int @@ -132,52 +113,3 @@ func AddMetric(name string, metric MetricConfig) error { return nil } - -func GetRetentionDuration(cluster, subCluster string) (time.Duration, error) { - if retentionLookup == nil { - return 0, fmt.Errorf("[METRICSTORE]> retention configuration not initialized") - } - - if subCluster != "" { - if subMap, ok := retentionLookup.subClusterMap[cluster]; ok { - if retention, ok := subMap[subCluster]; ok { - return retention, nil - } - } - } - - if retention, ok := retentionLookup.clusterMap[cluster]; ok { - return retention, nil - } - - return retentionLookup.global, nil -} - -// GetShortestRetentionDuration returns the shortest configured retention duration -// across all levels (global, cluster, and subcluster configurations). -// Returns 0 if retentionLookup is not initialized or global retention is not set. -func GetShortestRetentionDuration() time.Duration { - if retentionLookup == nil || retentionLookup.global <= 0 { - return 0 - } - - shortest := retentionLookup.global - - // Check all cluster-level retention durations - for _, clusterRetention := range retentionLookup.clusterMap { - if clusterRetention > 0 && clusterRetention < shortest { - shortest = clusterRetention - } - } - - // Check all subcluster-level retention durations - for _, subClusterMap := range retentionLookup.subClusterMap { - for _, scRetention := range subClusterMap { - if scRetention > 0 && scRetention < shortest { - shortest = scRetention - } - } - } - - return shortest -} diff --git a/internal/metricstore/configSchema.go b/internal/metricstore/configSchema.go index 868bacc5..f1a20a73 100644 --- a/internal/metricstore/configSchema.go +++ b/internal/metricstore/configSchema.go @@ -46,45 +46,9 @@ const configSchema = `{ } }, "retention-in-memory": { - "description": "Global default: Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.", + "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.", "type": "string" }, - "clusters": { - "description": "Optional per-cluster retention overrides", - "type": "array", - "items": { - "type": "object", - "required": ["cluster"], - "properties": { - "cluster": { - "description": "Cluster name", - "type": "string" - }, - "retention-in-memory": { - "description": "Cluster-specific retention duration (overrides global default)", - "type": "string" - }, - "subclusters": { - "description": "Optional per-subcluster retention overrides", - "type": "array", - "items": { - "type": "object", - "required": ["subcluster"], - "properties": { - "subcluster": { - "description": "Subcluster name", - "type": "string" - }, - "retention-in-memory": { - "description": "Subcluster-specific retention duration (overrides cluster and global default)", - "type": "string" - } - } - } - } - } - } - }, "nats": { "description": "Configuration for accepting published data through NATS.", "type": "array", diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index 5a5c3bce..ac8948ae 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -98,49 +98,6 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { } } - globalRetention, err := time.ParseDuration(Keys.RetentionInMemory) - if err != nil { - cclog.Fatal(err) - } - - retentionLookup = &retentionConfig{ - global: globalRetention, - clusterMap: make(map[string]time.Duration), - subClusterMap: make(map[string]map[string]time.Duration), - } - - for _, clusterCfg := range Keys.Clusters { - if clusterCfg.RetentionInMemory != "" { - clusterRetention, err := time.ParseDuration(clusterCfg.RetentionInMemory) - if err != nil { - cclog.Warnf("[METRICSTORE]> Invalid retention duration for cluster '%s': %s\n", clusterCfg.Cluster, err.Error()) - continue - } - retentionLookup.clusterMap[clusterCfg.Cluster] = clusterRetention - cclog.Debugf("[METRICSTORE]> Cluster '%s' retention: %s\n", clusterCfg.Cluster, clusterRetention) - } - - if len(clusterCfg.SubClusters) > 0 { - if retentionLookup.subClusterMap[clusterCfg.Cluster] == nil { - retentionLookup.subClusterMap[clusterCfg.Cluster] = make(map[string]time.Duration) - } - - for _, scCfg := range clusterCfg.SubClusters { - if scCfg.RetentionInMemory != "" { - scRetention, err := time.ParseDuration(scCfg.RetentionInMemory) - if err != nil { - cclog.Warnf("[METRICSTORE]> Invalid retention duration for subcluster '%s/%s': %s\n", - clusterCfg.Cluster, scCfg.SubCluster, err.Error()) - continue - } - retentionLookup.subClusterMap[clusterCfg.Cluster][scCfg.SubCluster] = scRetention - cclog.Debugf("[METRICSTORE]> SubCluster '%s/%s' retention: %s\n", - clusterCfg.Cluster, scCfg.SubCluster, scRetention) - } - } - } - } - // Pass the config.MetricStoreKeys InitMetrics(Metrics) @@ -256,17 +213,18 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { go func() { defer wg.Done() - shortestRetention := GetShortestRetentionDuration() - if shortestRetention <= 0 { + d, err := time.ParseDuration(Keys.RetentionInMemory) + if err != nil { + cclog.Fatal(err) + } + if d <= 0 { return } - tickInterval := shortestRetention / 2 + tickInterval := d / 2 if tickInterval <= 0 { return } - cclog.Debugf("[METRICSTORE]> Retention ticker interval set to %s (half of shortest retention: %s)\n", - tickInterval, shortestRetention) ticker := time.NewTicker(tickInterval) defer ticker.Stop() @@ -275,50 +233,13 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - totalFreed := 0 - - clusters := ms.ListChildren(nil) - for _, cluster := range clusters { - retention, err := GetRetentionDuration(cluster, "") - if err != nil { - cclog.Warnf("[METRICSTORE]> Could not get retention for cluster '%s': %s\n", cluster, err.Error()) - continue - } - if retention <= 0 { - continue - } - - t := time.Now().Add(-retention) - cclog.Debugf("[METRICSTORE]> Freeing buffers for cluster '%s' (older than %s, retention: %s)...\n", - cluster, t.Format(time.RFC3339), retention) - - subClusters := ms.ListChildren([]string{cluster}) - for _, subCluster := range subClusters { - scRetention, err := GetRetentionDuration(cluster, subCluster) + t := time.Now().Add(-d) + cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) + freed, err := ms.Free(nil, t.Unix()) if err != nil { - cclog.Warnf("[METRICSTORE]> Could not get retention for subcluster '%s/%s': %s\n", - cluster, subCluster, err.Error()) - continue - } - if scRetention <= 0 { - continue - } - - scTime := time.Now().Add(-scRetention) - freed, err := ms.Free([]string{cluster, subCluster}, scTime.Unix()) - if err != nil { - cclog.Errorf("[METRICSTORE]> freeing buffers for '%s/%s' failed: %s\n", - cluster, subCluster, err.Error()) - } else if freed > 0 { - cclog.Debugf("[METRICSTORE]> freed %d buffers for '%s/%s' (retention: %s)\n", - freed, cluster, subCluster, scRetention) - totalFreed += freed - } - } - } - - if totalFreed > 0 { - cclog.Infof("[METRICSTORE]> Total buffers freed: %d\n", totalFreed) + cclog.Errorf("[METRICSTORE]> freeing up buffers failed: %s\n", err.Error()) + } else { + cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freed) } } }