diff --git a/configs/config-demo.json b/configs/config-demo.json index 8cbd2ed6..c3042993 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -16,7 +16,7 @@ "checkpoints": { "interval": "12h" }, - "retention-in-memory": "2m", + "retention-in-memory": "48h", "memory-cap": 100 } } \ No newline at end of file diff --git a/internal/metricstore/buffer.go b/internal/metricstore/buffer.go index 3687c4dc..46eb5149 100644 --- a/internal/metricstore/buffer.go +++ b/internal/metricstore/buffer.go @@ -265,13 +265,9 @@ func (b *buffer) forceFreeOldest() (delme bool, n int) { // If the previous buffer signals it should be deleted: if delPrev { - // Unlink references + // Clear links on the dying buffer to prevent leaks b.prev.next = nil - - // Return to pool if capacity matches - if cap(b.prev.data) == BufferCap { - bufferPool.Put(b.prev) - } + b.prev.data = nil // Release the underlying float slice immediately // Remove the link from the current buffer b.prev = nil diff --git a/internal/metricstore/level.go b/internal/metricstore/level.go index bc99b884..b35137ef 100644 --- a/internal/metricstore/level.go +++ b/internal/metricstore/level.go @@ -234,9 +234,12 @@ func (l *Level) forceFree() (int, error) { // If delme is true, it means 'b' itself (the head) was the oldest // and needs to be removed from the slice. 
if delme { - if cap(b.data) == BufferCap { - bufferPool.Put(b) - } + // Nil out fields to ensure no dangling references + + b.next = nil + b.prev = nil + b.data = nil + l.metrics[i] = nil } } diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index b016e725..a50f4ab5 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -37,6 +37,13 @@ import ( "github.com/ClusterCockpit/cc-lib/v2/util" ) +// GlobalState holds retention bookkeeping shared between the Retention +// and MemoryUsageTracker goroutines, guarded by mu +type GlobalState struct { + mu sync.RWMutex + lastRetentionTime int64 + selectorsExcluded bool +} + var ( singleton sync.Once msInstance *MemoryStore @@ -44,6 +51,8 @@ var ( // and is called during Shutdown to cancel all background goroutines shutdownFunc context.CancelFunc shutdownFuncMu sync.Mutex // Protects shutdownFunc from concurrent access + // state is the shared GlobalState instance + state = &GlobalState{} ) // NodeProvider provides information about nodes currently in use by running jobs. 
@@ -356,7 +365,12 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: + state.mu.Lock() + t := time.Now().Add(-d) + + state.lastRetentionTime = t.Unix() + cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) freed, err := Free(ms, t) @@ -365,6 +379,8 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { } else { cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freed) } + + state.mu.Unlock() } } }() @@ -400,14 +416,36 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: + state.mu.RLock() + memoryUsageGB := ms.SizeInGB() cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB) + freedTotal := 0 + var err error + + // First force-free the buffers that were excluded during the last retention run + if state.lastRetentionTime != 0 && state.selectorsExcluded { + freedTotal, err = ms.Free(nil, state.lastRetentionTime) + if err != nil { + cclog.Errorf("[METRICSTORE]> error while force-freeing the excluded buffers: %s", err) + } + + // Calling runtime.GC() twice in succession to completely empty a bufferPool (sync.Pool) + runtime.GC() + runtime.GC() + + cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed\n", freedTotal) + } + + state.mu.RUnlock() + + memoryUsageGB = ms.SizeInGB() + if memoryUsageGB > float64(Keys.MemoryCap) { - cclog.Warnf("[METRICSTORE]> current memory usage is greater than the Memory Cap: %d GB\n", Keys.MemoryCap) + cclog.Warnf("[METRICSTORE]> memory usage is still greater than the Memory Cap: %d GB\n", Keys.MemoryCap) cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n") - freedTotal := 0 const maxIterations = 100 for range maxIterations { @@ -432,7 +470,7 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { if memoryUsageGB >= float64(Keys.MemoryCap) { cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to 
free, current memory usage: %.2f GB\n", maxIterations, memoryUsageGB) } else { - cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freedTotal) + cclog.Infof("[METRICSTORE]> done: %d buffers force-freed\n", freedTotal) cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB) } } @@ -476,11 +514,13 @@ func Free(ms *MemoryStore, t time.Time) (int, error) { // If the length of the map returned by GetUsedNodes() is 0, // then use default Free method with nil selector case 0: + state.selectorsExcluded = false return ms.Free(nil, t.Unix()) // Else formulate selectors, exclude those from the map // and free the rest of the selectors default: + state.selectorsExcluded = true selectors := GetSelectors(ms, excludeSelectors) return FreeSelected(ms, selectors, t) } diff --git a/web/frontend/src/Job.root.svelte b/web/frontend/src/Job.root.svelte index 2ff7ab62..f2cff319 100644 --- a/web/frontend/src/Job.root.svelte +++ b/web/frontend/src/Job.root.svelte @@ -94,6 +94,7 @@ let totalMetrics = $state(0); /* Derived */ + const showSummary = $derived((!!ccconfig[`jobView_showFootprint`] || !!ccconfig[`jobView_showPolarPlot`])) const jobMetrics = $derived(queryStore({ client: client, query: query, @@ -261,7 +262,9 @@ {#if $initq.error} {$initq.error.message} {:else if $initq?.data} - + {#if showSummary} + + {/if} {:else} {/if}