Revert retention config in metricstore

This commit is contained in:
2026-01-13 14:42:24 +01:00
parent 754f7e16f6
commit 25c8fca561
3 changed files with 13 additions and 196 deletions

View File

@@ -98,49 +98,6 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
}
}
globalRetention, err := time.ParseDuration(Keys.RetentionInMemory)
if err != nil {
cclog.Fatal(err)
}
retentionLookup = &retentionConfig{
global: globalRetention,
clusterMap: make(map[string]time.Duration),
subClusterMap: make(map[string]map[string]time.Duration),
}
for _, clusterCfg := range Keys.Clusters {
if clusterCfg.RetentionInMemory != "" {
clusterRetention, err := time.ParseDuration(clusterCfg.RetentionInMemory)
if err != nil {
cclog.Warnf("[METRICSTORE]> Invalid retention duration for cluster '%s': %s\n", clusterCfg.Cluster, err.Error())
continue
}
retentionLookup.clusterMap[clusterCfg.Cluster] = clusterRetention
cclog.Debugf("[METRICSTORE]> Cluster '%s' retention: %s\n", clusterCfg.Cluster, clusterRetention)
}
if len(clusterCfg.SubClusters) > 0 {
if retentionLookup.subClusterMap[clusterCfg.Cluster] == nil {
retentionLookup.subClusterMap[clusterCfg.Cluster] = make(map[string]time.Duration)
}
for _, scCfg := range clusterCfg.SubClusters {
if scCfg.RetentionInMemory != "" {
scRetention, err := time.ParseDuration(scCfg.RetentionInMemory)
if err != nil {
cclog.Warnf("[METRICSTORE]> Invalid retention duration for subcluster '%s/%s': %s\n",
clusterCfg.Cluster, scCfg.SubCluster, err.Error())
continue
}
retentionLookup.subClusterMap[clusterCfg.Cluster][scCfg.SubCluster] = scRetention
cclog.Debugf("[METRICSTORE]> SubCluster '%s/%s' retention: %s\n",
clusterCfg.Cluster, scCfg.SubCluster, scRetention)
}
}
}
}
// Pass the config.MetricStoreKeys
InitMetrics(Metrics)
@@ -256,17 +213,18 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
go func() {
defer wg.Done()
shortestRetention := GetShortestRetentionDuration()
if shortestRetention <= 0 {
d, err := time.ParseDuration(Keys.RetentionInMemory)
if err != nil {
cclog.Fatal(err)
}
if d <= 0 {
return
}
tickInterval := shortestRetention / 2
tickInterval := d / 2
if tickInterval <= 0 {
return
}
cclog.Debugf("[METRICSTORE]> Retention ticker interval set to %s (half of shortest retention: %s)\n",
tickInterval, shortestRetention)
ticker := time.NewTicker(tickInterval)
defer ticker.Stop()
@@ -275,50 +233,13 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
totalFreed := 0
clusters := ms.ListChildren(nil)
for _, cluster := range clusters {
retention, err := GetRetentionDuration(cluster, "")
if err != nil {
cclog.Warnf("[METRICSTORE]> Could not get retention for cluster '%s': %s\n", cluster, err.Error())
continue
}
if retention <= 0 {
continue
}
t := time.Now().Add(-retention)
cclog.Debugf("[METRICSTORE]> Freeing buffers for cluster '%s' (older than %s, retention: %s)...\n",
cluster, t.Format(time.RFC3339), retention)
subClusters := ms.ListChildren([]string{cluster})
for _, subCluster := range subClusters {
scRetention, err := GetRetentionDuration(cluster, subCluster)
t := time.Now().Add(-d)
cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
freed, err := ms.Free(nil, t.Unix())
if err != nil {
cclog.Warnf("[METRICSTORE]> Could not get retention for subcluster '%s/%s': %s\n",
cluster, subCluster, err.Error())
continue
}
if scRetention <= 0 {
continue
}
scTime := time.Now().Add(-scRetention)
freed, err := ms.Free([]string{cluster, subCluster}, scTime.Unix())
if err != nil {
cclog.Errorf("[METRICSTORE]> freeing buffers for '%s/%s' failed: %s\n",
cluster, subCluster, err.Error())
} else if freed > 0 {
cclog.Debugf("[METRICSTORE]> freed %d buffers for '%s/%s' (retention: %s)\n",
freed, cluster, subCluster, scRetention)
totalFreed += freed
}
}
}
if totalFreed > 0 {
cclog.Infof("[METRICSTORE]> Total buffers freed: %d\n", totalFreed)
cclog.Errorf("[METRICSTORE]> freeing up buffers failed: %s\n", err.Error())
} else {
cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freed)
}
}
}