From 3276ed7785a19709b866745551a220cc7808bbb6 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Wed, 14 Jan 2026 14:56:36 +0100 Subject: [PATCH] Half-baked commit for new dynamic retention logic --- cmd/cc-backend/init.go | 2 +- configs/config-demo.json | 2 +- internal/metricstore/level.go | 23 +++++++ internal/metricstore/metricstore.go | 102 +++++++++++++++++++++++++++- internal/repository/job.go | 2 +- web/frontend/package-lock.json | 5 -- 6 files changed, 126 insertions(+), 10 deletions(-) diff --git a/cmd/cc-backend/init.go b/cmd/cc-backend/init.go index e30ae2e1..19c8dd29 100644 --- a/cmd/cc-backend/init.go +++ b/cmd/cc-backend/init.go @@ -36,7 +36,7 @@ const configString = ` "short-running-jobs-duration": 300, "resampling": { "minimumPoints": 600, - "trigger": 180, + "trigger": 300, "resolutions": [ 240, 60 diff --git a/configs/config-demo.json b/configs/config-demo.json index bd492e31..e512c9de 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -4,7 +4,7 @@ "short-running-jobs-duration": 300, "resampling": { "minimumPoints": 600, - "trigger": 180, + "trigger": 300, "resolutions": [240, 60] }, "apiAllowedIPs": ["*"], diff --git a/internal/metricstore/level.go b/internal/metricstore/level.go index d46f893a..87aeefc9 100644 --- a/internal/metricstore/level.go +++ b/internal/metricstore/level.go @@ -72,6 +72,29 @@ func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { return child.findLevelOrCreate(selector[1:], nMetrics) } +func (l *Level) collectPaths(currentDepth, targetDepth int, currentPath []string, results *[][]string) { + l.lock.RLock() + defer l.lock.RUnlock() + + for key, child := range l.children { + if child == nil { + continue + } + + // We explicitly make a new slice and copy data to avoid sharing underlying arrays between siblings + newPath := make([]string, len(currentPath)) + copy(newPath, currentPath) + newPath = append(newPath, key) + + // Check depth, and just return if depth reached + if 
currentDepth+1 == targetDepth { + *results = append(*results, newPath) + } else { + child.collectPaths(currentDepth+1, targetDepth, newPath, results) + } + } +} + func (l *Level) free(t int64) (int, error) { l.lock.Lock() defer l.lock.Unlock() diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index ac8948ae..e35b4d58 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -235,8 +235,9 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { case <-ticker.C: t := time.Now().Add(-d) cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) - freed, err := ms.Free(nil, t.Unix()) - if err != nil { + + freed, err := Free(ms, t) + if err != nil { cclog.Errorf("[METRICSTORE]> freeing up buffers failed: %s\n", err.Error()) } else { cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freed) @@ -246,6 +247,103 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { }() } +func Free(ms *MemoryStore, t time.Time) (int, error) { + // jobRepo := repository.GetJobRepository() + // excludeSelectors, err := jobRepo.GetUsedNodes(t.Unix()) + // if err != nil { + // return 0, err + // } + + excludeSelectors := make(map[string][]string, 0) + + // excludeSelectors := map[string][]string{ + // "alex": {"a0122", "a0123", "a0225"}, + // "fritz": {"f0201", "f0202"}, + // } + + switch lenMap := len(excludeSelectors); lenMap { + + // If the length of the map returned by GetUsedNodes() is 0, + // then use default Free method with nil selector + case 0: + return ms.Free(nil, t.Unix()) + + // Else formulate selectors, exclude those from the map + // and free the rest of the selectors + default: + selectors := GetSelectors(ms, excludeSelectors) + return FreeSelected(ms, selectors, t) + } +} + +// A function to free specific selectors. Used when we want to retain some specific nodes +// beyond the retention time. 
+func FreeSelected(ms *MemoryStore, selectors [][]string, t time.Time) (int, error) { + freed := 0 + + for _, selector := range selectors { + + freedBuffers, err := ms.Free(selector, t.Unix()) + if err != nil { + cclog.Errorf("error while freeing selected buffers: %#v", err) + } + freed += freedBuffers + + } + + return freed, nil +} + +// This function will populate all the second last levels - meaning nodes. +// From that we can exclude the specific selectors/nodes we want to retain. +func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]string { + allSelectors := ms.GetPaths(2) + + filteredSelectors := make([][]string, 0, len(allSelectors)) + + for _, path := range allSelectors { + if len(path) < 2 { + continue + } + + key := path[0] // The "Key" (Level 1) + value := path[1] // The "Value" (Level 2) + + exclude := false + + // Check if the key exists in our exclusion map + if excludedValues, exists := excludeSelectors[key]; exists { + // The key exists, now check if the specific value is in the exclusion list + for _, ev := range excludedValues { + if ev == value { + exclude = true + break + } + } + } + + if !exclude { + filteredSelectors = append(filteredSelectors, path) + } + } + + // fmt.Printf("All selectors: %#v\n\n", allSelectors) + // fmt.Printf("filteredSelectors: %#v\n\n", filteredSelectors) + + return filteredSelectors +} + +// GetPaths returns a list of lists (paths) to the specified depth. +func (ms *MemoryStore) GetPaths(targetDepth int) [][]string { + var results [][]string + + // Start recursion. Initial path is empty. + // We treat Root as depth 0. + ms.root.collectPaths(0, targetDepth, []string{}, &results) + + return results +} + // Write all values in `metrics` to the level specified by `selector` for time `ts`. // Look at `findLevelOrCreate` for how selectors work. 
func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error { diff --git a/internal/repository/job.go b/internal/repository/job.go index 293c28d4..b1e92424 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -821,7 +821,7 @@ func (r *JobRepository) UpdateFootprint( // relevant jobs. Returns an error if the database query fails or row iteration // encounters errors. Individual row parsing errors are logged but don't fail // the entire operation. -func (r *JobRepository) GetUsedNodes(ts uint64) (map[string][]string, error) { +func (r *JobRepository) GetUsedNodes(ts int64) (map[string][]string, error) { // Note: Query expects index on (job_state, start_time) for optimal performance q := sq.Select("job.cluster", "job.resources").From("job"). Where("job.start_time < ?", ts). diff --git a/web/frontend/package-lock.json b/web/frontend/package-lock.json index 8db152f8..2820b72b 100644 --- a/web/frontend/package-lock.json +++ b/web/frontend/package-lock.json @@ -621,7 +621,6 @@ "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.15.0.tgz", "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -822,7 +821,6 @@ "resolved": "https://registry.npmjs.org/graphql/-/graphql-16.12.0.tgz", "integrity": "sha512-DKKrynuQRne0PNpEbzuEdHlYOMksHSUI8Zc9Unei5gTsMNA2/vMpoMz/yKba50pejK56qj98qM0SjYxAKi13gQ==", "license": "MIT", - "peer": true, "engines": { "node": "^12.22.0 || ^14.16.0 || ^16.0.0 || >=17.0.0" } @@ -929,7 +927,6 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -984,7 +981,6 @@ "integrity": "sha512-3nk8Y3a9Ea8szgKhinMlGMhGMw89mqule3KWczxhIzqudyHdCIOHw8WJlj/r329fACjKLEh13ZSk7oE22kyeIw==", "devOptional": 
true, "license": "MIT", - "peer": true, "dependencies": { "@types/estree": "1.0.8" }, @@ -1165,7 +1161,6 @@ "resolved": "https://registry.npmjs.org/svelte/-/svelte-5.46.1.tgz", "integrity": "sha512-ynjfCHD3nP2el70kN5Pmg37sSi0EjOm9FgHYQdC4giWG/hzO3AatzXXJJgP305uIhGQxSufJLuYWtkY8uK/8RA==", "license": "MIT", - "peer": true, "dependencies": { "@jridgewell/remapping": "^2.3.4", "@jridgewell/sourcemap-codec": "^1.5.0",