mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-01-16 09:41:47 +01:00
Add Memory Tracker worker for CCMS
This commit is contained in:
@@ -19,6 +19,7 @@
|
|||||||
"checkpoints": {
|
"checkpoints": {
|
||||||
"interval": "12h"
|
"interval": "12h"
|
||||||
},
|
},
|
||||||
"retention-in-memory": "48h"
|
"retention-in-memory": "48h",
|
||||||
|
"memory-cap": 100
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -49,6 +49,7 @@
|
|||||||
"checkpoints": {
|
"checkpoints": {
|
||||||
"interval": "12h"
|
"interval": "12h"
|
||||||
},
|
},
|
||||||
|
"memory-cap": 100,
|
||||||
"retention-in-memory": "48h",
|
"retention-in-memory": "48h",
|
||||||
"nats-subscriptions": [
|
"nats-subscriptions": [
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -164,6 +164,38 @@ func (b *buffer) free(t int64) (delme bool, n int) {
|
|||||||
return false, n
|
return false, n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// forceFreeOldest recursively finds the end of the linked list (the oldest buffer)
|
||||||
|
// and removes it.
|
||||||
|
// Returns:
|
||||||
|
//
|
||||||
|
// delme: true if 'b' itself is the oldest and should be removed by the caller
|
||||||
|
// n: the number of buffers freed (will be 1 or 0)
|
||||||
|
func (b *buffer) forceFreeOldest() (delme bool, n int) {
|
||||||
|
// If there is a previous buffer, recurse down to find the oldest
|
||||||
|
if b.prev != nil {
|
||||||
|
delPrev, freed := b.prev.forceFreeOldest()
|
||||||
|
|
||||||
|
// If the previous buffer signals it should be deleted:
|
||||||
|
if delPrev {
|
||||||
|
// Unlink references
|
||||||
|
b.prev.next = nil
|
||||||
|
|
||||||
|
// Return to pool if capacity matches
|
||||||
|
if cap(b.prev.data) == BufferCap {
|
||||||
|
bufferPool.Put(b.prev)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the link from the current buffer
|
||||||
|
b.prev = nil
|
||||||
|
}
|
||||||
|
return false, freed
|
||||||
|
}
|
||||||
|
|
||||||
|
// If b.prev is nil, THIS buffer is the oldest.
|
||||||
|
// We return true so the parent (or the Level loop) knows to delete reference to 'b'.
|
||||||
|
return true, 1
|
||||||
|
}
|
||||||
|
|
||||||
// Call `callback` on every buffer that contains data in the range from `from` to `to`.
|
// Call `callback` on every buffer that contains data in the range from `from` to `to`.
|
||||||
func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error {
|
func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error {
|
||||||
if b == nil {
|
if b == nil {
|
||||||
|
|||||||
@@ -73,5 +73,5 @@ const configSchema = `{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"required": ["checkpoints", "retention-in-memory"]
|
"required": ["checkpoints", "retention-in-memory", "memory-cap"]
|
||||||
}`
|
}`
|
||||||
|
|||||||
@@ -124,6 +124,42 @@ func (l *Level) free(t int64) (int, error) {
|
|||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *Level) forceFree() (int, error) {
|
||||||
|
l.lock.Lock()
|
||||||
|
defer l.lock.Unlock()
|
||||||
|
|
||||||
|
n := 0
|
||||||
|
|
||||||
|
// Iterate over metrics in the current level
|
||||||
|
for i, b := range l.metrics {
|
||||||
|
if b != nil {
|
||||||
|
// Attempt to free the oldest buffer in this chain
|
||||||
|
delme, freedCount := b.forceFreeOldest()
|
||||||
|
n += freedCount
|
||||||
|
|
||||||
|
// If delme is true, it means 'b' itself (the head) was the oldest
|
||||||
|
// and needs to be removed from the slice.
|
||||||
|
if delme {
|
||||||
|
if cap(b.data) == BufferCap {
|
||||||
|
bufferPool.Put(b)
|
||||||
|
}
|
||||||
|
l.metrics[i] = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recursively traverse children
|
||||||
|
for _, child := range l.children {
|
||||||
|
m, err := child.forceFree()
|
||||||
|
n += m
|
||||||
|
if err != nil {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (l *Level) sizeInBytes() int64 {
|
func (l *Level) sizeInBytes() int64 {
|
||||||
l.lock.RLock()
|
l.lock.RLock()
|
||||||
defer l.lock.RUnlock()
|
defer l.lock.RUnlock()
|
||||||
|
|||||||
@@ -143,11 +143,13 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
|
|||||||
checkpointingGoroutines := 1
|
checkpointingGoroutines := 1
|
||||||
dataStagingGoroutines := 1
|
dataStagingGoroutines := 1
|
||||||
archivingGoroutines := 1
|
archivingGoroutines := 1
|
||||||
|
memoryUsageTracker := 1
|
||||||
|
|
||||||
totalGoroutines := retentionGoroutines +
|
totalGoroutines := retentionGoroutines +
|
||||||
checkpointingGoroutines +
|
checkpointingGoroutines +
|
||||||
dataStagingGoroutines +
|
dataStagingGoroutines +
|
||||||
archivingGoroutines
|
archivingGoroutines +
|
||||||
|
memoryUsageTracker
|
||||||
|
|
||||||
wg.Add(totalGoroutines)
|
wg.Add(totalGoroutines)
|
||||||
|
|
||||||
@@ -155,6 +157,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
|
|||||||
Checkpointing(wg, ctx)
|
Checkpointing(wg, ctx)
|
||||||
Archiving(wg, ctx)
|
Archiving(wg, ctx)
|
||||||
DataStaging(wg, ctx)
|
DataStaging(wg, ctx)
|
||||||
|
MemoryUsageTracker(wg, ctx)
|
||||||
|
|
||||||
// Note: Signal handling has been removed from this function.
|
// Note: Signal handling has been removed from this function.
|
||||||
// The caller is responsible for handling shutdown signals and calling
|
// The caller is responsible for handling shutdown signals and calling
|
||||||
@@ -280,6 +283,59 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
|
ms := GetMemoryStore()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
d := 1 * time.Minute
|
||||||
|
|
||||||
|
if d <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker := time.NewTicker(d)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
memoryUsageGB := ms.SizeInGB()
|
||||||
|
cclog.Infof("[METRICSTORE]> current memory usage: %.2f\n", memoryUsageGB)
|
||||||
|
|
||||||
|
if memoryUsageGB > float64(Keys.MemoryCap) {
|
||||||
|
cclog.Warnf("[METRICSTORE]> current memory usage is greater than the Memory Cap: %d\n", Keys.MemoryCap)
|
||||||
|
cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n")
|
||||||
|
|
||||||
|
freedTotal := 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
memoryUsageGB = ms.SizeInGB()
|
||||||
|
if memoryUsageGB < float64(Keys.MemoryCap) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
freed, err := ms.ForceFree()
|
||||||
|
if err != nil {
|
||||||
|
cclog.Errorf("error while force-freeing the buffers: %s", err)
|
||||||
|
}
|
||||||
|
if freed == 0 {
|
||||||
|
cclog.Fatalf("0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f remains higher than the memory cap and there are no buffers left to force-free\n", freedTotal, memoryUsageGB)
|
||||||
|
}
|
||||||
|
freedTotal += freed
|
||||||
|
}
|
||||||
|
|
||||||
|
cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freedTotal)
|
||||||
|
cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f\n", memoryUsageGB)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
func Free(ms *MemoryStore, t time.Time) (int, error) {
|
func Free(ms *MemoryStore, t time.Time) (int, error) {
|
||||||
// If no NodeProvider is configured, free all buffers older than t
|
// If no NodeProvider is configured, free all buffers older than t
|
||||||
if ms.nodeProvider == nil {
|
if ms.nodeProvider == nil {
|
||||||
@@ -496,6 +552,12 @@ func (m *MemoryStore) Free(selector []string, t int64) (int, error) {
|
|||||||
return m.GetLevel(selector).free(t)
|
return m.GetLevel(selector).free(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Free releases all buffers for the selected level and all its children that
|
||||||
|
// contain only values older than `t`.
|
||||||
|
func (m *MemoryStore) ForceFree() (int, error) {
|
||||||
|
return m.GetLevel(nil).forceFree()
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MemoryStore) FreeAll() error {
|
func (m *MemoryStore) FreeAll() error {
|
||||||
for k := range m.root.children {
|
for k := range m.root.children {
|
||||||
delete(m.root.children, k)
|
delete(m.root.children, k)
|
||||||
@@ -508,6 +570,10 @@ func (m *MemoryStore) SizeInBytes() int64 {
|
|||||||
return m.root.sizeInBytes()
|
return m.root.sizeInBytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MemoryStore) SizeInGB() float64 {
|
||||||
|
return float64(m.root.sizeInBytes()) / 1e9
|
||||||
|
}
|
||||||
|
|
||||||
// ListChildren , given a selector, returns a list of all children of the level
|
// ListChildren , given a selector, returns a list of all children of the level
|
||||||
// selected.
|
// selected.
|
||||||
func (m *MemoryStore) ListChildren(selector []string) []string {
|
func (m *MemoryStore) ListChildren(selector []string) []string {
|
||||||
|
|||||||
Reference in New Issue
Block a user