diff --git a/internal/metricstore/api.go b/internal/metricstore/api.go index d8a2ea82..a72c2ac7 100644 --- a/internal/metricstore/api.go +++ b/internal/metricstore/api.go @@ -3,6 +3,9 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// This file contains the API types and data fetching logic for querying metric data +// from the in-memory metric store. It provides structures for building complex queries +// with support for aggregation, scaling, padding, and statistics computation. package metricstore import ( @@ -15,10 +18,17 @@ import ( ) var ( + // ErrInvalidTimeRange is returned when a query has 'from' >= 'to' ErrInvalidTimeRange = errors.New("[METRICSTORE]> invalid time range: 'from' must be before 'to'") - ErrEmptyCluster = errors.New("[METRICSTORE]> cluster name cannot be empty") + // ErrEmptyCluster is returned when a query with ForAllNodes has no cluster specified + ErrEmptyCluster = errors.New("[METRICSTORE]> cluster name cannot be empty") ) +// APIMetricData represents the response data for a single metric query. +// +// It contains both the time-series data points and computed statistics (avg, min, max). +// If an error occurred during data retrieval, the Error field will be set and other +// fields may be incomplete. type APIMetricData struct { Error *string `json:"error,omitempty"` Data schema.FloatArray `json:"data,omitempty"` @@ -30,6 +40,13 @@ type APIMetricData struct { Max schema.Float `json:"max"` } +// APIQueryRequest represents a batch query request for metric data. +// +// It supports two modes of operation: +// 1. Explicit queries via the Queries field +// 2. Automatic query generation via ForAllNodes (queries all specified metrics for all nodes in the cluster) +// +// The request can be customized with flags to include/exclude statistics, raw data, and padding. type APIQueryRequest struct { Cluster string `json:"cluster"` Queries []APIQuery `json:"queries"` @@ -41,11 +58,25 @@ type APIQueryRequest struct { WithPadding bool `json:"with-padding"` } +// APIQueryResponse represents the response to an APIQueryRequest. +// +// Results is a 2D array where each outer element corresponds to a query, +// and each inner element corresponds to a selector within that query +// (e.g., multiple CPUs or cores). type APIQueryResponse struct { Queries []APIQuery `json:"queries,omitempty"` Results [][]APIMetricData `json:"results"` } +// APIQuery represents a single metric query with optional hierarchical selectors. +// +// The hierarchical selection works as follows: +// - Hostname: The node to query +// - Type + TypeIds: First level of hierarchy (e.g., "cpu" + ["0", "1", "2"]) +// - SubType + SubTypeIds: Second level of hierarchy (e.g., "core" + ["0", "1"]) +// +// If Aggregate is true, data from multiple type/subtype IDs will be aggregated according +// to the metric's aggregation strategy. Otherwise, separate results are returned for each combination. type APIQuery struct { Type *string `json:"type,omitempty"` SubType *string `json:"subtype,omitempty"` @@ -58,6 +89,11 @@ type APIQuery struct { Aggregate bool `json:"aggreg"` } +// AddStats computes and populates the Avg, Min, and Max fields from the Data array. +// +// NaN values in the data are ignored during computation. If all values are NaN, +// the statistics fields will be set to NaN. +// // TODO: Optimize this, just like the stats endpoint! 
func (data *APIMetricData) AddStats() { n := 0 @@ -83,6 +119,10 @@ func (data *APIMetricData) AddStats() { } } +// ScaleBy multiplies all data points and statistics by the given factor. +// +// This is commonly used for unit conversion (e.g., bytes to gigabytes). +// Scaling by 0 or 1 is a no-op for performance reasons. func (data *APIMetricData) ScaleBy(f schema.Float) { if f == 0 || f == 1 { return @@ -96,6 +136,17 @@ func (data *APIMetricData) ScaleBy(f schema.Float) { } } +// PadDataWithNull pads the beginning of the data array with NaN values if needed. +// +// This ensures that the data aligns with the requested 'from' timestamp, even if +// the metric store doesn't have data for the earliest time points. This is useful +// for maintaining consistent array indexing across multiple queries. +// +// Parameters: +// - ms: MemoryStore instance to lookup metric configuration +// - from: The requested start timestamp +// - to: The requested end timestamp (unused but kept for API consistency) +// - metric: The metric name to lookup frequency information func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metric string) { minfo, ok := ms.Metrics[metric] if !ok { @@ -115,6 +166,31 @@ func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metr } } +// FetchData executes a batch metric query request and returns the results. +// +// This is the primary API for retrieving metric data from the memory store. It supports: +// - Individual queries via req.Queries +// - Batch queries for all nodes via req.ForAllNodes +// - Hierarchical selector construction (cluster → host → type → subtype) +// - Optional statistics computation (avg, min, max) +// - Optional data scaling +// - Optional data padding with NaN values +// +// The function constructs selectors based on the query parameters and calls MemoryStore.Read() +// for each selector. If a query specifies Aggregate=false with multiple type/subtype IDs, +// separate results are returned for each combination. +// +// Parameters: +// - req: The query request containing queries, time range, and options +// +// Returns: +// - APIQueryResponse containing results for each query, or error if validation fails +// +// Errors: +// - ErrInvalidTimeRange if req.From > req.To +// - ErrEmptyCluster if req.ForAllNodes is used without specifying a cluster +// - Error if MemoryStore is not initialized +// - Individual query errors are stored in APIMetricData.Error field func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { if req.From > req.To { return nil, ErrInvalidTimeRange @@ -126,7 +202,7 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { req.WithData = true ms := GetMemoryStore() if ms == nil { - return nil, fmt.Errorf("memorystore not initialized") + return nil, fmt.Errorf("[METRICSTORE]> memorystore not initialized") } response := APIQueryResponse{ @@ -195,8 +271,6 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { } } - // log.Printf("query: %#v\n", query) - // log.Printf("sels: %#v\n", sels) var err error res := make([]APIMetricData, 0, len(sels)) for _, sel := range sels { diff --git a/internal/metricstore/buffer.go b/internal/metricstore/buffer.go index 64109abb..3687c4dc 100644 --- a/internal/metricstore/buffer.go +++ b/internal/metricstore/buffer.go @@ -3,6 +3,41 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// Package metricstore provides buffer.go: Time-series data buffer implementation. 
+// +// # Buffer Architecture +// +// Each metric at each hierarchical level (cluster/host/cpu/etc.) uses a linked-list +// chain of fixed-size buffers to store time-series data. This design: +// +// - Avoids reallocation/copying when growing (new links added instead) +// - Enables efficient pooling (buffers returned to sync.Pool) +// - Supports traversal back in time (via prev pointers) +// - Maintains temporal ordering (newer data in later buffers) +// +// # Buffer Chain Example +// +// [oldest buffer] <- prev -- [older] <- prev -- [newest buffer (head)] +// start=1000 start=1512 start=2024 +// data=[v0...v511] data=[v0...v511] data=[v0...v42] +// +// When the head buffer reaches capacity (BufferCap = 512), a new buffer becomes +// the new head and the old head is linked via prev. +// +// # Pooling Strategy +// +// sync.Pool reduces GC pressure for the common case (BufferCap-sized allocations). +// Non-standard capacity buffers are not pooled (e.g., from checkpoint deserialization). +// +// # Time Alignment +// +// Timestamps are aligned to measurement frequency intervals: +// +// index = (timestamp - buffer.start) / buffer.frequency +// actualTime = buffer.start + (frequency / 2) + (index * frequency) +// +// Missing data points are represented as NaN values. The read() function performs +// linear interpolation where possible. package metricstore import ( @@ -27,14 +62,30 @@ var bufferPool sync.Pool = sync.Pool{ } var ( - ErrNoData error = errors.New("[METRICSTORE]> no data for this metric/level") + // ErrNoData indicates no time-series data exists for the requested metric/level. + ErrNoData error = errors.New("[METRICSTORE]> no data for this metric/level") + + // ErrDataDoesNotAlign indicates that aggregated data from child scopes + // does not align with the parent scope's expected timestamps/intervals. ErrDataDoesNotAlign error = errors.New("[METRICSTORE]> data from lower granularities does not align") ) -// Each metric on each level has it's own buffer. -// This is where the actual values go. -// If `cap(data)` is reached, a new buffer is created and -// becomes the new head of a buffer list. +// buffer stores time-series data for a single metric at a specific hierarchical level. +// +// Buffers form doubly-linked chains ordered by time. When capacity is reached, +// a new buffer becomes the head and the old head is linked via prev/next. +// +// Fields: +// - prev: Link to older buffer in the chain (nil if this is oldest) +// - next: Link to newer buffer in the chain (nil if this is newest/head) +// - data: Time-series values (schema.Float supports NaN for missing data) +// - frequency: Measurement interval in seconds +// - start: Start timestamp (adjusted by -frequency/2 for alignment) +// - archived: True if data has been persisted to disk archive +// - closed: True if buffer is no longer accepting writes +// +// Index calculation: index = (timestamp - start) / frequency +// Actual data timestamp: start + (frequency / 2) + (index * frequency) type buffer struct { prev *buffer next *buffer @@ -57,10 +108,22 @@ func newBuffer(ts, freq int64) *buffer { return b } -// If a new buffer was created, the new head is returnd. -// Otherwise, the existing buffer is returnd. -// Normaly, only "newer" data should be written, but if the value would -// end up in the same buffer anyways it is allowed. +// write appends a timestamped value to the buffer chain. +// +// Returns the head buffer (which may be newly created if capacity was reached). 
+// Timestamps older than the buffer's start are rejected. If the calculated index +// exceeds capacity, a new buffer is allocated and linked as the new head. +// +// Missing timestamps are automatically filled with NaN values to maintain alignment. +// Overwrites are allowed if the index is already within the existing data slice. +// +// Parameters: +// - ts: Unix timestamp in seconds +// - value: Metric value (can be schema.NaN for missing data) +// +// Returns: +// - *buffer: The new head buffer (same as b if no new buffer created) +// - error: Non-nil if timestamp is before buffer start func (b *buffer) write(ts int64, value schema.Float) (*buffer, error) { if ts < b.start { return nil, errors.New("[METRICSTORE]> cannot write value to buffer from past") @@ -99,13 +162,27 @@ func (b *buffer) firstWrite() int64 { return b.start + (b.frequency / 2) } -// Return all known values from `from` to `to`. Gaps of information are represented as NaN. -// Simple linear interpolation is done between the two neighboring cells if possible. -// If values at the start or end are missing, instead of NaN values, the second and thrid -// return values contain the actual `from`/`to`. -// This function goes back the buffer chain if `from` is older than the currents buffer start. -// The loaded values are added to `data` and `data` is returned, possibly with a shorter length. -// If `data` is not long enough to hold all values, this function will panic! +// read retrieves time-series data from the buffer chain for the specified time range. +// +// Traverses the buffer chain backwards (via prev links) if 'from' precedes the current +// buffer's start. Missing data points are represented as NaN. Values are accumulated +// into the provided 'data' slice (using +=, so caller must zero-initialize if needed). +// +// The function adjusts the actual time range returned if data is unavailable at the +// boundaries (returned via adjusted from/to timestamps). +// +// Parameters: +// - from: Start timestamp (Unix seconds) +// - to: End timestamp (Unix seconds, exclusive) +// - data: Pre-allocated slice to accumulate results (must be large enough) +// +// Returns: +// - []schema.Float: Slice of data (may be shorter than input 'data' slice) +// - int64: Actual start timestamp with available data +// - int64: Actual end timestamp (exclusive) +// - error: Non-nil on failure +// +// Panics if 'data' slice is too small to hold all values in [from, to). func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int64, int64, error) { if from < b.firstWrite() { if b.prev != nil { @@ -142,7 +219,18 @@ func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int6 return data[:i], from, t, nil } -// Returns true if this buffer needs to be freed. +// free removes buffers older than the specified timestamp from the chain. +// +// Recursively traverses backwards (via prev) and unlinks buffers whose end time +// is before the retention threshold. Freed buffers are returned to the pool if +// they have the standard capacity (BufferCap). 
+// +// Parameters: +// - t: Retention threshold timestamp (Unix seconds) +// +// Returns: +// - delme: True if the current buffer itself should be deleted by caller +// - n: Number of buffers freed in this subtree func (b *buffer) free(t int64) (delme bool, n int) { if b.prev != nil { delme, m := b.prev.free(t) @@ -196,7 +284,19 @@ func (b *buffer) forceFreeOldest() (delme bool, n int) { return true, 1 } -// Call `callback` on every buffer that contains data in the range from `from` to `to`. +// iterFromTo invokes callback on every buffer in the chain that overlaps [from, to]. +// +// Traverses backwards (via prev) first, then processes current buffer if it overlaps +// the time range. Used for checkpoint/archive operations that need to serialize buffers +// within a specific time window. +// +// Parameters: +// - from: Start timestamp (Unix seconds, inclusive) +// - to: End timestamp (Unix seconds, inclusive) +// - callback: Function to invoke on each overlapping buffer +// +// Returns: +// - error: First error returned by callback, or nil if all succeeded func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error { if b == nil { return nil diff --git a/internal/metricstore/checkpoint.go b/internal/metricstore/checkpoint.go index 660b7a1f..b90b1c22 100644 --- a/internal/metricstore/checkpoint.go +++ b/internal/metricstore/checkpoint.go @@ -3,6 +3,34 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// This file implements checkpoint persistence for the in-memory metric store. +// +// Checkpoints enable graceful restarts by periodically saving in-memory metric +// data to disk in either JSON or Avro format. The checkpoint system: +// +// Key Features: +// - Periodic background checkpointing via the Checkpointing() worker +// - Two formats: JSON (human-readable) and Avro (compact, efficient) +// - Parallel checkpoint creation and loading using worker pools +// - Hierarchical file organization: checkpoint_dir/cluster/host/timestamp.{json|avro} +// - Only saves unarchived data (archived data is already persisted elsewhere) +// - Automatic format detection and fallback during loading +// - GC optimization during loading to prevent excessive heap growth +// +// Checkpoint Workflow: +// 1. Init() loads checkpoints within retention window at startup +// 2. Checkpointing() worker periodically saves new data +// 3. Shutdown() writes final checkpoint before exit +// +// File Organization: +// +// checkpoints/ +// cluster1/ +// host001/ +// 1234567890.json (timestamp = checkpoint start time) +// 1234567950.json +// host002/ +// ... package metricstore import ( @@ -29,18 +57,21 @@ import ( ) const ( - CheckpointFilePerms = 0o644 - CheckpointDirPerms = 0o755 - GCTriggerInterval = DefaultGCTriggerInterval + CheckpointFilePerms = 0o644 // File permissions for checkpoint files + CheckpointDirPerms = 0o755 // Directory permissions for checkpoint directories + GCTriggerInterval = DefaultGCTriggerInterval // Interval for triggering GC during checkpoint loading ) -// Whenever changed, update MarshalJSON as well! +// CheckpointMetrics represents metric data in a checkpoint file. +// Whenever the structure changes, update MarshalJSON as well! type CheckpointMetrics struct { Data []schema.Float `json:"data"` Frequency int64 `json:"frequency"` Start int64 `json:"start"` } +// CheckpointFile represents the hierarchical structure of a checkpoint file. +// It mirrors the Level tree structure from the MemoryStore. 
type CheckpointFile struct { Metrics map[string]*CheckpointMetrics `json:"metrics"` Children map[string]*CheckpointFile `json:"children"` @@ -48,10 +79,23 @@ type CheckpointFile struct { To int64 `json:"to"` } -var lastCheckpoint time.Time +// lastCheckpoint tracks the timestamp of the last checkpoint creation. +var ( + lastCheckpoint time.Time + lastCheckpointMu sync.Mutex +) +// Checkpointing starts a background worker that periodically saves metric data to disk. +// +// The behavior depends on the configured file format: +// - JSON: Periodic checkpointing based on Keys.Checkpoints.Interval +// - Avro: Initial delay + periodic checkpointing at DefaultAvroCheckpointInterval +// +// The worker respects context cancellation and signals completion via the WaitGroup. func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { + lastCheckpointMu.Lock() lastCheckpoint = time.Now() + lastCheckpointMu.Unlock() if Keys.Checkpoints.FileFormat == "json" { ms := GetMemoryStore() @@ -60,9 +104,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() d, err := time.ParseDuration(Keys.Checkpoints.Interval) if err != nil { - cclog.Fatal(err) + cclog.Fatalf("[METRICSTORE]> invalid checkpoint interval '%s': %s", Keys.Checkpoints.Interval, err.Error()) } if d <= 0 { + cclog.Warnf("[METRICSTORE]> checkpoint interval is zero or negative (%s), checkpointing disabled", d) return } @@ -74,15 +119,21 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", lastCheckpoint.Format(time.RFC3339)) + lastCheckpointMu.Lock() + from := lastCheckpoint + lastCheckpointMu.Unlock() + + cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339)) now := time.Now() n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, - lastCheckpoint.Unix(), now.Unix()) + from.Unix(), now.Unix()) if err != nil { cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error()) } else { cclog.Infof("[METRICSTORE]> done: %d checkpoint files created", n) + lastCheckpointMu.Lock() lastCheckpoint = now + lastCheckpointMu.Unlock() } } } @@ -113,9 +164,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { } } -// As `Float` implements a custom MarshalJSON() function, -// serializing an array of such types has more overhead -// than one would assume (because of extra allocations, interfaces and so on). +// MarshalJSON provides optimized JSON encoding for CheckpointMetrics. +// +// Since schema.Float has custom MarshalJSON, serializing []Float has significant overhead. +// This method manually constructs JSON to avoid allocations and interface conversions. func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) { buf := make([]byte, 0, 128+len(cm.Data)*8) buf = append(buf, `{"frequency":`...) @@ -137,13 +189,27 @@ func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) { return buf, nil } -// Metrics stored at the lowest 2 levels are not stored away (root and cluster)! -// On a per-host basis a new JSON file is created. I have no idea if this will scale. -// The good thing: Only a host at a time is locked, so this function can run -// in parallel to writes/reads. +// ToCheckpoint writes metric data to checkpoint files in parallel. +// +// Metrics at root and cluster levels are skipped. One file per host is created. +// Uses worker pool (Keys.NumWorkers) for parallel processing. 
Only locks one host +// at a time, allowing concurrent writes/reads to other hosts. +// +// Returns the number of checkpoint files created and any errors encountered. func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { - levels := make([]*Level, 0) - selectors := make([][]string, 0) + // Pre-calculate capacity by counting cluster/host pairs + m.root.lock.RLock() + totalHosts := 0 + for _, l1 := range m.root.children { + l1.lock.RLock() + totalHosts += len(l1.children) + l1.lock.RUnlock() + } + m.root.lock.RUnlock() + + levels := make([]*Level, 0, totalHosts) + selectors := make([][]string, 0, totalHosts) + m.root.lock.RLock() for sel1, l1 := range m.root.children { l1.lock.RLock() @@ -203,6 +269,8 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { return int(n), nil } +// toCheckpointFile recursively converts a Level tree to CheckpointFile structure. +// Skips metrics that are already archived. Returns nil if no unarchived data exists. func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) { l.lock.RLock() defer l.lock.RUnlock() @@ -224,6 +292,7 @@ func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil b.iterFromTo(from, to, func(b *buffer) error { if !b.archived { allArchived = false + return fmt.Errorf("stop") // Early termination signal } return nil }) @@ -267,6 +336,8 @@ func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil return retval, nil } +// toCheckpoint writes a Level's data to a JSON checkpoint file. +// Creates directory if needed. Returns ErrNoNewArchiveData if nothing to save. func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { cf, err := l.toCheckpointFile(from, to, m) if err != nil { @@ -278,11 +349,11 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { } filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) - f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) + f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) if err != nil && os.IsNotExist(err) { err = os.MkdirAll(dir, CheckpointDirPerms) if err == nil { - f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) + f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) } } if err != nil { @@ -298,9 +369,54 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { return bw.Flush() } +// enqueueCheckpointHosts traverses the checkpoint directory and enqueues cluster/host pairs. +// Returns an error if the directory structure is invalid. +func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { + clustersDir, err := os.ReadDir(dir) + if err != nil { + return err + } + + gcCounter := 0 + for _, clusterDir := range clustersDir { + if !clusterDir.IsDir() { + return errors.New("[METRICSTORE]> expected only directories at first level of checkpoints/ directory") + } + + hostsDir, err := os.ReadDir(filepath.Join(dir, clusterDir.Name())) + if err != nil { + return err + } + + for _, hostDir := range hostsDir { + if !hostDir.IsDir() { + return errors.New("[METRICSTORE]> expected only directories at second level of checkpoints/ directory") + } + + gcCounter++ + if gcCounter%GCTriggerInterval == 0 { + // Forcing garbage collection runs here regularly during the loading of checkpoints + // will decrease the total heap size after loading everything back to memory is done.
+ // While loading data, the heap will grow fast, so the GC target size will double + // almost always. By forcing GCs here, we can keep it growing more slowly so that + // at the end, less memory is wasted. + runtime.GC() + } + + work <- [2]string{clusterDir.Name(), hostDir.Name()} + } + } + + return nil +} + +// FromCheckpoint loads checkpoint files from disk into memory in parallel. +// +// Uses worker pool to load cluster/host combinations. Periodically triggers GC +// to prevent excessive heap growth. Returns number of files loaded and any errors. func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) { var wg sync.WaitGroup - work := make(chan [2]string, Keys.NumWorkers) + work := make(chan [2]string, Keys.NumWorkers*4) n, errs := int32(0), int32(0) wg.Add(Keys.NumWorkers) @@ -319,40 +435,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) ( }() } - i := 0 - clustersDir, err := os.ReadDir(dir) - for _, clusterDir := range clustersDir { - if !clusterDir.IsDir() { - err = errors.New("[METRICSTORE]> expected only directories at first level of checkpoints/ directory") - goto done - } - - hostsDir, e := os.ReadDir(filepath.Join(dir, clusterDir.Name())) - if e != nil { - err = e - goto done - } - - for _, hostDir := range hostsDir { - if !hostDir.IsDir() { - err = errors.New("[METRICSTORE]> expected only directories at second level of checkpoints/ directory") - goto done - } - - i++ - if i%Keys.NumWorkers == 0 && i > GCTriggerInterval { - // Forcing garbage collection runs here regulary during the loading of checkpoints - // will decrease the total heap size after loading everything back to memory is done. - // While loading data, the heap will grow fast, so the GC target size will double - // almost always. By forcing GCs here, we can keep it growing more slowly so that - // at the end, less memory is wasted. - runtime.GC() - } - - work <- [2]string{clusterDir.Name(), hostDir.Name()} - } - } -done: + err := enqueueCheckpointHosts(dir, work) close(work) wg.Wait() @@ -366,9 +449,11 @@ done: return int(n), nil } -// Metrics stored at the lowest 2 levels are not loaded (root and cluster)! -// This function can only be called once and before the very first write or read. -// Different host's data is loaded to memory in parallel. +// FromCheckpointFiles is the main entry point for loading checkpoints at startup. +// +// Automatically detects checkpoint format (JSON vs Avro) and falls back if needed. +// Creates checkpoint directory if it doesn't exist. This function must be called +// before any writes or reads, and can only be called once. func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { if _, err := os.Stat(dir); os.IsNotExist(err) { // The directory does not exist, so create it using os.MkdirAll() @@ -411,6 +496,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { return 0, nil } +// checkFilesWithExtension walks a directory tree to check if files with the given extension exist. func checkFilesWithExtension(dir string, extension string) (bool, error) { found := false diff --git a/internal/metricstore/config.go b/internal/metricstore/config.go index 9657453d..34e536d0 100644 --- a/internal/metricstore/config.go +++ b/internal/metricstore/config.go @@ -3,6 +3,43 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// Package metricstore provides config.go: Configuration structures and metric management. 
+// +// # Configuration Hierarchy +// +// The metricstore package uses nested configuration structures: +// +// MetricStoreConfig (Keys) +// ├─ NumWorkers: Parallel checkpoint/archive workers +// ├─ RetentionInMemory: How long to keep data in RAM +// ├─ MemoryCap: Memory limit in GB (triggers forceFree) +// ├─ Checkpoints: Persistence configuration +// │ ├─ FileFormat: "avro" or "json" +// │ ├─ Interval: How often to save (e.g., "1h") +// │ └─ RootDir: Checkpoint storage path +// ├─ Archive: Long-term storage configuration +// │ ├─ ArchiveInterval: How often to archive +// │ ├─ RootDir: Archive storage path +// │ └─ DeleteInstead: Delete old data instead of archiving +// ├─ Debug: Development/debugging options +// └─ Subscriptions: NATS topic subscriptions for metric ingestion +// +// # Metric Configuration +// +// Each metric (e.g., "cpu_load", "mem_used") has a MetricConfig entry in the global +// Metrics map, defining: +// +// - Frequency: Measurement interval in seconds +// - Aggregation: How to combine values (sum/avg/none) when transforming scopes +// - offset: Internal index into Level.metrics slice (assigned during Init) +// +// # AggregationStrategy +// +// Determines how to combine metric values when aggregating from finer to coarser scopes: +// +// - NoAggregation: Do not combine (incompatible scopes) +// - SumAggregation: Add values (e.g., power consumption: core→socket) +// - AvgAggregation: Average values (e.g., temperature: core→socket) package metricstore import ( @@ -19,23 +56,50 @@ const ( DefaultAvroCheckpointInterval = time.Minute ) +// Checkpoints configures periodic persistence of in-memory metric data. +// +// Fields: +// - FileFormat: "avro" (default, binary, compact) or "json" (human-readable, slower) +// - Interval: Duration string (e.g., "1h", "30m") between checkpoint saves +// - RootDir: Filesystem path for checkpoint files (created if missing) type Checkpoints struct { FileFormat string `json:"file-format"` Interval string `json:"interval"` RootDir string `json:"directory"` } +// Debug provides development and profiling options. +// +// Fields: +// - DumpToFile: Path to dump checkpoint data for inspection (empty = disabled) +// - EnableGops: Enable gops agent for live runtime debugging (https://github.com/google/gops) type Debug struct { DumpToFile string `json:"dump-to-file"` EnableGops bool `json:"gops"` } +// Archive configures long-term storage of old metric data. +// +// Data older than RetentionInMemory is archived to disk or deleted. +// +// Fields: +// - ArchiveInterval: Duration string (e.g., "24h") between archive operations +// - RootDir: Filesystem path for archived data (created if missing) +// - DeleteInstead: If true, delete old data instead of archiving (saves disk space) type Archive struct { ArchiveInterval string `json:"interval"` RootDir string `json:"directory"` DeleteInstead bool `json:"delete-instead"` } +// Subscriptions defines NATS topics to subscribe to for metric ingestion. +// +// Each subscription receives metrics via NATS messaging, enabling real-time +// data collection from compute nodes. +// +// Fields: +// - SubscribeTo: NATS subject/channel name (e.g., "metrics.compute.*") +// - ClusterTag: Default cluster name for metrics without cluster tag (optional) type Subscriptions []struct { // Channel name SubscribeTo string `json:"subscribe-to"` @@ -44,6 +108,19 @@ type Subscriptions []struct { ClusterTag string `json:"cluster-tag"` } +// MetricStoreConfig defines the main configuration for the metricstore.
+// +// Loaded from cc-backend's config.json "metricstore" section. Controls memory usage, +// persistence, archiving, and metric ingestion. +// +// Fields: +// - NumWorkers: Parallel workers for checkpoint/archive (0 = auto: min(NumCPU/2+1, 10)) +// - RetentionInMemory: Duration string (e.g., "48h") for in-memory data retention +// - MemoryCap: Max buffer data size in GB (0 = unlimited); triggers forceFree when exceeded +// - Checkpoints: Periodic persistence configuration +// - Debug: Development/profiling options (nil = disabled) +// - Archive: Long-term storage configuration (nil = disabled) +// - Subscriptions: NATS topics for metric ingestion (nil = polling only) type MetricStoreConfig struct { // Number of concurrent workers for checkpoint and archive operations. // If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10) @@ -56,6 +133,10 @@ type MetricStoreConfig struct { Subscriptions *Subscriptions `json:"nats-subscriptions"` } +// Keys is the global metricstore configuration instance. +// +// Initialized with defaults, then overwritten by cc-backend's config.json. +// Accessed by Init(), Checkpointing(), and other lifecycle functions. var Keys MetricStoreConfig = MetricStoreConfig{ Checkpoints: Checkpoints{ FileFormat: "avro", @@ -63,15 +144,33 @@ var Keys MetricStoreConfig = MetricStoreConfig{ }, } -// AggregationStrategy for aggregation over multiple values at different cpus/sockets/..., not time! +// AggregationStrategy defines how to combine metric values across hierarchy levels. +// +// Used when transforming data from finer-grained scopes (e.g., core) to coarser scopes +// (e.g., socket). This is SPATIAL aggregation, not TEMPORAL (time-based) aggregation. +// +// Values: +// - NoAggregation: Do not aggregate (incompatible scopes or non-aggregatable metrics) +// - SumAggregation: Add values (e.g., power: sum core power → socket power) +// - AvgAggregation: Average values (e.g., temperature: average core temps → socket temp) type AggregationStrategy int const ( - NoAggregation AggregationStrategy = iota - SumAggregation - AvgAggregation + NoAggregation AggregationStrategy = iota // Do not aggregate + SumAggregation // Sum values (e.g., power, energy) + AvgAggregation // Average values (e.g., temperature, utilization) ) +// AssignAggregationStrategy parses a string into an AggregationStrategy value. +// +// Used when loading metric configurations from JSON/YAML files. +// +// Parameters: +// - str: "sum", "avg", or "" (empty string for NoAggregation) +// +// Returns: +// - AggregationStrategy: Parsed value +// - error: Non-nil if str is unrecognized func AssignAggregationStrategy(str string) (AggregationStrategy, error) { switch str { case "": @@ -85,6 +184,14 @@ } } +// MetricConfig defines configuration for a single metric type. +// +// Stored in the global Metrics map, keyed by metric name (e.g., "cpu_load"). +// +// Fields: +// - Frequency: Measurement interval in seconds (e.g., 60 for 1-minute granularity) +// - Aggregation: How to combine values across hierarchy levels (sum/avg/none) +// - offset: Internal index into Level.metrics slice (assigned during Init) type MetricConfig struct { // Interval in seconds at which measurements are stored Frequency int64 @@ -96,8 +203,21 @@ offset int } +// Metrics is the global map of metric configurations. +// +// Keyed by metric name (e.g., "cpu_load", "mem_used").
Populated during Init() +// from cluster configuration and checkpoint restoration. Each MetricConfig.offset +// corresponds to the buffer slice index in Level.metrics. var Metrics map[string]MetricConfig +// GetMetricFrequency retrieves the measurement interval for a metric. +// +// Parameters: +// - metricName: Metric name (e.g., "cpu_load") +// +// Returns: +// - int64: Frequency in seconds +// - error: Non-nil if metric not found in Metrics map func GetMetricFrequency(metricName string) (int64, error) { if metric, ok := Metrics[metricName]; ok { return metric.Frequency, nil @@ -105,9 +225,18 @@ func GetMetricFrequency(metricName string) (int64, error) { return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) } -// AddMetric adds logic to add metrics. Redundant metrics should be updated with max frequency. -// use metric.Name to check if the metric already exists. -// if not, add it to the Metrics map. +// AddMetric registers a new metric or updates an existing one. +// +// If the metric already exists with a different frequency, uses the higher frequency +// (finer granularity). This handles cases where different clusters report the same +// metric at different intervals. +// +// Parameters: +// - name: Metric name (e.g., "cpu_load") +// - metric: Configuration (frequency, aggregation strategy) +// +// Returns: +// - error: Always nil (signature for future error handling) func AddMetric(name string, metric MetricConfig) error { if Metrics == nil { Metrics = make(map[string]MetricConfig, 0) diff --git a/internal/metricstore/level.go b/internal/metricstore/level.go index 4960563a..bc99b884 100644 --- a/internal/metricstore/level.go +++ b/internal/metricstore/level.go @@ -3,6 +3,41 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// Package metricstore provides level.go: Hierarchical tree structure for metric storage. +// +// # Level Architecture +// +// The Level type forms a tree structure where each node represents a level in the +// ClusterCockpit hierarchy: cluster → host → socket → core → hwthread, with special +// nodes for memory domains and accelerators. +// +// Structure: +// +// Root Level (cluster="emmy") +// ├─ Level (host="node001") +// │ ├─ Level (socket="0") +// │ │ ├─ Level (core="0") [stores cpu0 metrics] +// │ │ └─ Level (core="1") [stores cpu1 metrics] +// │ └─ Level (socket="1") +// │ └─ ... +// └─ Level (host="node002") +// └─ ... +// +// Each Level can: +// - Hold data (metrics slice of buffer pointers) +// - Have child nodes (children map[string]*Level) +// - Both simultaneously (inner nodes can store aggregated metrics) +// +// # Selector Paths +// +// Selectors are hierarchical paths: []string{"cluster", "host", "component"}. +// Example: []string{"emmy", "node001", "cpu0"} navigates to the cpu0 core level. +// +// # Concurrency +// +// RWMutex protects children map and metrics slice. Read-heavy workload (metric reads) +// uses RLock. Writes (new levels, buffer updates) use Lock. Double-checked locking +// prevents races during level creation. package metricstore import ( @@ -12,20 +47,40 @@ import ( "github.com/ClusterCockpit/cc-lib/v2/util" ) -// Could also be called "node" as this forms a node in a tree structure. -// Called Level because "node" might be confusing here. -// Can be both a leaf or a inner node. In this tree structue, inner nodes can -// also hold data (in `metrics`). +// Level represents a node in the hierarchical metric storage tree. 
+// +// Can be both a leaf or inner node. Inner nodes hold data in 'metrics' for aggregated +// values (e.g., socket-level metrics derived from core-level data). Named "Level" +// instead of "node" to avoid confusion with cluster nodes (hosts). +// +// Fields: +// - children: Map of child level names to Level pointers (e.g., "cpu0" → Level) +// - metrics: Slice of buffer pointers (one per metric, indexed by MetricConfig.offset) +// - lock: RWMutex for concurrent access (read-heavy, write-rare) type Level struct { children map[string]*Level metrics []*buffer lock sync.RWMutex } -// Find the correct level for the given selector, creating it if -// it does not exist. Example selector in the context of the -// ClusterCockpit could be: []string{ "emmy", "host123", "cpu0" }. -// This function would probably benefit a lot from `level.children` beeing a `sync.Map`? +// findLevelOrCreate navigates to or creates the level specified by selector. +// +// Recursively descends the tree, creating missing levels as needed. Uses double-checked +// locking: RLock first (fast path), then Lock if creation needed (slow path), then +// re-check after acquiring Lock to handle races. +// +// Example selector: []string{"emmy", "node001", "cpu0"} +// Navigates: root → emmy → node001 → cpu0, creating levels as needed. +// +// Parameters: +// - selector: Hierarchical path (consumed recursively, decreasing depth) +// - nMetrics: Number of metric slots to allocate in new levels +// +// Returns: +// - *Level: The target level (existing or newly created) +// +// Note: sync.Map may improve performance for high-concurrency writes, but current +// approach suffices for read-heavy workload. func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { if len(selector) == 0 { return l @@ -72,6 +127,22 @@ func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { return child.findLevelOrCreate(selector[1:], nMetrics) } +// collectPaths gathers all selector paths at the specified depth in the tree. +// +// Recursively traverses children, collecting paths when currentDepth+1 == targetDepth. +// Each path is a selector that can be used with findLevel() or findBuffers(). +// +// Explicitly copies slices to avoid shared underlying arrays between siblings, preventing +// unintended mutations. +// +// Parameters: +// - currentDepth: Depth of current level (0 = root) +// - targetDepth: Depth to collect paths from +// - currentPath: Path accumulated so far +// - results: Output slice (appended to) +// +// Example: collectPaths(0, 2, []string{}, &results) collects all 2-level paths +// like []string{"emmy", "node001"}, []string{"emmy", "node002"}, etc. func (l *Level) collectPaths(currentDepth, targetDepth int, currentPath []string, results *[][]string) { l.lock.RLock() defer l.lock.RUnlock() @@ -95,6 +166,18 @@ func (l *Level) collectPaths(currentDepth, targetDepth int, currentPath []string } } +// free removes buffers older than the retention threshold from the entire subtree. +// +// Recursively frees buffers in this level's metrics and all child levels. Buffers +// with standard capacity (BufferCap) are returned to the pool. Called by the +// retention worker to enforce retention policies. 
+// +// Parameters: +// - t: Retention threshold timestamp (Unix seconds) +// +// Returns: +// - int: Total number of buffers freed in this subtree +// - error: Non-nil on failure (propagated from children) func (l *Level) free(t int64) (int, error) { l.lock.Lock() defer l.lock.Unlock() @@ -124,6 +207,17 @@ func (l *Level) free(t int64) (int, error) { return n, nil } +// forceFree removes the oldest buffer from each metric chain in the subtree. +// +// Unlike free(), which removes based on time threshold, this unconditionally removes +// the oldest buffer in each chain. Used by MemoryUsageTracker when memory cap is +// exceeded and time-based retention is insufficient. +// +// Recursively processes current level's metrics and all child levels. +// +// Returns: +// - int: Total number of buffers freed in this subtree +// - error: Non-nil on failure (propagated from children) func (l *Level) forceFree() (int, error) { l.lock.Lock() defer l.lock.Unlock() @@ -160,6 +254,14 @@ func (l *Level) forceFree() (int, error) { return n, nil } +// sizeInBytes calculates the total memory usage of all buffers in the subtree. +// +// Recursively sums buffer data sizes (count of Float values × sizeof(Float)) across +// this level's metrics and all child levels. Used by MemoryUsageTracker to enforce +// memory cap limits. +// +// Returns: +// - int64: Total bytes used by buffer data in this subtree func (l *Level) sizeInBytes() int64 { l.lock.RLock() defer l.lock.RUnlock() @@ -178,6 +280,16 @@ func (l *Level) sizeInBytes() int64 { return size } +// findLevel navigates to the level specified by selector, returning nil if not found. +// +// Read-only variant of findLevelOrCreate. Does not create missing levels. +// Recursively descends the tree following the selector path. +// +// Parameters: +// - selector: Hierarchical path (e.g., []string{"emmy", "node001", "cpu0"}) +// +// Returns: +// - *Level: The target level, or nil if any component in the path does not exist func (l *Level) findLevel(selector []string) *Level { if len(selector) == 0 { return l @@ -194,6 +306,28 @@ func (l *Level) findLevel(selector []string) *Level { return lvl.findLevel(selector[1:]) } +// findBuffers invokes callback on all buffers matching the selector pattern. +// +// Supports flexible selector patterns (from cc-lib/util.Selector): +// - Exact match: Selector element with String set (e.g., "node001") +// - Group match: Selector element with Group set (e.g., ["cpu0", "cpu2", "cpu4"]) +// - Wildcard: Selector element with Any=true (matches all children) +// +// Empty selector (len==0) matches current level's buffer at 'offset' and recursively +// all descendant buffers at the same offset (used for aggregation queries). 
+// +// Parameters: +// - selector: Pattern to match (consumed recursively) +// - offset: Metric index in metrics slice (from MetricConfig.offset) +// - f: Callback invoked on each matching buffer +// +// Returns: +// - error: First error returned by callback, or nil if all succeeded +// +// Example: +// +// // Find all cpu0 buffers across all hosts: +// findBuffers([]Selector{{Any: true}, {String: "cpu0"}}, metricOffset, callback) func (l *Level) findBuffers(selector util.Selector, offset int, f func(b *buffer) error) error { l.lock.RLock() defer l.lock.RUnlock() diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index 07b9c38a..b1fe065d 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -42,30 +42,75 @@ var ( msInstance *MemoryStore // shutdownFunc stores the context cancellation function created in Init // and is called during Shutdown to cancel all background goroutines - shutdownFunc context.CancelFunc + shutdownFunc context.CancelFunc + shutdownFuncMu sync.Mutex // Protects shutdownFunc from concurrent access ) // NodeProvider provides information about nodes currently in use by running jobs. +// // This interface allows metricstore to query job information without directly // depending on the repository package, breaking the import cycle. +// +// Implementations should return nodes that are actively processing jobs started +// before the given timestamp. These nodes will be excluded from retention-based +// garbage collection to prevent data loss for jobs that are still running or +// recently completed. type NodeProvider interface { // GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames // that are currently in use by jobs that started before the given timestamp. + // + // Parameters: + // - ts: Unix timestamp threshold - returns nodes with jobs started before this time + // + // Returns: + // - Map of cluster names to lists of node hostnames that should be excluded from garbage collection + // - Error if the query fails GetUsedNodes(ts int64) (map[string][]string, error) } +// Metric represents a single metric data point to be written to the store. type Metric struct { - Name string - Value schema.Float + Name string + Value schema.Float + // MetricConfig contains frequency and aggregation settings for this metric. + // If Frequency is 0, configuration will be looked up from MemoryStore.Metrics during Write(). MetricConfig MetricConfig } +// MemoryStore is the main in-memory time-series metric storage implementation. +// +// It organizes metrics in a hierarchical tree structure where each level represents +// a component of the system hierarchy (e.g., cluster → host → CPU). Each level can +// store multiple metrics as time-series buffers. +// +// The store is initialized as a singleton via InitMetrics() and accessed via GetMemoryStore(). +// All public methods are safe for concurrent use. type MemoryStore struct { Metrics map[string]MetricConfig root Level - nodeProvider NodeProvider // Injected dependency for querying running jobs + nodeProvider NodeProvider } +// Init initializes the metric store from configuration and starts background workers. +// +// This function must be called exactly once before any other metricstore operations. +// It performs the following initialization steps: +// 1. Validates and decodes the metric store configuration +// 2. Configures worker pool size (defaults to NumCPU/2+1, max 10) +// 3. 
Loads metric configurations from all registered clusters +// 4. Restores checkpoints within the retention window +// 5. Starts background workers for retention, checkpointing, archiving, and monitoring +// 6. Optionally subscribes to NATS for real-time metric ingestion +// +// Parameters: +// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig) +// - wg: WaitGroup that will be incremented for each background goroutine started +// +// The function will call cclog.Fatal on critical errors during initialization. +// Use Shutdown() to cleanly stop all background workers started by Init(). +// +// Note: Signal handling must be implemented by the caller. Call Shutdown() when +// receiving termination signals to ensure checkpoint data is persisted. func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { startupTime := time.Now() @@ -163,7 +208,9 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { // The caller is responsible for handling shutdown signals and calling // the shutdown() function when appropriate. // Store the shutdown function for later use by Shutdown() + shutdownFuncMu.Lock() shutdownFunc = shutdown + shutdownFuncMu.Unlock() if Keys.Subscriptions != nil { err = ReceiveNats(ms, 1, ctx) @@ -173,8 +220,17 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { } } -// InitMetrics creates a new, initialized instance of a MemoryStore. -// Will panic if values in the metric configurations are invalid. +// InitMetrics initializes the singleton MemoryStore instance with the given metric configurations. +// +// This function must be called before GetMemoryStore() and can only be called once due to +// the singleton pattern. It assigns each metric an internal offset for efficient buffer indexing. +// +// Parameters: +// - metrics: Map of metric names to their configurations (frequency and aggregation strategy) +// +// Panics if any metric has Frequency == 0, which indicates an invalid configuration. +// +// After this call, the global msInstance is ready for use via GetMemoryStore(). func InitMetrics(metrics map[string]MetricConfig) { singleton.Do(func() { offset := 0 @@ -201,6 +257,11 @@ func InitMetrics(metrics map[string]MetricConfig) { }) } +// GetMemoryStore returns the singleton MemoryStore instance. +// +// Returns the initialized MemoryStore singleton. Calls cclog.Fatal if InitMetrics() was not called first. +// +// This function is safe for concurrent use after initialization. func GetMemoryStore() *MemoryStore { if msInstance == nil { cclog.Fatalf("[METRICSTORE]> MemoryStore not initialized!") @@ -217,7 +278,22 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) { ms.nodeProvider = provider } +// Shutdown performs a graceful shutdown of the metric store. +// +// This function cancels all background goroutines started by Init() and writes +// a final checkpoint to disk before returning. It should be called when the +// application receives a termination signal. +// +// The function will: +// 1. Cancel the context to stop all background workers +// 2. Close NATS message channels if using Avro format +// 3. Write a final checkpoint to preserve in-memory data +// 4. Log any errors encountered during shutdown +// +// Note: This function blocks until the final checkpoint is written. func Shutdown() { + shutdownFuncMu.Lock() + defer shutdownFuncMu.Unlock() if shutdownFunc != nil { shutdownFunc() } @@ -244,6 +320,17 @@ func Shutdown() { cclog.Infof("[METRICSTORE]> Done! 
(%d files written)\n", files) } +// Retention starts a background goroutine that periodically frees old metric data. +// +// This worker runs at half the retention interval and calls Free() to remove buffers +// older than the configured retention time. It respects the NodeProvider to preserve +// data for nodes with active jobs. +// +// Parameters: +// - wg: WaitGroup to signal completion when context is cancelled +// - ctx: Context for cancellation signal +// +// The goroutine exits when ctx is cancelled. func Retention(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() @@ -283,6 +370,17 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { }() } +// MemoryUsageTracker starts a background goroutine that monitors memory usage. +// +// This worker checks memory usage every minute and force-frees buffers if memory +// exceeds the configured cap. It protects against infinite loops by limiting +// iterations and forcing garbage collection between attempts. +// +// Parameters: +// - wg: WaitGroup to signal completion when context is cancelled +// - ctx: Context for cancellation signal +// +// The goroutine exits when ctx is cancelled. func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() @@ -303,15 +401,16 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { return case <-ticker.C: memoryUsageGB := ms.SizeInGB() - cclog.Infof("[METRICSTORE]> current memory usage: %.2f\n", memoryUsageGB) + cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB) if memoryUsageGB > float64(Keys.MemoryCap) { - cclog.Warnf("[METRICSTORE]> current memory usage is greater than the Memory Cap: %d\n", Keys.MemoryCap) + cclog.Warnf("[METRICSTORE]> current memory usage is greater than the Memory Cap: %d GB\n", Keys.MemoryCap) cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n") freedTotal := 0 + const maxIterations = 100 - for { + for range maxIterations { memoryUsageGB = ms.SizeInGB() if memoryUsageGB < float64(Keys.MemoryCap) { break @@ -319,16 +418,23 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { freed, err := ms.ForceFree() if err != nil { - cclog.Errorf("error while force-freeing the buffers: %s", err) + cclog.Errorf("[METRICSTORE]> error while force-freeing the buffers: %s", err) } if freed == 0 { - cclog.Fatalf("0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f remains higher than the memory cap and there are no buffers left to force-free\n", freedTotal, memoryUsageGB) + cclog.Errorf("[METRICSTORE]> 0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f GB remains higher than the memory cap of %d GB and there are no buffers left to force-free\n", freedTotal, memoryUsageGB, Keys.MemoryCap) + break } freedTotal += freed + + runtime.GC() } - cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freedTotal) - cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f\n", memoryUsageGB) + if memoryUsageGB >= float64(Keys.MemoryCap) { + cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to free, current memory usage: %.2f GB\n", maxIterations, memoryUsageGB) + } else { + cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freedTotal) + cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB) + } } } @@ -336,6 +442,24 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { }() 
} +// Free removes metric data older than the given time while preserving data for active nodes. +// +// This function implements intelligent retention by consulting the NodeProvider (if configured) +// to determine which nodes are currently in use by running jobs. Data for these nodes is +// preserved even if older than the retention time. +// +// Parameters: +// - ms: The MemoryStore instance +// - t: Time threshold - buffers with data older than this will be freed +// +// Returns: +// - Number of buffers freed +// - Error if NodeProvider query fails +// +// Behavior: +// - If no NodeProvider is set: frees all buffers older than t +// - If NodeProvider returns empty map: frees all buffers older than t +// - Otherwise: preserves buffers for nodes returned by GetUsedNodes(), frees others func Free(ms *MemoryStore, t time.Time) (int, error) { // If no NodeProvider is configured, free all buffers older than t if ms.nodeProvider == nil { @@ -362,8 +486,17 @@ func Free(ms *MemoryStore, t time.Time) (int, error) { } } -// A function to free specific selectors. Used when we want to retain some specific nodes -// beyond the retention time. +// FreeSelected frees buffers for specific selectors while preserving others. +// +// This function is used when we want to retain some specific nodes beyond the retention time. +// It iterates through the provided selectors and frees their associated buffers. +// +// Parameters: +// - ms: The MemoryStore instance +// - selectors: List of selector paths to free (e.g., [["cluster1", "node1"], ["cluster2", "node2"]]) +// - t: Time threshold for freeing buffers +// +// Returns the total number of buffers freed and any error encountered. func FreeSelected(ms *MemoryStore, selectors [][]string, t time.Time) (int, error) { freed := 0 @@ -380,8 +513,22 @@ func FreeSelected(ms *MemoryStore, selectors [][]string, t time.Time) (int, erro return freed, nil } -// This function will populate all the second last levels - meaning nodes -// From that we can exclude the specific selectosr/node we want to retain. +// GetSelectors returns all selectors at depth 2 (cluster/node level) that are NOT in the exclusion map. +// +// This function generates a list of selectors whose buffers should be freed by excluding +// selectors that correspond to nodes currently in use by running jobs. +// +// Parameters: +// - ms: The MemoryStore instance +// - excludeSelectors: Map of cluster names to node hostnames that should NOT be freed +// +// Returns a list of selectors ([]string paths) that can be safely freed. +// +// Example: +// +// If the tree has paths ["emmy", "node001"] and ["emmy", "node002"], +// and excludeSelectors contains {"emmy": ["node001"]}, +// then only [["emmy", "node002"]] is returned. func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]string { allSelectors := ms.GetPaths(2) @@ -432,6 +579,7 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error if metric.MetricConfig.Frequency == 0 { metric.MetricConfig, ok = m.Metrics[metric.Name] if !ok { + cclog.Debugf("[METRICSTORE]> Unknown metric '%s' in Write() - skipping", metric.Name) metric.MetricConfig.Frequency = 0 } metrics[i] = metric
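For reference, a minimal usage sketch of the FetchData API documented above. It assumes the store has already been initialized (Init()/InitMetrics() have run) and uses the field names suggested by the doc comments and JSON tags in this diff; fields that are elided from the hunks above (notably APIQuery.Metric and APIQueryRequest.WithStats) are assumptions, and the cluster, host, and metric names are the illustrative ones from the comments ("emmy", "node001", "cpu_load").

package metricstore

import (
	"fmt"
	"time"
)

// exampleFetchData is an illustrative sketch, not part of the patch above.
func exampleFetchData() {
	now := time.Now().Unix()

	req := APIQueryRequest{
		Cluster:   "emmy",
		From:      now - 3600, // last hour
		To:        now,
		WithStats: true, // assumed flag name; include avg/min/max in the response
		WithData:  true,
		Queries: []APIQuery{
			{
				Metric:   "cpu_load", // assumed field name for the queried metric
				Hostname: "node001",
			},
		},
	}

	resp, err := FetchData(req)
	if err != nil {
		// Validation errors such as ErrInvalidTimeRange, or an uninitialized store.
		fmt.Println(err)
		return
	}

	// Results is a 2D slice: outer index = query, inner index = selector.
	for i, perQuery := range resp.Results {
		for _, md := range perQuery {
			if md.Error != nil {
				fmt.Printf("query %d failed: %s\n", i, *md.Error)
				continue
			}
			fmt.Printf("query %d: %d points, avg=%v min=%v max=%v\n",
				i, len(md.Data), md.Avg, md.Min, md.Max)
		}
	}
}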