mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-15 04:17:30 +01:00
Make checkpointInterval an option config option again.
Also applies small fixes Entire-Checkpoint: c11d1a65fae4
This commit is contained in:
@@ -127,7 +127,7 @@ func (data *APIMetricData) AddStats() {
|
|||||||
// This is commonly used for unit conversion (e.g., bytes to gigabytes).
|
// This is commonly used for unit conversion (e.g., bytes to gigabytes).
|
||||||
// Scaling by 0 or 1 is a no-op for performance reasons.
|
// Scaling by 0 or 1 is a no-op for performance reasons.
|
||||||
func (data *APIMetricData) ScaleBy(f schema.Float) {
|
func (data *APIMetricData) ScaleBy(f schema.Float) {
|
||||||
if f == 0 || f == 1 {
|
if f == 1 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -100,8 +100,15 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
|
|
||||||
wg.Go(func() {
|
wg.Go(func() {
|
||||||
|
|
||||||
const checkpointInterval = 12 * time.Hour
|
d := 12 * time.Hour // default checkpoint interval
|
||||||
d := checkpointInterval
|
if Keys.CheckpointInterval != "" {
|
||||||
|
parsed, err := time.ParseDuration(Keys.CheckpointInterval)
|
||||||
|
if err != nil {
|
||||||
|
cclog.Errorf("[METRICSTORE]> invalid checkpoint-interval %q: %s, using default 12h", Keys.CheckpointInterval, err)
|
||||||
|
} else {
|
||||||
|
d = parsed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ticker := time.NewTicker(d)
|
ticker := time.NewTicker(d)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|||||||
@@ -54,6 +54,10 @@ const configSchema = `{
|
|||||||
"description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",
|
"description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
},
|
||||||
|
"checkpoint-interval": {
|
||||||
|
"description": "Interval between checkpoints as a Go duration string (e.g., '12h', '6h', '30m'). Default is '12h'.",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
"memory-cap": {
|
"memory-cap": {
|
||||||
"description": "Upper memory capacity limit used by metricstore in GB",
|
"description": "Upper memory capacity limit used by metricstore in GB",
|
||||||
"type": "integer"
|
"type": "integer"
|
||||||
|
|||||||
@@ -66,7 +66,10 @@ func (l *Level) collectMetricStatus(m *MemoryStore, expectedMetrics []string, he
|
|||||||
if degraded[metricName] {
|
if degraded[metricName] {
|
||||||
continue // already degraded, cannot improve
|
continue // already degraded, cannot improve
|
||||||
}
|
}
|
||||||
mc := m.Metrics[metricName]
|
mc, ok := m.Metrics[metricName]
|
||||||
|
if !ok {
|
||||||
|
continue // unknown metric, will be reported as missing
|
||||||
|
}
|
||||||
b := l.metrics[mc.offset]
|
b := l.metrics[mc.offset]
|
||||||
if b.bufferExists() {
|
if b.bufferExists() {
|
||||||
if !b.isBufferHealthy() {
|
if !b.isBufferHealthy() {
|
||||||
|
|||||||
@@ -181,6 +181,14 @@ func (l *Level) collectPaths(currentDepth, targetDepth int, currentPath []string
|
|||||||
// - int: Total number of buffers freed in this subtree
|
// - int: Total number of buffers freed in this subtree
|
||||||
// - error: Non-nil on failure (propagated from children)
|
// - error: Non-nil on failure (propagated from children)
|
||||||
func (l *Level) free(t int64) (int, error) {
|
func (l *Level) free(t int64) (int, error) {
|
||||||
|
n, _, err := l.freeAndCheckEmpty(t)
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// freeAndCheckEmpty performs free() and atomically checks if the level is empty
|
||||||
|
// while still holding its own lock, avoiding a TOCTOU race between free() and
|
||||||
|
// a separate isEmpty() call.
|
||||||
|
func (l *Level) freeAndCheckEmpty(t int64) (int, bool, error) {
|
||||||
l.lock.Lock()
|
l.lock.Lock()
|
||||||
defer l.lock.Unlock()
|
defer l.lock.Unlock()
|
||||||
|
|
||||||
@@ -201,30 +209,28 @@ func (l *Level) free(t int64) (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for key, child := range l.children {
|
for key, child := range l.children {
|
||||||
m, err := child.free(t)
|
m, empty, err := child.freeAndCheckEmpty(t)
|
||||||
n += m
|
n += m
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return n, err
|
return n, false, err
|
||||||
}
|
}
|
||||||
if child.isEmpty() {
|
if empty {
|
||||||
delete(l.children, key)
|
delete(l.children, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return n, nil
|
// Check emptiness while still holding the lock
|
||||||
}
|
empty := len(l.children) == 0
|
||||||
|
if empty {
|
||||||
// isEmpty returns true if this level has no metrics and no children.
|
for _, b := range l.metrics {
|
||||||
func (l *Level) isEmpty() bool {
|
if b != nil {
|
||||||
l.lock.RLock()
|
empty = false
|
||||||
defer l.lock.RUnlock()
|
break
|
||||||
|
}
|
||||||
for _, b := range l.metrics {
|
|
||||||
if b != nil {
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return len(l.children) == 0
|
|
||||||
|
return n, empty, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// forceFree removes the oldest buffer from each metric chain in the subtree.
|
// forceFree removes the oldest buffer from each metric chain in the subtree.
|
||||||
|
|||||||
@@ -754,6 +754,9 @@ func (m *MemoryStore) ForceFree() (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *MemoryStore) FreeAll() error {
|
func (m *MemoryStore) FreeAll() error {
|
||||||
|
m.root.lock.Lock()
|
||||||
|
defer m.root.lock.Unlock()
|
||||||
|
|
||||||
for k := range m.root.children {
|
for k := range m.root.children {
|
||||||
delete(m.root.children, k)
|
delete(m.root.children, k)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) {
|
|||||||
if b == nil {
|
if b == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
idx = 0
|
idx = int((t - b.start) / b.frequency)
|
||||||
}
|
}
|
||||||
|
|
||||||
if t < b.start || idx >= len(b.data) {
|
if t < b.start || idx >= len(b.data) {
|
||||||
|
|||||||
Reference in New Issue
Block a user