From 04a2e460ae8b1e884795bdf5200e1efb671ac958 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 13 Jan 2026 09:52:00 +0100 Subject: [PATCH] Refactor metricstore. Initial stub for cluster/ subcluster specific retention times --- internal/metricstore/avroCheckpoint.go | 6 +- internal/metricstore/config.go | 70 ++++++++++- internal/metricstore/configSchema.go | 38 +++++- .../{memorystore.go => metricstore.go} | 114 ++++++++++++++---- ...emorystore_test.go => metricstore_test.go} | 2 +- 5 files changed, 203 insertions(+), 27 deletions(-) rename internal/metricstore/{memorystore.go => metricstore.go} (76%) rename internal/metricstore/{memorystore_test.go => metricstore_test.go} (99%) diff --git a/internal/metricstore/avroCheckpoint.go b/internal/metricstore/avroCheckpoint.go index 275a64bd..aa14ce5a 100644 --- a/internal/metricstore/avroCheckpoint.go +++ b/internal/metricstore/avroCheckpoint.go @@ -24,8 +24,10 @@ import ( "github.com/linkedin/goavro/v2" ) -var NumAvroWorkers int = DefaultAvroWorkers -var startUp bool = true +var ( + NumAvroWorkers int = DefaultAvroWorkers + startUp bool = true +) func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { levels := make([]*AvroLevel, 0) diff --git a/internal/metricstore/config.go b/internal/metricstore/config.go index 97f16c46..06ae774d 100644 --- a/internal/metricstore/config.go +++ b/internal/metricstore/config.go @@ -33,8 +33,19 @@ type MetricStoreConfig struct { DumpToFile string `json:"dump-to-file"` EnableGops bool `json:"gops"` } `json:"debug"` + // Global default retention duration RetentionInMemory string `json:"retention-in-memory"` - Archive struct { + // Per-cluster retention overrides + Clusters []struct { + Cluster string `json:"cluster"` + RetentionInMemory string `json:"retention-in-memory"` + // Per-subcluster retention overrides within this cluster + SubClusters []struct { + SubCluster string `json:"subcluster"` + RetentionInMemory string `json:"retention-in-memory"` + } `json:"subclusters,omitempty"` + } `json:"clusters,omitempty"` + Archive struct { Interval string `json:"interval"` RootDir string `json:"directory"` DeleteInstead bool `json:"delete-instead"` @@ -50,6 +61,14 @@ type MetricStoreConfig struct { var Keys MetricStoreConfig +type retentionConfig struct { + global time.Duration + clusterMap map[string]time.Duration + subClusterMap map[string]map[string]time.Duration +} + +var retentionLookup *retentionConfig + // AggregationStrategy for aggregation over multiple values at different cpus/sockets/..., not time! type AggregationStrategy int @@ -113,3 +132,52 @@ func AddMetric(name string, metric MetricConfig) error { return nil } + +func GetRetentionDuration(cluster, subCluster string) (time.Duration, error) { + if retentionLookup == nil { + return 0, fmt.Errorf("[METRICSTORE]> retention configuration not initialized") + } + + if subCluster != "" { + if subMap, ok := retentionLookup.subClusterMap[cluster]; ok { + if retention, ok := subMap[subCluster]; ok { + return retention, nil + } + } + } + + if retention, ok := retentionLookup.clusterMap[cluster]; ok { + return retention, nil + } + + return retentionLookup.global, nil +} + +// GetShortestRetentionDuration returns the shortest configured retention duration +// across all levels (global, cluster, and subcluster configurations). +// Returns 0 if retentionLookup is not initialized or global retention is not set. +func GetShortestRetentionDuration() time.Duration { + if retentionLookup == nil || retentionLookup.global <= 0 { + return 0 + } + + shortest := retentionLookup.global + + // Check all cluster-level retention durations + for _, clusterRetention := range retentionLookup.clusterMap { + if clusterRetention > 0 && clusterRetention < shortest { + shortest = clusterRetention + } + } + + // Check all subcluster-level retention durations + for _, subClusterMap := range retentionLookup.subClusterMap { + for _, scRetention := range subClusterMap { + if scRetention > 0 && scRetention < shortest { + shortest = scRetention + } + } + } + + return shortest +} diff --git a/internal/metricstore/configSchema.go b/internal/metricstore/configSchema.go index f1a20a73..868bacc5 100644 --- a/internal/metricstore/configSchema.go +++ b/internal/metricstore/configSchema.go @@ -46,9 +46,45 @@ const configSchema = `{ } }, "retention-in-memory": { - "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.", + "description": "Global default: Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.", "type": "string" }, + "clusters": { + "description": "Optional per-cluster retention overrides", + "type": "array", + "items": { + "type": "object", + "required": ["cluster"], + "properties": { + "cluster": { + "description": "Cluster name", + "type": "string" + }, + "retention-in-memory": { + "description": "Cluster-specific retention duration (overrides global default)", + "type": "string" + }, + "subclusters": { + "description": "Optional per-subcluster retention overrides", + "type": "array", + "items": { + "type": "object", + "required": ["subcluster"], + "properties": { + "subcluster": { + "description": "Subcluster name", + "type": "string" + }, + "retention-in-memory": { + "description": "Subcluster-specific retention duration (overrides cluster and global default)", + "type": "string" + } + } + } + } + } + } + }, "nats": { "description": "Configuration for accepting published data through NATS.", "type": "array", diff --git a/internal/metricstore/memorystore.go b/internal/metricstore/metricstore.go similarity index 76% rename from internal/metricstore/memorystore.go rename to internal/metricstore/metricstore.go index 14a02fcd..5a5c3bce 100644 --- a/internal/metricstore/memorystore.go +++ b/internal/metricstore/metricstore.go @@ -98,6 +98,49 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { } } + globalRetention, err := time.ParseDuration(Keys.RetentionInMemory) + if err != nil { + cclog.Fatal(err) + } + + retentionLookup = &retentionConfig{ + global: globalRetention, + clusterMap: make(map[string]time.Duration), + subClusterMap: make(map[string]map[string]time.Duration), + } + + for _, clusterCfg := range Keys.Clusters { + if clusterCfg.RetentionInMemory != "" { + clusterRetention, err := time.ParseDuration(clusterCfg.RetentionInMemory) + if err != nil { + cclog.Warnf("[METRICSTORE]> Invalid retention duration for cluster '%s': %s\n", clusterCfg.Cluster, err.Error()) + continue + } + retentionLookup.clusterMap[clusterCfg.Cluster] = clusterRetention + cclog.Debugf("[METRICSTORE]> Cluster '%s' retention: %s\n", clusterCfg.Cluster, clusterRetention) + } + + if len(clusterCfg.SubClusters) > 0 { + if retentionLookup.subClusterMap[clusterCfg.Cluster] == nil { + retentionLookup.subClusterMap[clusterCfg.Cluster] = make(map[string]time.Duration) + } + + for _, scCfg := range clusterCfg.SubClusters { + if scCfg.RetentionInMemory != "" { + scRetention, err := time.ParseDuration(scCfg.RetentionInMemory) + if err != nil { + cclog.Warnf("[METRICSTORE]> Invalid retention duration for subcluster '%s/%s': %s\n", + clusterCfg.Cluster, scCfg.SubCluster, err.Error()) + continue + } + retentionLookup.subClusterMap[clusterCfg.Cluster][scCfg.SubCluster] = scRetention + cclog.Debugf("[METRICSTORE]> SubCluster '%s/%s' retention: %s\n", + clusterCfg.Cluster, scCfg.SubCluster, scRetention) + } + } + } + } + // Pass the config.MetricStoreKeys InitMetrics(Metrics) @@ -208,32 +251,22 @@ func Shutdown() { cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files) } -func getName(m *MemoryStore, i int) string { - for key, val := range m.Metrics { - if val.offset == i { - return key - } - } - return "" -} - func Retention(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() go func() { defer wg.Done() - d, err := time.ParseDuration(Keys.RetentionInMemory) - if err != nil { - cclog.Fatal(err) - } - if d <= 0 { + shortestRetention := GetShortestRetentionDuration() + if shortestRetention <= 0 { return } - tickInterval := d / 2 + tickInterval := shortestRetention / 2 if tickInterval <= 0 { return } + cclog.Debugf("[METRICSTORE]> Retention ticker interval set to %s (half of shortest retention: %s)\n", + tickInterval, shortestRetention) ticker := time.NewTicker(tickInterval) defer ticker.Stop() @@ -242,13 +275,50 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - t := time.Now().Add(-d) - cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) - freed, err := ms.Free(nil, t.Unix()) - if err != nil { - cclog.Errorf("[METRICSTORE]> freeing up buffers failed: %s\n", err.Error()) - } else { - cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freed) + totalFreed := 0 + + clusters := ms.ListChildren(nil) + for _, cluster := range clusters { + retention, err := GetRetentionDuration(cluster, "") + if err != nil { + cclog.Warnf("[METRICSTORE]> Could not get retention for cluster '%s': %s\n", cluster, err.Error()) + continue + } + if retention <= 0 { + continue + } + + t := time.Now().Add(-retention) + cclog.Debugf("[METRICSTORE]> Freeing buffers for cluster '%s' (older than %s, retention: %s)...\n", + cluster, t.Format(time.RFC3339), retention) + + subClusters := ms.ListChildren([]string{cluster}) + for _, subCluster := range subClusters { + scRetention, err := GetRetentionDuration(cluster, subCluster) + if err != nil { + cclog.Warnf("[METRICSTORE]> Could not get retention for subcluster '%s/%s': %s\n", + cluster, subCluster, err.Error()) + continue + } + if scRetention <= 0 { + continue + } + + scTime := time.Now().Add(-scRetention) + freed, err := ms.Free([]string{cluster, subCluster}, scTime.Unix()) + if err != nil { + cclog.Errorf("[METRICSTORE]> freeing buffers for '%s/%s' failed: %s\n", + cluster, subCluster, err.Error()) + } else if freed > 0 { + cclog.Debugf("[METRICSTORE]> freed %d buffers for '%s/%s' (retention: %s)\n", + freed, cluster, subCluster, scRetention) + totalFreed += freed + } + } + } + + if totalFreed > 0 { + cclog.Infof("[METRICSTORE]> Total buffers freed: %d\n", totalFreed) } } } diff --git a/internal/metricstore/memorystore_test.go b/internal/metricstore/metricstore_test.go similarity index 99% rename from internal/metricstore/memorystore_test.go rename to internal/metricstore/metricstore_test.go index 29379d21..fd7c963f 100644 --- a/internal/metricstore/memorystore_test.go +++ b/internal/metricstore/metricstore_test.go @@ -131,7 +131,7 @@ func TestBufferWrite(t *testing.T) { func TestBufferRead(t *testing.T) { b := newBuffer(100, 10) - + // Write some test data b.write(100, schema.Float(1.0)) b.write(110, schema.Float(2.0))