mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-01-20 11:31:46 +01:00
Merge branch 'dev' of github.com:ClusterCockpit/cc-backend into dev
This commit is contained in:
@@ -16,7 +16,7 @@
|
|||||||
"checkpoints": {
|
"checkpoints": {
|
||||||
"interval": "12h"
|
"interval": "12h"
|
||||||
},
|
},
|
||||||
"retention-in-memory": "2m",
|
"retention-in-memory": "48h",
|
||||||
"memory-cap": 100
|
"memory-cap": 100
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -265,13 +265,9 @@ func (b *buffer) forceFreeOldest() (delme bool, n int) {
|
|||||||
|
|
||||||
// If the previous buffer signals it should be deleted:
|
// If the previous buffer signals it should be deleted:
|
||||||
if delPrev {
|
if delPrev {
|
||||||
// Unlink references
|
// Clear links on the dying buffer to prevent leaks
|
||||||
b.prev.next = nil
|
b.prev.next = nil
|
||||||
|
b.prev.data = nil // Release the underlying float slice immediately
|
||||||
// Return to pool if capacity matches
|
|
||||||
if cap(b.prev.data) == BufferCap {
|
|
||||||
bufferPool.Put(b.prev)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove the link from the current buffer
|
// Remove the link from the current buffer
|
||||||
b.prev = nil
|
b.prev = nil
|
||||||
|
|||||||
@@ -234,9 +234,12 @@ func (l *Level) forceFree() (int, error) {
|
|||||||
// If delme is true, it means 'b' itself (the head) was the oldest
|
// If delme is true, it means 'b' itself (the head) was the oldest
|
||||||
// and needs to be removed from the slice.
|
// and needs to be removed from the slice.
|
||||||
if delme {
|
if delme {
|
||||||
if cap(b.data) == BufferCap {
|
// Nil out fields to ensure no hanging references
|
||||||
bufferPool.Put(b)
|
|
||||||
}
|
b.next = nil
|
||||||
|
b.prev = nil
|
||||||
|
b.data = nil
|
||||||
|
|
||||||
l.metrics[i] = nil
|
l.metrics[i] = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,6 +37,13 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-lib/v2/util"
|
"github.com/ClusterCockpit/cc-lib/v2/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// GlobalState holds the retention bookkeeping (last retention cutoff and whether selectors were excluded), guarded by mu
|
||||||
|
type GlobalState struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
lastRetentionTime int64
|
||||||
|
selectorsExcluded bool
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
singleton sync.Once
|
singleton sync.Once
|
||||||
msInstance *MemoryStore
|
msInstance *MemoryStore
|
||||||
@@ -44,6 +51,8 @@ var (
|
|||||||
// and is called during Shutdown to cancel all background goroutines
|
// and is called during Shutdown to cancel all background goroutines
|
||||||
shutdownFunc context.CancelFunc
|
shutdownFunc context.CancelFunc
|
||||||
shutdownFuncMu sync.Mutex // Protects shutdownFunc from concurrent access
|
shutdownFuncMu sync.Mutex // Protects shutdownFunc from concurrent access
|
||||||
|
// Create a global instance
|
||||||
|
state = &GlobalState{}
|
||||||
)
|
)
|
||||||
|
|
||||||
// NodeProvider provides information about nodes currently in use by running jobs.
|
// NodeProvider provides information about nodes currently in use by running jobs.
|
||||||
@@ -356,7 +365,12 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
state.mu.Lock()
|
||||||
|
|
||||||
t := time.Now().Add(-d)
|
t := time.Now().Add(-d)
|
||||||
|
|
||||||
|
state.lastRetentionTime = t.Unix()
|
||||||
|
|
||||||
cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
|
cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
|
||||||
|
|
||||||
freed, err := Free(ms, t)
|
freed, err := Free(ms, t)
|
||||||
@@ -365,6 +379,8 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
} else {
|
} else {
|
||||||
cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freed)
|
cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
state.mu.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -400,14 +416,36 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
state.mu.RLock()
|
||||||
|
|
||||||
memoryUsageGB := ms.SizeInGB()
|
memoryUsageGB := ms.SizeInGB()
|
||||||
cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB)
|
cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB)
|
||||||
|
|
||||||
|
freedTotal := 0
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// First force-free all the buffers that were excluded from the last retention run
|
||||||
|
if state.lastRetentionTime != 0 && state.selectorsExcluded {
|
||||||
|
freedTotal, err = ms.Free(nil, state.lastRetentionTime)
|
||||||
|
if err != nil {
|
||||||
|
cclog.Errorf("[METRICSTORE]> error while force-freeing the excluded buffers: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call runtime.GC() twice in succession to completely empty the bufferPool (sync.Pool)
|
||||||
|
runtime.GC()
|
||||||
|
runtime.GC()
|
||||||
|
|
||||||
|
cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed\n", freedTotal)
|
||||||
|
}
|
||||||
|
|
||||||
|
state.mu.RUnlock()
|
||||||
|
|
||||||
|
memoryUsageGB = ms.SizeInGB()
|
||||||
|
|
||||||
if memoryUsageGB > float64(Keys.MemoryCap) {
|
if memoryUsageGB > float64(Keys.MemoryCap) {
|
||||||
cclog.Warnf("[METRICSTORE]> current memory usage is greater than the Memory Cap: %d GB\n", Keys.MemoryCap)
|
cclog.Warnf("[METRICSTORE]> memory usage is still greater than the Memory Cap: %d GB\n", Keys.MemoryCap)
|
||||||
cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n")
|
cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n")
|
||||||
|
|
||||||
freedTotal := 0
|
|
||||||
const maxIterations = 100
|
const maxIterations = 100
|
||||||
|
|
||||||
for range maxIterations {
|
for range maxIterations {
|
||||||
@@ -432,7 +470,7 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
if memoryUsageGB >= float64(Keys.MemoryCap) {
|
if memoryUsageGB >= float64(Keys.MemoryCap) {
|
||||||
cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to free, current memory usage: %.2f GB\n", maxIterations, memoryUsageGB)
|
cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to free, current memory usage: %.2f GB\n", maxIterations, memoryUsageGB)
|
||||||
} else {
|
} else {
|
||||||
cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freedTotal)
|
cclog.Infof("[METRICSTORE]> done: %d buffers force-freed\n", freedTotal)
|
||||||
cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB)
|
cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -476,11 +514,13 @@ func Free(ms *MemoryStore, t time.Time) (int, error) {
|
|||||||
// If the length of the map returned by GetUsedNodes() is 0,
|
// If the length of the map returned by GetUsedNodes() is 0,
|
||||||
// then use default Free method with nil selector
|
// then use default Free method with nil selector
|
||||||
case 0:
|
case 0:
|
||||||
|
state.selectorsExcluded = false
|
||||||
return ms.Free(nil, t.Unix())
|
return ms.Free(nil, t.Unix())
|
||||||
|
|
||||||
// Else formulate selectors, exclude those from the map
|
// Else formulate selectors, exclude those from the map
|
||||||
// and free the rest of the selectors
|
// and free the rest of the selectors
|
||||||
default:
|
default:
|
||||||
|
state.selectorsExcluded = true
|
||||||
selectors := GetSelectors(ms, excludeSelectors)
|
selectors := GetSelectors(ms, excludeSelectors)
|
||||||
return FreeSelected(ms, selectors, t)
|
return FreeSelected(ms, selectors, t)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -94,6 +94,7 @@
|
|||||||
let totalMetrics = $state(0);
|
let totalMetrics = $state(0);
|
||||||
|
|
||||||
/* Derived */
|
/* Derived */
|
||||||
|
const showSummary = $derived((!!ccconfig[`jobView_showFootprint`] || !!ccconfig[`jobView_showPolarPlot`]))
|
||||||
const jobMetrics = $derived(queryStore({
|
const jobMetrics = $derived(queryStore({
|
||||||
client: client,
|
client: client,
|
||||||
query: query,
|
query: query,
|
||||||
@@ -261,7 +262,9 @@
|
|||||||
{#if $initq.error}
|
{#if $initq.error}
|
||||||
<Card body color="danger">{$initq.error.message}</Card>
|
<Card body color="danger">{$initq.error.message}</Card>
|
||||||
{:else if $initq?.data}
|
{:else if $initq?.data}
|
||||||
|
{#if showSummary}
|
||||||
<JobSummary job={$initq.data.job}/>
|
<JobSummary job={$initq.data.job}/>
|
||||||
|
{/if}
|
||||||
{:else}
|
{:else}
|
||||||
<Spinner secondary />
|
<Spinner secondary />
|
||||||
{/if}
|
{/if}
|
||||||
|
|||||||
Reference in New Issue
Block a user