Add documentation

This commit is contained in:
2026-01-16 08:27:46 +01:00
parent 93dcfee8c5
commit 9a97d0e8eb
6 changed files with 782 additions and 111 deletions

View File

@@ -3,6 +3,9 @@
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// This file contains the API types and data fetching logic for querying metric data
// from the in-memory metric store. It provides structures for building complex queries
// with support for aggregation, scaling, padding, and statistics computation.
package metricstore package metricstore
import ( import (
@@ -15,10 +18,17 @@ import (
) )
var ( var (
// ErrInvalidTimeRange is returned when a query has 'from' >= 'to'
ErrInvalidTimeRange = errors.New("[METRICSTORE]> invalid time range: 'from' must be before 'to'") ErrInvalidTimeRange = errors.New("[METRICSTORE]> invalid time range: 'from' must be before 'to'")
ErrEmptyCluster = errors.New("[METRICSTORE]> cluster name cannot be empty") // ErrEmptyCluster is returned when a query with ForAllNodes has no cluster specified
ErrEmptyCluster = errors.New("[METRICSTORE]> cluster name cannot be empty")
) )
// APIMetricData represents the response data for a single metric query.
//
// It contains both the time-series data points and computed statistics (avg, min, max).
// If an error occurred during data retrieval, the Error field will be set and other
// fields may be incomplete.
type APIMetricData struct { type APIMetricData struct {
Error *string `json:"error,omitempty"` Error *string `json:"error,omitempty"`
Data schema.FloatArray `json:"data,omitempty"` Data schema.FloatArray `json:"data,omitempty"`
@@ -30,6 +40,13 @@ type APIMetricData struct {
Max schema.Float `json:"max"` Max schema.Float `json:"max"`
} }
// APIQueryRequest represents a batch query request for metric data.
//
// It supports two modes of operation:
// 1. Explicit queries via the Queries field
// 2. Automatic query generation via ForAllNodes (queries all specified metrics for all nodes in the cluster)
//
// The request can be customized with flags to include/exclude statistics, raw data, and padding.
type APIQueryRequest struct { type APIQueryRequest struct {
Cluster string `json:"cluster"` Cluster string `json:"cluster"`
Queries []APIQuery `json:"queries"` Queries []APIQuery `json:"queries"`
@@ -41,11 +58,25 @@ type APIQueryRequest struct {
WithPadding bool `json:"with-padding"` WithPadding bool `json:"with-padding"`
} }
// APIQueryResponse represents the response to an APIQueryRequest.
//
// Results is a 2D array where each outer element corresponds to a query,
// and each inner element corresponds to a selector within that query
// (e.g., multiple CPUs or cores).
type APIQueryResponse struct { type APIQueryResponse struct {
Queries []APIQuery `json:"queries,omitempty"` Queries []APIQuery `json:"queries,omitempty"`
Results [][]APIMetricData `json:"results"` Results [][]APIMetricData `json:"results"`
} }
// APIQuery represents a single metric query with optional hierarchical selectors.
//
// The hierarchical selection works as follows:
// - Hostname: The node to query
// - Type + TypeIds: First level of hierarchy (e.g., "cpu" + ["0", "1", "2"])
// - SubType + SubTypeIds: Second level of hierarchy (e.g., "core" + ["0", "1"])
//
// If Aggregate is true, data from multiple type/subtype IDs will be aggregated according
// to the metric's aggregation strategy. Otherwise, separate results are returned for each combination.
type APIQuery struct { type APIQuery struct {
Type *string `json:"type,omitempty"` Type *string `json:"type,omitempty"`
SubType *string `json:"subtype,omitempty"` SubType *string `json:"subtype,omitempty"`
@@ -58,6 +89,11 @@ type APIQuery struct {
Aggregate bool `json:"aggreg"` Aggregate bool `json:"aggreg"`
} }
// AddStats computes and populates the Avg, Min, and Max fields from the Data array.
//
// NaN values in the data are ignored during computation. If all values are NaN,
// the statistics fields will be set to NaN.
//
// TODO: Optimize this, just like the stats endpoint! // TODO: Optimize this, just like the stats endpoint!
func (data *APIMetricData) AddStats() { func (data *APIMetricData) AddStats() {
n := 0 n := 0
@@ -83,6 +119,10 @@ func (data *APIMetricData) AddStats() {
} }
} }
// ScaleBy multiplies all data points and statistics by the given factor.
//
// This is commonly used for unit conversion (e.g., bytes to gigabytes).
// Scaling by 0 or 1 is a no-op for performance reasons.
func (data *APIMetricData) ScaleBy(f schema.Float) { func (data *APIMetricData) ScaleBy(f schema.Float) {
if f == 0 || f == 1 { if f == 0 || f == 1 {
return return
@@ -96,6 +136,17 @@ func (data *APIMetricData) ScaleBy(f schema.Float) {
} }
} }
// PadDataWithNull pads the beginning of the data array with NaN values if needed.
//
// This ensures that the data aligns with the requested 'from' timestamp, even if
// the metric store doesn't have data for the earliest time points. This is useful
// for maintaining consistent array indexing across multiple queries.
//
// Parameters:
// - ms: MemoryStore instance to lookup metric configuration
// - from: The requested start timestamp
// - to: The requested end timestamp (unused but kept for API consistency)
// - metric: The metric name to lookup frequency information
func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metric string) { func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metric string) {
minfo, ok := ms.Metrics[metric] minfo, ok := ms.Metrics[metric]
if !ok { if !ok {
@@ -115,6 +166,31 @@ func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metr
} }
} }
// FetchData executes a batch metric query request and returns the results.
//
// This is the primary API for retrieving metric data from the memory store. It supports:
// - Individual queries via req.Queries
// - Batch queries for all nodes via req.ForAllNodes
// - Hierarchical selector construction (cluster → host → type → subtype)
// - Optional statistics computation (avg, min, max)
// - Optional data scaling
// - Optional data padding with NaN values
//
// The function constructs selectors based on the query parameters and calls MemoryStore.Read()
// for each selector. If a query specifies Aggregate=false with multiple type/subtype IDs,
// separate results are returned for each combination.
//
// Parameters:
// - req: The query request containing queries, time range, and options
//
// Returns:
// - APIQueryResponse containing results for each query, or error if validation fails
//
// Errors:
// - ErrInvalidTimeRange if req.From > req.To
// - ErrEmptyCluster if req.ForAllNodes is used without specifying a cluster
// - Error if MemoryStore is not initialized
// - Individual query errors are stored in APIMetricData.Error field
func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { func FetchData(req APIQueryRequest) (*APIQueryResponse, error) {
if req.From > req.To { if req.From > req.To {
return nil, ErrInvalidTimeRange return nil, ErrInvalidTimeRange
@@ -126,7 +202,7 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) {
req.WithData = true req.WithData = true
ms := GetMemoryStore() ms := GetMemoryStore()
if ms == nil { if ms == nil {
return nil, fmt.Errorf("memorystore not initialized") return nil, fmt.Errorf("[METRICSTORE]> memorystore not initialized")
} }
response := APIQueryResponse{ response := APIQueryResponse{
@@ -195,8 +271,6 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) {
} }
} }
// log.Printf("query: %#v\n", query)
// log.Printf("sels: %#v\n", sels)
var err error var err error
res := make([]APIMetricData, 0, len(sels)) res := make([]APIMetricData, 0, len(sels))
for _, sel := range sels { for _, sel := range sels {

View File

@@ -3,6 +3,41 @@
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// Package metricstore provides buffer.go: Time-series data buffer implementation.
//
// # Buffer Architecture
//
// Each metric at each hierarchical level (cluster/host/cpu/etc.) uses a linked-list
// chain of fixed-size buffers to store time-series data. This design:
//
// - Avoids reallocation/copying when growing (new links added instead)
// - Enables efficient pooling (buffers returned to sync.Pool)
// - Supports traversal back in time (via prev pointers)
// - Maintains temporal ordering (newer data in later buffers)
//
// # Buffer Chain Example
//
// [oldest buffer] <- prev -- [older] <- prev -- [newest buffer (head)]
// start=1000 start=1512 start=2024
// data=[v0...v511] data=[v0...v511] data=[v0...v42]
//
// When the head buffer reaches capacity (BufferCap = 512), a new buffer becomes
// the new head and the old head is linked via prev.
//
// # Pooling Strategy
//
// sync.Pool reduces GC pressure for the common case (BufferCap-sized allocations).
// Non-standard capacity buffers are not pooled (e.g., from checkpoint deserialization).
//
// # Time Alignment
//
// Timestamps are aligned to measurement frequency intervals:
//
// index = (timestamp - buffer.start) / buffer.frequency
// actualTime = buffer.start + (frequency / 2) + (index * frequency)
//
// Missing data points are represented as NaN values. The read() function performs
// linear interpolation where possible.
package metricstore package metricstore
import ( import (
@@ -27,14 +62,30 @@ var bufferPool sync.Pool = sync.Pool{
} }
var ( var (
ErrNoData error = errors.New("[METRICSTORE]> no data for this metric/level") // ErrNoData indicates no time-series data exists for the requested metric/level.
ErrNoData error = errors.New("[METRICSTORE]> no data for this metric/level")
// ErrDataDoesNotAlign indicates that aggregated data from child scopes
// does not align with the parent scope's expected timestamps/intervals.
ErrDataDoesNotAlign error = errors.New("[METRICSTORE]> data from lower granularities does not align") ErrDataDoesNotAlign error = errors.New("[METRICSTORE]> data from lower granularities does not align")
) )
// Each metric on each level has it's own buffer. // buffer stores time-series data for a single metric at a specific hierarchical level.
// This is where the actual values go. //
// If `cap(data)` is reached, a new buffer is created and // Buffers form doubly-linked chains ordered by time. When capacity is reached,
// becomes the new head of a buffer list. // a new buffer becomes the head and the old head is linked via prev/next.
//
// Fields:
// - prev: Link to older buffer in the chain (nil if this is oldest)
// - next: Link to newer buffer in the chain (nil if this is newest/head)
// - data: Time-series values (schema.Float supports NaN for missing data)
// - frequency: Measurement interval in seconds
// - start: Start timestamp (adjusted by -frequency/2 for alignment)
// - archived: True if data has been persisted to disk archive
// - closed: True if buffer is no longer accepting writes
//
// Index calculation: index = (timestamp - start) / frequency
// Actual data timestamp: start + (frequency / 2) + (index * frequency)
type buffer struct { type buffer struct {
prev *buffer prev *buffer
next *buffer next *buffer
@@ -57,10 +108,22 @@ func newBuffer(ts, freq int64) *buffer {
return b return b
} }
// If a new buffer was created, the new head is returnd. // write appends a timestamped value to the buffer chain.
// Otherwise, the existing buffer is returnd. //
// Normaly, only "newer" data should be written, but if the value would // Returns the head buffer (which may be newly created if capacity was reached).
// end up in the same buffer anyways it is allowed. // Timestamps older than the buffer's start are rejected. If the calculated index
// exceeds capacity, a new buffer is allocated and linked as the new head.
//
// Missing timestamps are automatically filled with NaN values to maintain alignment.
// Overwrites are allowed if the index is already within the existing data slice.
//
// Parameters:
// - ts: Unix timestamp in seconds
// - value: Metric value (can be schema.NaN for missing data)
//
// Returns:
// - *buffer: The new head buffer (same as b if no new buffer created)
// - error: Non-nil if timestamp is before buffer start
func (b *buffer) write(ts int64, value schema.Float) (*buffer, error) { func (b *buffer) write(ts int64, value schema.Float) (*buffer, error) {
if ts < b.start { if ts < b.start {
return nil, errors.New("[METRICSTORE]> cannot write value to buffer from past") return nil, errors.New("[METRICSTORE]> cannot write value to buffer from past")
@@ -99,13 +162,27 @@ func (b *buffer) firstWrite() int64 {
return b.start + (b.frequency / 2) return b.start + (b.frequency / 2)
} }
// Return all known values from `from` to `to`. Gaps of information are represented as NaN. // read retrieves time-series data from the buffer chain for the specified time range.
// Simple linear interpolation is done between the two neighboring cells if possible. //
// If values at the start or end are missing, instead of NaN values, the second and thrid // Traverses the buffer chain backwards (via prev links) if 'from' precedes the current
// return values contain the actual `from`/`to`. // buffer's start. Missing data points are represented as NaN. Values are accumulated
// This function goes back the buffer chain if `from` is older than the currents buffer start. // into the provided 'data' slice (using +=, so caller must zero-initialize if needed).
// The loaded values are added to `data` and `data` is returned, possibly with a shorter length. //
// If `data` is not long enough to hold all values, this function will panic! // The function adjusts the actual time range returned if data is unavailable at the
// boundaries (returned via adjusted from/to timestamps).
//
// Parameters:
// - from: Start timestamp (Unix seconds)
// - to: End timestamp (Unix seconds, exclusive)
// - data: Pre-allocated slice to accumulate results (must be large enough)
//
// Returns:
// - []schema.Float: Slice of data (may be shorter than input 'data' slice)
// - int64: Actual start timestamp with available data
// - int64: Actual end timestamp (exclusive)
// - error: Non-nil on failure
//
// Panics if 'data' slice is too small to hold all values in [from, to).
func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int64, int64, error) { func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int64, int64, error) {
if from < b.firstWrite() { if from < b.firstWrite() {
if b.prev != nil { if b.prev != nil {
@@ -142,7 +219,18 @@ func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int6
return data[:i], from, t, nil return data[:i], from, t, nil
} }
// Returns true if this buffer needs to be freed. // free removes buffers older than the specified timestamp from the chain.
//
// Recursively traverses backwards (via prev) and unlinks buffers whose end time
// is before the retention threshold. Freed buffers are returned to the pool if
// they have the standard capacity (BufferCap).
//
// Parameters:
// - t: Retention threshold timestamp (Unix seconds)
//
// Returns:
// - delme: True if the current buffer itself should be deleted by caller
// - n: Number of buffers freed in this subtree
func (b *buffer) free(t int64) (delme bool, n int) { func (b *buffer) free(t int64) (delme bool, n int) {
if b.prev != nil { if b.prev != nil {
delme, m := b.prev.free(t) delme, m := b.prev.free(t)
@@ -196,7 +284,19 @@ func (b *buffer) forceFreeOldest() (delme bool, n int) {
return true, 1 return true, 1
} }
// Call `callback` on every buffer that contains data in the range from `from` to `to`. // iterFromTo invokes callback on every buffer in the chain that overlaps [from, to].
//
// Traverses backwards (via prev) first, then processes current buffer if it overlaps
// the time range. Used for checkpoint/archive operations that need to serialize buffers
// within a specific time window.
//
// Parameters:
// - from: Start timestamp (Unix seconds, inclusive)
// - to: End timestamp (Unix seconds, inclusive)
// - callback: Function to invoke on each overlapping buffer
//
// Returns:
// - error: First error returned by callback, or nil if all succeeded
func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error { func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error {
if b == nil { if b == nil {
return nil return nil

View File

@@ -3,6 +3,34 @@
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// This file implements checkpoint persistence for the in-memory metric store.
//
// Checkpoints enable graceful restarts by periodically saving in-memory metric
// data to disk in either JSON or Avro format. The checkpoint system:
//
// Key Features:
// - Periodic background checkpointing via the Checkpointing() worker
// - Two formats: JSON (human-readable) and Avro (compact, efficient)
// - Parallel checkpoint creation and loading using worker pools
// - Hierarchical file organization: checkpoint_dir/cluster/host/timestamp.{json|avro}
// - Only saves unarchived data (archived data is already persisted elsewhere)
// - Automatic format detection and fallback during loading
// - GC optimization during loading to prevent excessive heap growth
//
// Checkpoint Workflow:
// 1. Init() loads checkpoints within retention window at startup
// 2. Checkpointing() worker periodically saves new data
// 3. Shutdown() writes final checkpoint before exit
//
// File Organization:
//
// checkpoints/
// cluster1/
// host001/
// 1234567890.json (timestamp = checkpoint start time)
// 1234567950.json
// host002/
// ...
package metricstore package metricstore
import ( import (
@@ -29,18 +57,21 @@ import (
) )
const ( const (
CheckpointFilePerms = 0o644 CheckpointFilePerms = 0o644 // File permissions for checkpoint files
CheckpointDirPerms = 0o755 CheckpointDirPerms = 0o755 // Directory permissions for checkpoint directories
GCTriggerInterval = DefaultGCTriggerInterval GCTriggerInterval = DefaultGCTriggerInterval // Interval for triggering GC during checkpoint loading
) )
// Whenever changed, update MarshalJSON as well! // CheckpointMetrics represents metric data in a checkpoint file.
// Whenever the structure changes, update MarshalJSON as well!
type CheckpointMetrics struct { type CheckpointMetrics struct {
Data []schema.Float `json:"data"` Data []schema.Float `json:"data"`
Frequency int64 `json:"frequency"` Frequency int64 `json:"frequency"`
Start int64 `json:"start"` Start int64 `json:"start"`
} }
// CheckpointFile represents the hierarchical structure of a checkpoint file.
// It mirrors the Level tree structure from the MemoryStore.
type CheckpointFile struct { type CheckpointFile struct {
Metrics map[string]*CheckpointMetrics `json:"metrics"` Metrics map[string]*CheckpointMetrics `json:"metrics"`
Children map[string]*CheckpointFile `json:"children"` Children map[string]*CheckpointFile `json:"children"`
@@ -48,10 +79,23 @@ type CheckpointFile struct {
To int64 `json:"to"` To int64 `json:"to"`
} }
var lastCheckpoint time.Time // lastCheckpoint tracks the timestamp of the last checkpoint creation.
var (
lastCheckpoint time.Time
lastCheckpointMu sync.Mutex
)
// Checkpointing starts a background worker that periodically saves metric data to disk.
//
// The behavior depends on the configured file format:
// - JSON: Periodic checkpointing based on Keys.Checkpoints.Interval
// - Avro: Initial delay + periodic checkpointing at DefaultAvroCheckpointInterval
//
// The worker respects context cancellation and signals completion via the WaitGroup.
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
lastCheckpointMu.Lock()
lastCheckpoint = time.Now() lastCheckpoint = time.Now()
lastCheckpointMu.Unlock()
if Keys.Checkpoints.FileFormat == "json" { if Keys.Checkpoints.FileFormat == "json" {
ms := GetMemoryStore() ms := GetMemoryStore()
@@ -60,9 +104,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
defer wg.Done() defer wg.Done()
d, err := time.ParseDuration(Keys.Checkpoints.Interval) d, err := time.ParseDuration(Keys.Checkpoints.Interval)
if err != nil { if err != nil {
cclog.Fatal(err) cclog.Fatalf("[METRICSTORE]> invalid checkpoint interval '%s': %s", Keys.Checkpoints.Interval, err.Error())
} }
if d <= 0 { if d <= 0 {
cclog.Warnf("[METRICSTORE]> checkpoint interval is zero or negative (%s), checkpointing disabled", d)
return return
} }
@@ -74,15 +119,21 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-ticker.C: case <-ticker.C:
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", lastCheckpoint.Format(time.RFC3339)) lastCheckpointMu.Lock()
from := lastCheckpoint
lastCheckpointMu.Unlock()
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
now := time.Now() now := time.Now()
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir,
lastCheckpoint.Unix(), now.Unix()) from.Unix(), now.Unix())
if err != nil { if err != nil {
cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error()) cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error())
} else { } else {
cclog.Infof("[METRICSTORE]> done: %d checkpoint files created", n) cclog.Infof("[METRICSTORE]> done: %d checkpoint files created", n)
lastCheckpointMu.Lock()
lastCheckpoint = now lastCheckpoint = now
lastCheckpointMu.Unlock()
} }
} }
} }
@@ -113,9 +164,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
} }
} }
// As `Float` implements a custom MarshalJSON() function, // MarshalJSON provides optimized JSON encoding for CheckpointMetrics.
// serializing an array of such types has more overhead //
// than one would assume (because of extra allocations, interfaces and so on). // Since schema.Float has custom MarshalJSON, serializing []Float has significant overhead.
// This method manually constructs JSON to avoid allocations and interface conversions.
func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) { func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) {
buf := make([]byte, 0, 128+len(cm.Data)*8) buf := make([]byte, 0, 128+len(cm.Data)*8)
buf = append(buf, `{"frequency":`...) buf = append(buf, `{"frequency":`...)
@@ -137,13 +189,27 @@ func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) {
return buf, nil return buf, nil
} }
// Metrics stored at the lowest 2 levels are not stored away (root and cluster)! // ToCheckpoint writes metric data to checkpoint files in parallel.
// On a per-host basis a new JSON file is created. I have no idea if this will scale. //
// The good thing: Only a host at a time is locked, so this function can run // Metrics at root and cluster levels are skipped. One file per host is created.
// in parallel to writes/reads. // Uses worker pool (Keys.NumWorkers) for parallel processing. Only locks one host
// at a time, allowing concurrent writes/reads to other hosts.
//
// Returns the number of checkpoint files created and any errors encountered.
func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
levels := make([]*Level, 0) // Pre-calculate capacity by counting cluster/host pairs
selectors := make([][]string, 0) m.root.lock.RLock()
totalHosts := 0
for _, l1 := range m.root.children {
l1.lock.RLock()
totalHosts += len(l1.children)
l1.lock.RUnlock()
}
m.root.lock.RUnlock()
levels := make([]*Level, 0, totalHosts)
selectors := make([][]string, 0, totalHosts)
m.root.lock.RLock() m.root.lock.RLock()
for sel1, l1 := range m.root.children { for sel1, l1 := range m.root.children {
l1.lock.RLock() l1.lock.RLock()
@@ -203,6 +269,8 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
return int(n), nil return int(n), nil
} }
// toCheckpointFile recursively converts a Level tree to CheckpointFile structure.
// Skips metrics that are already archived. Returns nil if no unarchived data exists.
func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) { func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) {
l.lock.RLock() l.lock.RLock()
defer l.lock.RUnlock() defer l.lock.RUnlock()
@@ -224,6 +292,7 @@ func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil
b.iterFromTo(from, to, func(b *buffer) error { b.iterFromTo(from, to, func(b *buffer) error {
if !b.archived { if !b.archived {
allArchived = false allArchived = false
return fmt.Errorf("stop") // Early termination signal
} }
return nil return nil
}) })
@@ -267,6 +336,8 @@ func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil
return retval, nil return retval, nil
} }
// toCheckpoint writes a Level's data to a JSON checkpoint file.
// Creates directory if needed. Returns ErrNoNewArchiveData if nothing to save.
func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
cf, err := l.toCheckpointFile(from, to, m) cf, err := l.toCheckpointFile(from, to, m)
if err != nil { if err != nil {
@@ -278,11 +349,11 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
} }
filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) filepath := path.Join(dir, fmt.Sprintf("%d.json", from))
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
if err != nil && os.IsNotExist(err) { if err != nil && os.IsNotExist(err) {
err = os.MkdirAll(dir, CheckpointDirPerms) err = os.MkdirAll(dir, CheckpointDirPerms)
if err == nil { if err == nil {
f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
} }
} }
if err != nil { if err != nil {
@@ -298,9 +369,54 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
return bw.Flush() return bw.Flush()
} }
// enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs.
// Returns error if directory structure is invalid.
func enqueueCheckpointHosts(dir string, work chan<- [2]string) error {
clustersDir, err := os.ReadDir(dir)
if err != nil {
return err
}
gcCounter := 0
for _, clusterDir := range clustersDir {
if !clusterDir.IsDir() {
return errors.New("[METRICSTORE]> expected only directories at first level of checkpoints/ directory")
}
hostsDir, err := os.ReadDir(filepath.Join(dir, clusterDir.Name()))
if err != nil {
return err
}
for _, hostDir := range hostsDir {
if !hostDir.IsDir() {
return errors.New("[METRICSTORE]> expected only directories at second level of checkpoints/ directory")
}
gcCounter++
if gcCounter%GCTriggerInterval == 0 {
// Forcing garbage collection runs here regulary during the loading of checkpoints
// will decrease the total heap size after loading everything back to memory is done.
// While loading data, the heap will grow fast, so the GC target size will double
// almost always. By forcing GCs here, we can keep it growing more slowly so that
// at the end, less memory is wasted.
runtime.GC()
}
work <- [2]string{clusterDir.Name(), hostDir.Name()}
}
}
return nil
}
// FromCheckpoint loads checkpoint files from disk into memory in parallel.
//
// Uses worker pool to load cluster/host combinations. Periodically triggers GC
// to prevent excessive heap growth. Returns number of files loaded and any errors.
func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) { func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) {
var wg sync.WaitGroup var wg sync.WaitGroup
work := make(chan [2]string, Keys.NumWorkers) work := make(chan [2]string, Keys.NumWorkers*4)
n, errs := int32(0), int32(0) n, errs := int32(0), int32(0)
wg.Add(Keys.NumWorkers) wg.Add(Keys.NumWorkers)
@@ -319,40 +435,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (
}() }()
} }
i := 0 err := enqueueCheckpointHosts(dir, work)
clustersDir, err := os.ReadDir(dir)
for _, clusterDir := range clustersDir {
if !clusterDir.IsDir() {
err = errors.New("[METRICSTORE]> expected only directories at first level of checkpoints/ directory")
goto done
}
hostsDir, e := os.ReadDir(filepath.Join(dir, clusterDir.Name()))
if e != nil {
err = e
goto done
}
for _, hostDir := range hostsDir {
if !hostDir.IsDir() {
err = errors.New("[METRICSTORE]> expected only directories at second level of checkpoints/ directory")
goto done
}
i++
if i%Keys.NumWorkers == 0 && i > GCTriggerInterval {
// Forcing garbage collection runs here regulary during the loading of checkpoints
// will decrease the total heap size after loading everything back to memory is done.
// While loading data, the heap will grow fast, so the GC target size will double
// almost always. By forcing GCs here, we can keep it growing more slowly so that
// at the end, less memory is wasted.
runtime.GC()
}
work <- [2]string{clusterDir.Name(), hostDir.Name()}
}
}
done:
close(work) close(work)
wg.Wait() wg.Wait()
@@ -366,9 +449,11 @@ done:
return int(n), nil return int(n), nil
} }
// Metrics stored at the lowest 2 levels are not loaded (root and cluster)! // FromCheckpointFiles is the main entry point for loading checkpoints at startup.
// This function can only be called once and before the very first write or read. //
// Different host's data is loaded to memory in parallel. // Automatically detects checkpoint format (JSON vs Avro) and falls back if needed.
// Creates checkpoint directory if it doesn't exist. This function must be called
// before any writes or reads, and can only be called once.
func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
if _, err := os.Stat(dir); os.IsNotExist(err) { if _, err := os.Stat(dir); os.IsNotExist(err) {
// The directory does not exist, so create it using os.MkdirAll() // The directory does not exist, so create it using os.MkdirAll()
@@ -411,6 +496,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
return 0, nil return 0, nil
} }
// checkFilesWithExtension walks a directory tree to check if files with the given extension exist.
func checkFilesWithExtension(dir string, extension string) (bool, error) { func checkFilesWithExtension(dir string, extension string) (bool, error) {
found := false found := false

View File

@@ -3,6 +3,43 @@
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// Package metricstore provides config.go: Configuration structures and metric management.
//
// # Configuration Hierarchy
//
// The metricstore package uses nested configuration structures:
//
// MetricStoreConfig (Keys)
// ├─ NumWorkers: Parallel checkpoint/archive workers
// ├─ RetentionInMemory: How long to keep data in RAM
// ├─ MemoryCap: Memory limit in bytes (triggers forceFree)
// ├─ Checkpoints: Persistence configuration
// │ ├─ FileFormat: "avro" or "json"
// │ ├─ Interval: How often to save (e.g., "1h")
// │ └─ RootDir: Checkpoint storage path
// ├─ Archive: Long-term storage configuration
// │ ├─ ArchiveInterval: How often to archive
// │ ├─ RootDir: Archive storage path
// │ └─ DeleteInstead: Delete old data instead of archiving
// ├─ Debug: Development/debugging options
// └─ Subscriptions: NATS topic subscriptions for metric ingestion
//
// # Metric Configuration
//
// Each metric (e.g., "cpu_load", "mem_used") has a MetricConfig entry in the global
// Metrics map, defining:
//
// - Frequency: Measurement interval in seconds
// - Aggregation: How to combine values (sum/avg/none) when transforming scopes
// - offset: Internal index into Level.metrics slice (assigned during Init)
//
// # AggregationStrategy
//
// Determines how to combine metric values when aggregating from finer to coarser scopes:
//
// - NoAggregation: Do not combine (incompatible scopes)
// - SumAggregation: Add values (e.g., power consumption: core→socket)
// - AvgAggregation: Average values (e.g., temperature: core→socket)
package metricstore package metricstore
import ( import (
@@ -19,23 +56,50 @@ const (
DefaultAvroCheckpointInterval = time.Minute DefaultAvroCheckpointInterval = time.Minute
) )
// Checkpoints configures periodic persistence of in-memory metric data.
//
// Fields:
// - FileFormat: "avro" (default, binary, compact) or "json" (human-readable, slower)
// - Interval: Duration string (e.g., "1h", "30m") between checkpoint saves
// - RootDir: Filesystem path for checkpoint files (created if missing)
type Checkpoints struct { type Checkpoints struct {
FileFormat string `json:"file-format"` FileFormat string `json:"file-format"`
Interval string `json:"interval"` Interval string `json:"interval"`
RootDir string `json:"directory"` RootDir string `json:"directory"`
} }
// Debug provides development and profiling options.
//
// Fields:
// - DumpToFile: Path to dump checkpoint data for inspection (empty = disabled)
// - EnableGops: Enable gops agent for live runtime debugging (https://github.com/google/gops)
type Debug struct { type Debug struct {
DumpToFile string `json:"dump-to-file"` DumpToFile string `json:"dump-to-file"`
EnableGops bool `json:"gops"` EnableGops bool `json:"gops"`
} }
// Archive configures long-term storage of old metric data.
//
// Data older than RetentionInMemory is archived to disk or deleted.
//
// Fields:
// - ArchiveInterval: Duration string (e.g., "24h") between archive operations
// - RootDir: Filesystem path for archived data (created if missing)
// - DeleteInstead: If true, delete old data instead of archiving (saves disk space)
type Archive struct { type Archive struct {
ArchiveInterval string `json:"interval"` ArchiveInterval string `json:"interval"`
RootDir string `json:"directory"` RootDir string `json:"directory"`
DeleteInstead bool `json:"delete-instead"` DeleteInstead bool `json:"delete-instead"`
} }
// Subscriptions defines NATS topics to subscribe to for metric ingestion.
//
// Each subscription receives metrics via NATS messaging, enabling real-time
// data collection from compute nodes.
//
// Fields:
// - SubscribeTo: NATS subject/channel name (e.g., "metrics.compute.*")
// - ClusterTag: Default cluster name for metrics without cluster tag (optional)
type Subscriptions []struct { type Subscriptions []struct {
// Channel name // Channel name
SubscribeTo string `json:"subscribe-to"` SubscribeTo string `json:"subscribe-to"`
@@ -44,6 +108,19 @@ type Subscriptions []struct {
ClusterTag string `json:"cluster-tag"` ClusterTag string `json:"cluster-tag"`
} }
// MetricStoreConfig defines the main configuration for the metricstore.
//
// Loaded from cc-backend's config.json "metricstore" section. Controls memory usage,
// persistence, archiving, and metric ingestion.
//
// Fields:
// - NumWorkers: Parallel workers for checkpoint/archive (0 = auto: min(NumCPU/2+1, 10))
// - RetentionInMemory: Duration string (e.g., "48h") for in-memory data retention
// - MemoryCap: Max bytes for buffer data (0 = unlimited); triggers forceFree when exceeded
// - Checkpoints: Periodic persistence configuration
// - Debug: Development/profiling options (nil = disabled)
// - Archive: Long-term storage configuration (nil = disabled)
// - Subscriptions: NATS topics for metric ingestion (nil = polling only)
type MetricStoreConfig struct { type MetricStoreConfig struct {
// Number of concurrent workers for checkpoint and archive operations. // Number of concurrent workers for checkpoint and archive operations.
// If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10) // If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10)
@@ -56,6 +133,10 @@ type MetricStoreConfig struct {
Subscriptions *Subscriptions `json:"nats-subscriptions"` Subscriptions *Subscriptions `json:"nats-subscriptions"`
} }
// Keys is the global metricstore configuration instance.
//
// Initialized with defaults, then overwritten by cc-backend's config.json.
// Accessed by Init(), Checkpointing(), and other lifecycle functions.
var Keys MetricStoreConfig = MetricStoreConfig{ var Keys MetricStoreConfig = MetricStoreConfig{
Checkpoints: Checkpoints{ Checkpoints: Checkpoints{
FileFormat: "avro", FileFormat: "avro",
@@ -63,15 +144,33 @@ var Keys MetricStoreConfig = MetricStoreConfig{
}, },
} }
// AggregationStrategy for aggregation over multiple values at different cpus/sockets/..., not time! // AggregationStrategy defines how to combine metric values across hierarchy levels.
//
// Used when transforming data from finer-grained scopes (e.g., core) to coarser scopes
// (e.g., socket). This is SPATIAL aggregation, not TEMPORAL (time-based) aggregation.
//
// Values:
// - NoAggregation: Do not aggregate (incompatible scopes or non-aggregatable metrics)
// - SumAggregation: Add values (e.g., power: sum core power → socket power)
// - AvgAggregation: Average values (e.g., temperature: average core temps → socket temp)
type AggregationStrategy int type AggregationStrategy int
const ( const (
NoAggregation AggregationStrategy = iota NoAggregation AggregationStrategy = iota // Do not aggregate
SumAggregation SumAggregation // Sum values (e.g., power, energy)
AvgAggregation AvgAggregation // Average values (e.g., temperature, utilization)
) )
// AssignAggregationStrategy parses a string into an AggregationStrategy value.
//
// Used when loading metric configurations from JSON/YAML files.
//
// Parameters:
// - str: "sum", "avg", or "" (empty string for NoAggregation)
//
// Returns:
// - AggregationStrategy: Parsed value
// - error: Non-nil if str is unrecognized
func AssignAggregationStrategy(str string) (AggregationStrategy, error) { func AssignAggregationStrategy(str string) (AggregationStrategy, error) {
switch str { switch str {
case "": case "":
@@ -85,6 +184,14 @@ func AssignAggregationStrategy(str string) (AggregationStrategy, error) {
} }
} }
// MetricConfig defines configuration for a single metric type.
//
// Stored in the global Metrics map, keyed by metric name (e.g., "cpu_load").
//
// Fields:
// - Frequency: Measurement interval in seconds (e.g., 60 for 1-minute granularity)
// - Aggregation: How to combine values across hierarchy levels (sum/avg/none)
// - offset: Internal index into Level.metrics slice (assigned during Init)
type MetricConfig struct { type MetricConfig struct {
// Interval in seconds at which measurements are stored // Interval in seconds at which measurements are stored
Frequency int64 Frequency int64
@@ -96,8 +203,21 @@ type MetricConfig struct {
offset int offset int
} }
// Metrics is the global map of metric configurations.
//
// Keyed by metric name (e.g., "cpu_load", "mem_used"). Populated during Init()
// from cluster configuration and checkpoint restoration. Each MetricConfig.offset
// corresponds to the buffer slice index in Level.metrics.
var Metrics map[string]MetricConfig var Metrics map[string]MetricConfig
// GetMetricFrequency retrieves the measurement interval for a metric.
//
// Parameters:
// - metricName: Metric name (e.g., "cpu_load")
//
// Returns:
// - int64: Frequency in seconds
// - error: Non-nil if metric not found in Metrics map
func GetMetricFrequency(metricName string) (int64, error) { func GetMetricFrequency(metricName string) (int64, error) {
if metric, ok := Metrics[metricName]; ok { if metric, ok := Metrics[metricName]; ok {
return metric.Frequency, nil return metric.Frequency, nil
@@ -105,9 +225,18 @@ func GetMetricFrequency(metricName string) (int64, error) {
return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName)
} }
// AddMetric adds logic to add metrics. Redundant metrics should be updated with max frequency. // AddMetric registers a new metric or updates an existing one.
// use metric.Name to check if the metric already exists. //
// if not, add it to the Metrics map. // If the metric already exists with a different frequency, uses the higher frequency
// (finer granularity). This handles cases where different clusters report the same
// metric at different intervals.
//
// Parameters:
// - name: Metric name (e.g., "cpu_load")
// - metric: Configuration (frequency, aggregation strategy)
//
// Returns:
// - error: Always nil (signature for future error handling)
func AddMetric(name string, metric MetricConfig) error { func AddMetric(name string, metric MetricConfig) error {
if Metrics == nil { if Metrics == nil {
Metrics = make(map[string]MetricConfig, 0) Metrics = make(map[string]MetricConfig, 0)

View File

@@ -3,6 +3,41 @@
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// Package metricstore provides level.go: Hierarchical tree structure for metric storage.
//
// # Level Architecture
//
// The Level type forms a tree structure where each node represents a level in the
// ClusterCockpit hierarchy: cluster → host → socket → core → hwthread, with special
// nodes for memory domains and accelerators.
//
// Structure:
//
// Root Level (cluster="emmy")
// ├─ Level (host="node001")
// │ ├─ Level (socket="0")
// │ │ ├─ Level (core="0") [stores cpu0 metrics]
// │ │ └─ Level (core="1") [stores cpu1 metrics]
// │ └─ Level (socket="1")
// │ └─ ...
// └─ Level (host="node002")
// └─ ...
//
// Each Level can:
// - Hold data (metrics slice of buffer pointers)
// - Have child nodes (children map[string]*Level)
// - Both simultaneously (inner nodes can store aggregated metrics)
//
// # Selector Paths
//
// Selectors are hierarchical paths: []string{"cluster", "host", "component"}.
// Example: []string{"emmy", "node001", "cpu0"} navigates to the cpu0 core level.
//
// # Concurrency
//
// RWMutex protects children map and metrics slice. Read-heavy workload (metric reads)
// uses RLock. Writes (new levels, buffer updates) use Lock. Double-checked locking
// prevents races during level creation.
package metricstore package metricstore
import ( import (
@@ -12,20 +47,40 @@ import (
"github.com/ClusterCockpit/cc-lib/v2/util" "github.com/ClusterCockpit/cc-lib/v2/util"
) )
// Could also be called "node" as this forms a node in a tree structure. // Level represents a node in the hierarchical metric storage tree.
// Called Level because "node" might be confusing here. //
// Can be both a leaf or a inner node. In this tree structue, inner nodes can // Can be both a leaf or inner node. Inner nodes hold data in 'metrics' for aggregated
// also hold data (in `metrics`). // values (e.g., socket-level metrics derived from core-level data). Named "Level"
// instead of "node" to avoid confusion with cluster nodes (hosts).
//
// Fields:
// - children: Map of child level names to Level pointers (e.g., "cpu0" → Level)
// - metrics: Slice of buffer pointers (one per metric, indexed by MetricConfig.offset)
// - lock: RWMutex for concurrent access (read-heavy, write-rare)
type Level struct { type Level struct {
children map[string]*Level children map[string]*Level
metrics []*buffer metrics []*buffer
lock sync.RWMutex lock sync.RWMutex
} }
// Find the correct level for the given selector, creating it if // findLevelOrCreate navigates to or creates the level specified by selector.
// it does not exist. Example selector in the context of the //
// ClusterCockpit could be: []string{ "emmy", "host123", "cpu0" }. // Recursively descends the tree, creating missing levels as needed. Uses double-checked
// This function would probably benefit a lot from `level.children` beeing a `sync.Map`? // locking: RLock first (fast path), then Lock if creation needed (slow path), then
// re-check after acquiring Lock to handle races.
//
// Example selector: []string{"emmy", "node001", "cpu0"}
// Navigates: root → emmy → node001 → cpu0, creating levels as needed.
//
// Parameters:
// - selector: Hierarchical path (consumed recursively, decreasing depth)
// - nMetrics: Number of metric slots to allocate in new levels
//
// Returns:
// - *Level: The target level (existing or newly created)
//
// Note: sync.Map may improve performance for high-concurrency writes, but current
// approach suffices for read-heavy workload.
func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level {
if len(selector) == 0 { if len(selector) == 0 {
return l return l
@@ -72,6 +127,22 @@ func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level {
return child.findLevelOrCreate(selector[1:], nMetrics) return child.findLevelOrCreate(selector[1:], nMetrics)
} }
// collectPaths gathers all selector paths at the specified depth in the tree.
//
// Recursively traverses children, collecting paths when currentDepth+1 == targetDepth.
// Each path is a selector that can be used with findLevel() or findBuffers().
//
// Explicitly copies slices to avoid shared underlying arrays between siblings, preventing
// unintended mutations.
//
// Parameters:
// - currentDepth: Depth of current level (0 = root)
// - targetDepth: Depth to collect paths from
// - currentPath: Path accumulated so far
// - results: Output slice (appended to)
//
// Example: collectPaths(0, 2, []string{}, &results) collects all 2-level paths
// like []string{"emmy", "node001"}, []string{"emmy", "node002"}, etc.
func (l *Level) collectPaths(currentDepth, targetDepth int, currentPath []string, results *[][]string) { func (l *Level) collectPaths(currentDepth, targetDepth int, currentPath []string, results *[][]string) {
l.lock.RLock() l.lock.RLock()
defer l.lock.RUnlock() defer l.lock.RUnlock()
@@ -95,6 +166,18 @@ func (l *Level) collectPaths(currentDepth, targetDepth int, currentPath []string
} }
} }
// free removes buffers older than the retention threshold from the entire subtree.
//
// Recursively frees buffers in this level's metrics and all child levels. Buffers
// with standard capacity (BufferCap) are returned to the pool. Called by the
// retention worker to enforce retention policies.
//
// Parameters:
// - t: Retention threshold timestamp (Unix seconds)
//
// Returns:
// - int: Total number of buffers freed in this subtree
// - error: Non-nil on failure (propagated from children)
func (l *Level) free(t int64) (int, error) { func (l *Level) free(t int64) (int, error) {
l.lock.Lock() l.lock.Lock()
defer l.lock.Unlock() defer l.lock.Unlock()
@@ -124,6 +207,17 @@ func (l *Level) free(t int64) (int, error) {
return n, nil return n, nil
} }
// forceFree removes the oldest buffer from each metric chain in the subtree.
//
// Unlike free(), which removes based on time threshold, this unconditionally removes
// the oldest buffer in each chain. Used by MemoryUsageTracker when memory cap is
// exceeded and time-based retention is insufficient.
//
// Recursively processes current level's metrics and all child levels.
//
// Returns:
// - int: Total number of buffers freed in this subtree
// - error: Non-nil on failure (propagated from children)
func (l *Level) forceFree() (int, error) { func (l *Level) forceFree() (int, error) {
l.lock.Lock() l.lock.Lock()
defer l.lock.Unlock() defer l.lock.Unlock()
@@ -160,6 +254,14 @@ func (l *Level) forceFree() (int, error) {
return n, nil return n, nil
} }
// sizeInBytes calculates the total memory usage of all buffers in the subtree.
//
// Recursively sums buffer data sizes (count of Float values × sizeof(Float)) across
// this level's metrics and all child levels. Used by MemoryUsageTracker to enforce
// memory cap limits.
//
// Returns:
// - int64: Total bytes used by buffer data in this subtree
func (l *Level) sizeInBytes() int64 { func (l *Level) sizeInBytes() int64 {
l.lock.RLock() l.lock.RLock()
defer l.lock.RUnlock() defer l.lock.RUnlock()
@@ -178,6 +280,16 @@ func (l *Level) sizeInBytes() int64 {
return size return size
} }
// findLevel navigates to the level specified by selector, returning nil if not found.
//
// Read-only variant of findLevelOrCreate. Does not create missing levels.
// Recursively descends the tree following the selector path.
//
// Parameters:
// - selector: Hierarchical path (e.g., []string{"emmy", "node001", "cpu0"})
//
// Returns:
// - *Level: The target level, or nil if any component in the path does not exist
func (l *Level) findLevel(selector []string) *Level { func (l *Level) findLevel(selector []string) *Level {
if len(selector) == 0 { if len(selector) == 0 {
return l return l
@@ -194,6 +306,28 @@ func (l *Level) findLevel(selector []string) *Level {
return lvl.findLevel(selector[1:]) return lvl.findLevel(selector[1:])
} }
// findBuffers invokes callback on all buffers matching the selector pattern.
//
// Supports flexible selector patterns (from cc-lib/util.Selector):
// - Exact match: Selector element with String set (e.g., "node001")
// - Group match: Selector element with Group set (e.g., ["cpu0", "cpu2", "cpu4"])
// - Wildcard: Selector element with Any=true (matches all children)
//
// Empty selector (len==0) matches current level's buffer at 'offset' and recursively
// all descendant buffers at the same offset (used for aggregation queries).
//
// Parameters:
// - selector: Pattern to match (consumed recursively)
// - offset: Metric index in metrics slice (from MetricConfig.offset)
// - f: Callback invoked on each matching buffer
//
// Returns:
// - error: First error returned by callback, or nil if all succeeded
//
// Example:
//
// // Find all cpu0 buffers across all hosts:
// findBuffers([]Selector{{Any: true}, {String: "cpu0"}}, metricOffset, callback)
func (l *Level) findBuffers(selector util.Selector, offset int, f func(b *buffer) error) error { func (l *Level) findBuffers(selector util.Selector, offset int, f func(b *buffer) error) error {
l.lock.RLock() l.lock.RLock()
defer l.lock.RUnlock() defer l.lock.RUnlock()

View File

@@ -42,30 +42,75 @@ var (
msInstance *MemoryStore msInstance *MemoryStore
// shutdownFunc stores the context cancellation function created in Init // shutdownFunc stores the context cancellation function created in Init
// and is called during Shutdown to cancel all background goroutines // and is called during Shutdown to cancel all background goroutines
shutdownFunc context.CancelFunc shutdownFunc context.CancelFunc
shutdownFuncMu sync.Mutex // Protects shutdownFunc from concurrent access
) )
// NodeProvider provides information about nodes currently in use by running jobs. // NodeProvider provides information about nodes currently in use by running jobs.
//
// This interface allows metricstore to query job information without directly // This interface allows metricstore to query job information without directly
// depending on the repository package, breaking the import cycle. // depending on the repository package, breaking the import cycle.
//
// Implementations should return nodes that are actively processing jobs started
// before the given timestamp. These nodes will be excluded from retention-based
// garbage collection to prevent data loss for jobs that are still running or
// recently completed.
type NodeProvider interface { type NodeProvider interface {
// GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames // GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames
// that are currently in use by jobs that started before the given timestamp. // that are currently in use by jobs that started before the given timestamp.
//
// Parameters:
// - ts: Unix timestamp threshold - returns nodes with jobs started before this time
//
// Returns:
// - Map of cluster names to lists of node hostnames that should be excluded from garbage collection
// - Error if the query fails
GetUsedNodes(ts int64) (map[string][]string, error) GetUsedNodes(ts int64) (map[string][]string, error)
} }
// Metric represents a single metric data point to be written to the store.
type Metric struct { type Metric struct {
Name string Name string
Value schema.Float Value schema.Float
// MetricConfig contains frequency and aggregation settings for this metric.
// If Frequency is 0, configuration will be looked up from MemoryStore.Metrics during Write().
MetricConfig MetricConfig MetricConfig MetricConfig
} }
// MemoryStore is the main in-memory time-series metric storage implementation.
//
// It organizes metrics in a hierarchical tree structure where each level represents
// a component of the system hierarchy (e.g., cluster → host → CPU). Each level can
// store multiple metrics as time-series buffers.
//
// The store is initialized as a singleton via InitMetrics() and accessed via GetMemoryStore().
// All public methods are safe for concurrent use.
type MemoryStore struct { type MemoryStore struct {
Metrics map[string]MetricConfig Metrics map[string]MetricConfig
root Level root Level
nodeProvider NodeProvider // Injected dependency for querying running jobs nodeProvider NodeProvider
} }
// Init initializes the metric store from configuration and starts background workers.
//
// This function must be called exactly once before any other metricstore operations.
// It performs the following initialization steps:
// 1. Validates and decodes the metric store configuration
// 2. Configures worker pool size (defaults to NumCPU/2+1, max 10)
// 3. Loads metric configurations from all registered clusters
// 4. Restores checkpoints within the retention window
// 5. Starts background workers for retention, checkpointing, archiving, and monitoring
// 6. Optionally subscribes to NATS for real-time metric ingestion
//
// Parameters:
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
// - wg: WaitGroup that will be incremented for each background goroutine started
//
// The function will call cclog.Fatal on critical errors during initialization.
// Use Shutdown() to cleanly stop all background workers started by Init().
//
// Note: Signal handling must be implemented by the caller. Call Shutdown() when
// receiving termination signals to ensure checkpoint data is persisted.
func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
startupTime := time.Now() startupTime := time.Now()
@@ -163,7 +208,9 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
// The caller is responsible for handling shutdown signals and calling // The caller is responsible for handling shutdown signals and calling
// the shutdown() function when appropriate. // the shutdown() function when appropriate.
// Store the shutdown function for later use by Shutdown() // Store the shutdown function for later use by Shutdown()
shutdownFuncMu.Lock()
shutdownFunc = shutdown shutdownFunc = shutdown
shutdownFuncMu.Unlock()
if Keys.Subscriptions != nil { if Keys.Subscriptions != nil {
err = ReceiveNats(ms, 1, ctx) err = ReceiveNats(ms, 1, ctx)
@@ -173,8 +220,17 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
} }
} }
// InitMetrics creates a new, initialized instance of a MemoryStore. // InitMetrics initializes the singleton MemoryStore instance with the given metric configurations.
// Will panic if values in the metric configurations are invalid. //
// This function must be called before GetMemoryStore() and can only be called once due to
// the singleton pattern. It assigns each metric an internal offset for efficient buffer indexing.
//
// Parameters:
// - metrics: Map of metric names to their configurations (frequency and aggregation strategy)
//
// Panics if any metric has Frequency == 0, which indicates an invalid configuration.
//
// After this call, the global msInstance is ready for use via GetMemoryStore().
func InitMetrics(metrics map[string]MetricConfig) { func InitMetrics(metrics map[string]MetricConfig) {
singleton.Do(func() { singleton.Do(func() {
offset := 0 offset := 0
@@ -201,6 +257,11 @@ func InitMetrics(metrics map[string]MetricConfig) {
}) })
} }
// GetMemoryStore returns the singleton MemoryStore instance.
//
// Returns the initialized MemoryStore singleton. Calls cclog.Fatal if InitMetrics() was not called first.
//
// This function is safe for concurrent use after initialization.
func GetMemoryStore() *MemoryStore { func GetMemoryStore() *MemoryStore {
if msInstance == nil { if msInstance == nil {
cclog.Fatalf("[METRICSTORE]> MemoryStore not initialized!") cclog.Fatalf("[METRICSTORE]> MemoryStore not initialized!")
@@ -217,7 +278,22 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) {
ms.nodeProvider = provider ms.nodeProvider = provider
} }
// Shutdown performs a graceful shutdown of the metric store.
//
// This function cancels all background goroutines started by Init() and writes
// a final checkpoint to disk before returning. It should be called when the
// application receives a termination signal.
//
// The function will:
// 1. Cancel the context to stop all background workers
// 2. Close NATS message channels if using Avro format
// 3. Write a final checkpoint to preserve in-memory data
// 4. Log any errors encountered during shutdown
//
// Note: This function blocks until the final checkpoint is written.
func Shutdown() { func Shutdown() {
shutdownFuncMu.Lock()
defer shutdownFuncMu.Unlock()
if shutdownFunc != nil { if shutdownFunc != nil {
shutdownFunc() shutdownFunc()
} }
@@ -244,6 +320,17 @@ func Shutdown() {
cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files) cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files)
} }
// Retention starts a background goroutine that periodically frees old metric data.
//
// This worker runs at half the retention interval and calls Free() to remove buffers
// older than the configured retention time. It respects the NodeProvider to preserve
// data for nodes with active jobs.
//
// Parameters:
// - wg: WaitGroup to signal completion when context is cancelled
// - ctx: Context for cancellation signal
//
// The goroutine exits when ctx is cancelled.
func Retention(wg *sync.WaitGroup, ctx context.Context) { func Retention(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore() ms := GetMemoryStore()
@@ -283,6 +370,17 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
}() }()
} }
// MemoryUsageTracker starts a background goroutine that monitors memory usage.
//
// This worker checks memory usage every minute and force-frees buffers if memory
// exceeds the configured cap. It protects against infinite loops by limiting
// iterations and forcing garbage collection between attempts.
//
// Parameters:
// - wg: WaitGroup to signal completion when context is cancelled
// - ctx: Context for cancellation signal
//
// The goroutine exits when ctx is cancelled.
func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore() ms := GetMemoryStore()
@@ -303,15 +401,16 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
return return
case <-ticker.C: case <-ticker.C:
memoryUsageGB := ms.SizeInGB() memoryUsageGB := ms.SizeInGB()
cclog.Infof("[METRICSTORE]> current memory usage: %.2f\n", memoryUsageGB) cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB)
if memoryUsageGB > float64(Keys.MemoryCap) { if memoryUsageGB > float64(Keys.MemoryCap) {
cclog.Warnf("[METRICSTORE]> current memory usage is greater than the Memory Cap: %d\n", Keys.MemoryCap) cclog.Warnf("[METRICSTORE]> current memory usage is greater than the Memory Cap: %d GB\n", Keys.MemoryCap)
cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n") cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n")
freedTotal := 0 freedTotal := 0
const maxIterations = 100
for { for range maxIterations {
memoryUsageGB = ms.SizeInGB() memoryUsageGB = ms.SizeInGB()
if memoryUsageGB < float64(Keys.MemoryCap) { if memoryUsageGB < float64(Keys.MemoryCap) {
break break
@@ -319,16 +418,23 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
freed, err := ms.ForceFree() freed, err := ms.ForceFree()
if err != nil { if err != nil {
cclog.Errorf("error while force-freeing the buffers: %s", err) cclog.Errorf("[METRICSTORE]> error while force-freeing the buffers: %s", err)
} }
if freed == 0 { if freed == 0 {
cclog.Fatalf("0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f remains higher than the memory cap and there are no buffers left to force-free\n", freedTotal, memoryUsageGB) cclog.Errorf("[METRICSTORE]> 0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f GB remains higher than the memory cap of %d GB and there are no buffers left to force-free\n", freedTotal, memoryUsageGB, Keys.MemoryCap)
break
} }
freedTotal += freed freedTotal += freed
runtime.GC()
} }
cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freedTotal) if memoryUsageGB >= float64(Keys.MemoryCap) {
cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f\n", memoryUsageGB) cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to free, current memory usage: %.2f GB\n", maxIterations, memoryUsageGB)
} else {
cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freedTotal)
cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB)
}
} }
} }
@@ -336,6 +442,24 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
}() }()
} }
// Free removes metric data older than the given time while preserving data for active nodes.
//
// This function implements intelligent retention by consulting the NodeProvider (if configured)
// to determine which nodes are currently in use by running jobs. Data for these nodes is
// preserved even if older than the retention time.
//
// Parameters:
// - ms: The MemoryStore instance
// - t: Time threshold - buffers with data older than this will be freed
//
// Returns:
// - Number of buffers freed
// - Error if NodeProvider query fails
//
// Behavior:
// - If no NodeProvider is set: frees all buffers older than t
// - If NodeProvider returns empty map: frees all buffers older than t
// - Otherwise: preserves buffers for nodes returned by GetUsedNodes(), frees others
func Free(ms *MemoryStore, t time.Time) (int, error) { func Free(ms *MemoryStore, t time.Time) (int, error) {
// If no NodeProvider is configured, free all buffers older than t // If no NodeProvider is configured, free all buffers older than t
if ms.nodeProvider == nil { if ms.nodeProvider == nil {
@@ -362,8 +486,17 @@ func Free(ms *MemoryStore, t time.Time) (int, error) {
} }
} }
// A function to free specific selectors. Used when we want to retain some specific nodes // FreeSelected frees buffers for specific selectors while preserving others.
// beyond the retention time. //
// This function is used when we want to retain some specific nodes beyond the retention time.
// It iterates through the provided selectors and frees their associated buffers.
//
// Parameters:
// - ms: The MemoryStore instance
// - selectors: List of selector paths to free (e.g., [["cluster1", "node1"], ["cluster2", "node2"]])
// - t: Time threshold for freeing buffers
//
// Returns the total number of buffers freed and any error encountered.
func FreeSelected(ms *MemoryStore, selectors [][]string, t time.Time) (int, error) { func FreeSelected(ms *MemoryStore, selectors [][]string, t time.Time) (int, error) {
freed := 0 freed := 0
@@ -380,8 +513,22 @@ func FreeSelected(ms *MemoryStore, selectors [][]string, t time.Time) (int, erro
return freed, nil return freed, nil
} }
// This function will populate all the second last levels - meaning nodes // GetSelectors returns all selectors at depth 2 (cluster/node level) that are NOT in the exclusion map.
// From that we can exclude the specific selectosr/node we want to retain. //
// This function generates a list of selectors whose buffers should be freed by excluding
// selectors that correspond to nodes currently in use by running jobs.
//
// Parameters:
// - ms: The MemoryStore instance
// - excludeSelectors: Map of cluster names to node hostnames that should NOT be freed
//
// Returns a list of selectors ([]string paths) that can be safely freed.
//
// Example:
//
// If the tree has paths ["emmy", "node001"] and ["emmy", "node002"],
// and excludeSelectors contains {"emmy": ["node001"]},
// then only [["emmy", "node002"]] is returned.
func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]string { func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]string {
allSelectors := ms.GetPaths(2) allSelectors := ms.GetPaths(2)
@@ -432,6 +579,7 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
if metric.MetricConfig.Frequency == 0 { if metric.MetricConfig.Frequency == 0 {
metric.MetricConfig, ok = m.Metrics[metric.Name] metric.MetricConfig, ok = m.Metrics[metric.Name]
if !ok { if !ok {
cclog.Debugf("[METRICSTORE]> Unknown metric '%s' in Write() - skipping", metric.Name)
metric.MetricConfig.Frequency = 0 metric.MetricConfig.Frequency = 0
} }
metrics[i] = metric metrics[i] = metric