Refactor metricstore. Initial stub for cluster/subcluster-specific retention times

2026-01-13 09:52:00 +01:00
parent 2ebab1e2e2
commit 04a2e460ae
5 changed files with 203 additions and 27 deletions

View File

@@ -24,8 +24,10 @@ import (
 	"github.com/linkedin/goavro/v2"
 )
 
-var NumAvroWorkers int = DefaultAvroWorkers
-var startUp bool = true
+var (
+	NumAvroWorkers int  = DefaultAvroWorkers
+	startUp        bool = true
+)
 
 func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) {
 	levels := make([]*AvroLevel, 0)

View File

@@ -33,8 +33,19 @@ type MetricStoreConfig struct {
 		DumpToFile string `json:"dump-to-file"`
 		EnableGops bool   `json:"gops"`
 	} `json:"debug"`
+	// Global default retention duration
 	RetentionInMemory string `json:"retention-in-memory"`
+	// Per-cluster retention overrides
+	Clusters []struct {
+		Cluster           string `json:"cluster"`
+		RetentionInMemory string `json:"retention-in-memory"`
+		// Per-subcluster retention overrides within this cluster
+		SubClusters []struct {
+			SubCluster        string `json:"subcluster"`
+			RetentionInMemory string `json:"retention-in-memory"`
+		} `json:"subclusters,omitempty"`
+	} `json:"clusters,omitempty"`
 	Archive struct {
 		Interval      string `json:"interval"`
 		RootDir       string `json:"directory"`
 		DeleteInstead bool   `json:"delete-instead"`
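
For reference, a config exercising the new fields might look like the following sketch (the cluster and subcluster names are purely illustrative):

{
  "retention-in-memory": "48h",
  "clusters": [
    {
      "cluster": "fritz",
      "retention-in-memory": "24h",
      "subclusters": [
        { "subcluster": "spr", "retention-in-memory": "12h" }
      ]
    }
  ]
}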
@@ -50,6 +61,14 @@ type MetricStoreConfig struct {
 
 var Keys MetricStoreConfig
 
+type retentionConfig struct {
+	global        time.Duration
+	clusterMap    map[string]time.Duration
+	subClusterMap map[string]map[string]time.Duration
+}
+
+var retentionLookup *retentionConfig
+
 // AggregationStrategy for aggregation over multiple values at different cpus/sockets/..., not time!
 type AggregationStrategy int
@@ -113,3 +132,52 @@ func AddMetric(name string, metric MetricConfig) error {
 
 	return nil
 }
+
+func GetRetentionDuration(cluster, subCluster string) (time.Duration, error) {
+	if retentionLookup == nil {
+		return 0, fmt.Errorf("[METRICSTORE]> retention configuration not initialized")
+	}
+
+	if subCluster != "" {
+		if subMap, ok := retentionLookup.subClusterMap[cluster]; ok {
+			if retention, ok := subMap[subCluster]; ok {
+				return retention, nil
+			}
+		}
+	}
+
+	if retention, ok := retentionLookup.clusterMap[cluster]; ok {
+		return retention, nil
+	}
+
+	return retentionLookup.global, nil
+}
+
+// GetShortestRetentionDuration returns the shortest configured retention duration
+// across all levels (global, cluster, and subcluster configurations).
+// Returns 0 if retentionLookup is not initialized or global retention is not set.
+func GetShortestRetentionDuration() time.Duration {
+	if retentionLookup == nil || retentionLookup.global <= 0 {
+		return 0
+	}
+
+	shortest := retentionLookup.global
+
+	// Check all cluster-level retention durations
+	for _, clusterRetention := range retentionLookup.clusterMap {
+		if clusterRetention > 0 && clusterRetention < shortest {
+			shortest = clusterRetention
+		}
+	}
+
+	// Check all subcluster-level retention durations
+	for _, subClusterMap := range retentionLookup.subClusterMap {
+		for _, scRetention := range subClusterMap {
+			if scRetention > 0 && scRetention < shortest {
+				shortest = scRetention
+			}
+		}
+	}
+
+	return shortest
+}
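
Lookup precedence is most-specific-first: subcluster, then cluster, then the global default. Assuming the illustrative config sketched above has been loaded (names again illustrative), a quick usage sketch:

d, _ := GetRetentionDuration("fritz", "spr") // 12h: subcluster override wins
d, _ = GetRetentionDuration("fritz", "icx")  // 24h: no subcluster entry, cluster override applies
d, _ = GetRetentionDuration("alex", "")      // 48h: unknown cluster falls back to the global default
_ = d

GetShortestRetentionDuration() would accordingly return 12h, the minimum across all three levels.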

View File

@@ -46,9 +46,45 @@ const configSchema = `{
 			}
 		},
 		"retention-in-memory": {
-			"description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",
+			"description": "Global default: Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",
 			"type": "string"
 		},
+		"clusters": {
+			"description": "Optional per-cluster retention overrides",
+			"type": "array",
+			"items": {
+				"type": "object",
+				"required": ["cluster"],
+				"properties": {
+					"cluster": {
+						"description": "Cluster name",
+						"type": "string"
+					},
+					"retention-in-memory": {
+						"description": "Cluster-specific retention duration (overrides global default)",
+						"type": "string"
+					},
+					"subclusters": {
+						"description": "Optional per-subcluster retention overrides",
+						"type": "array",
+						"items": {
+							"type": "object",
+							"required": ["subcluster"],
+							"properties": {
+								"subcluster": {
+									"description": "Subcluster name",
+									"type": "string"
+								},
+								"retention-in-memory": {
+									"description": "Subcluster-specific retention duration (overrides cluster and global default)",
+									"type": "string"
+								}
+							}
+						}
+					}
+				}
+			}
+		},
 		"nats": {
 			"description": "Configuration for accepting published data through NATS.",
 			"type": "array",

View File

@@ -98,6 +98,49 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
 		}
 	}
 
+	globalRetention, err := time.ParseDuration(Keys.RetentionInMemory)
+	if err != nil {
+		cclog.Fatal(err)
+	}
+
+	retentionLookup = &retentionConfig{
+		global:        globalRetention,
+		clusterMap:    make(map[string]time.Duration),
+		subClusterMap: make(map[string]map[string]time.Duration),
+	}
+
+	for _, clusterCfg := range Keys.Clusters {
+		if clusterCfg.RetentionInMemory != "" {
+			clusterRetention, err := time.ParseDuration(clusterCfg.RetentionInMemory)
+			if err != nil {
+				cclog.Warnf("[METRICSTORE]> Invalid retention duration for cluster '%s': %s\n", clusterCfg.Cluster, err.Error())
+				continue
+			}
+			retentionLookup.clusterMap[clusterCfg.Cluster] = clusterRetention
+			cclog.Debugf("[METRICSTORE]> Cluster '%s' retention: %s\n", clusterCfg.Cluster, clusterRetention)
+		}
+
+		if len(clusterCfg.SubClusters) > 0 {
+			if retentionLookup.subClusterMap[clusterCfg.Cluster] == nil {
+				retentionLookup.subClusterMap[clusterCfg.Cluster] = make(map[string]time.Duration)
+			}
+			for _, scCfg := range clusterCfg.SubClusters {
+				if scCfg.RetentionInMemory != "" {
+					scRetention, err := time.ParseDuration(scCfg.RetentionInMemory)
+					if err != nil {
+						cclog.Warnf("[METRICSTORE]> Invalid retention duration for subcluster '%s/%s': %s\n",
+							clusterCfg.Cluster, scCfg.SubCluster, err.Error())
+						continue
+					}
+					retentionLookup.subClusterMap[clusterCfg.Cluster][scCfg.SubCluster] = scRetention
+					cclog.Debugf("[METRICSTORE]> SubCluster '%s/%s' retention: %s\n",
+						clusterCfg.Cluster, scCfg.SubCluster, scRetention)
+				}
+			}
+		}
+	}
+
 	// Pass the config.MetricStoreKeys
 	InitMetrics(Metrics)
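
The retention strings are parsed with Go's time.ParseDuration, which knows no unit larger than hours; a quick sketch of what parses and what does not:

d, err := time.ParseDuration("48h") // ok: two days
_, err = time.ParseDuration("7d")   // error: "d" is not a valid unit; write "168h" instead
_ = d

Also note the asymmetry in the hunk above: an unparsable global retention is fatal, while an unparsable cluster or subcluster override only logs a warning and leaves that level unset.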
@@ -208,32 +251,22 @@ func Shutdown() {
 	cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files)
 }
 
-func getName(m *MemoryStore, i int) string {
-	for key, val := range m.Metrics {
-		if val.offset == i {
-			return key
-		}
-	}
-	return ""
-}
-
 func Retention(wg *sync.WaitGroup, ctx context.Context) {
 	ms := GetMemoryStore()
 	go func() {
 		defer wg.Done()
-		d, err := time.ParseDuration(Keys.RetentionInMemory)
-		if err != nil {
-			cclog.Fatal(err)
-		}
-		if d <= 0 {
+		shortestRetention := GetShortestRetentionDuration()
+		if shortestRetention <= 0 {
 			return
 		}
-		tickInterval := d / 2
+		tickInterval := shortestRetention / 2
 		if tickInterval <= 0 {
 			return
 		}
+		cclog.Debugf("[METRICSTORE]> Retention ticker interval set to %s (half of shortest retention: %s)\n",
+			tickInterval, shortestRetention)
 
 		ticker := time.NewTicker(tickInterval)
 		defer ticker.Stop()
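
Ticking at half of the shortest retention bounds how stale data can get: with a shortest retention of, say, 12h, the ticker fires every 6h, so a buffer is freed at most 6h after crossing its threshold and data lives no longer than 1.5x its configured retention.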
@@ -242,13 +275,50 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			t := time.Now().Add(-d)
-			cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
-			freed, err := ms.Free(nil, t.Unix())
-			if err != nil {
-				cclog.Errorf("[METRICSTORE]> freeing up buffers failed: %s\n", err.Error())
-			} else {
-				cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freed)
+			totalFreed := 0
+
+			clusters := ms.ListChildren(nil)
+			for _, cluster := range clusters {
+				retention, err := GetRetentionDuration(cluster, "")
+				if err != nil {
+					cclog.Warnf("[METRICSTORE]> Could not get retention for cluster '%s': %s\n", cluster, err.Error())
+					continue
+				}
+				if retention <= 0 {
+					continue
+				}
+				t := time.Now().Add(-retention)
+				cclog.Debugf("[METRICSTORE]> Freeing buffers for cluster '%s' (older than %s, retention: %s)...\n",
+					cluster, t.Format(time.RFC3339), retention)
+
+				subClusters := ms.ListChildren([]string{cluster})
+				for _, subCluster := range subClusters {
+					scRetention, err := GetRetentionDuration(cluster, subCluster)
+					if err != nil {
+						cclog.Warnf("[METRICSTORE]> Could not get retention for subcluster '%s/%s': %s\n",
+							cluster, subCluster, err.Error())
+						continue
+					}
+					if scRetention <= 0 {
+						continue
+					}
+					scTime := time.Now().Add(-scRetention)
+					freed, err := ms.Free([]string{cluster, subCluster}, scTime.Unix())
+					if err != nil {
+						cclog.Errorf("[METRICSTORE]> freeing buffers for '%s/%s' failed: %s\n",
+							cluster, subCluster, err.Error())
+					} else if freed > 0 {
+						cclog.Debugf("[METRICSTORE]> freed %d buffers for '%s/%s' (retention: %s)\n",
+							freed, cluster, subCluster, scRetention)
+						totalFreed += freed
+					}
+				}
+			}
+			if totalFreed > 0 {
+				cclog.Infof("[METRICSTORE]> Total buffers freed: %d\n", totalFreed)
 			}
 		}
 	}
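
This walk assumes the MemoryStore tree is keyed cluster → subcluster → host, which is what the selectors here imply (names illustrative):

ms.ListChildren(nil)                      // top-level children, i.e. cluster names
ms.ListChildren([]string{"fritz"})        // that cluster's children, i.e. its subclusters
ms.Free([]string{"fritz", "spr"}, cutoff) // free buffers older than cutoff in that subcluster's subtree

Any child without a matching subcluster override simply falls back to the cluster (or global) retention via GetRetentionDuration.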