diff --git a/configs/config-demo.json b/configs/config-demo.json index e53fa8bd..7d36f19f 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -1,7 +1,6 @@ { "main": { - "addr": "127.0.0.1:8080", - "apiAllowedIPs": ["*"] + "addr": "127.0.0.1:8080" }, "cron": { "commit-job-worker": "1m", @@ -17,6 +16,8 @@ "checkpoints": { "interval": "12h" }, - "retention-in-memory": "48h" + "retention-in-memory": "48h", + "memory-cap": 100 } } + diff --git a/configs/config.json b/configs/config.json index 90a6ba79..ff224f29 100644 --- a/configs/config.json +++ b/configs/config.json @@ -49,6 +49,7 @@ "checkpoints": { "interval": "12h" }, + "memory-cap": 100, "retention-in-memory": "48h", "nats-subscriptions": [ { diff --git a/internal/metricstore/api.go b/internal/metricstore/api.go index d8a2ea82..a72c2ac7 100644 --- a/internal/metricstore/api.go +++ b/internal/metricstore/api.go @@ -3,6 +3,9 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// This file contains the API types and data fetching logic for querying metric data +// from the in-memory metric store. It provides structures for building complex queries +// with support for aggregation, scaling, padding, and statistics computation. package metricstore import ( @@ -15,10 +18,17 @@ import ( ) var ( + // ErrInvalidTimeRange is returned when a query has 'from' >= 'to' ErrInvalidTimeRange = errors.New("[METRICSTORE]> invalid time range: 'from' must be before 'to'") - ErrEmptyCluster = errors.New("[METRICSTORE]> cluster name cannot be empty") + // ErrEmptyCluster is returned when a query with ForAllNodes has no cluster specified + ErrEmptyCluster = errors.New("[METRICSTORE]> cluster name cannot be empty") ) +// APIMetricData represents the response data for a single metric query. +// +// It contains both the time-series data points and computed statistics (avg, min, max). +// If an error occurred during data retrieval, the Error field will be set and other +// fields may be incomplete. type APIMetricData struct { Error *string `json:"error,omitempty"` Data schema.FloatArray `json:"data,omitempty"` @@ -30,6 +40,13 @@ type APIMetricData struct { Max schema.Float `json:"max"` } +// APIQueryRequest represents a batch query request for metric data. +// +// It supports two modes of operation: +// 1. Explicit queries via the Queries field +// 2. Automatic query generation via ForAllNodes (queries all specified metrics for all nodes in the cluster) +// +// The request can be customized with flags to include/exclude statistics, raw data, and padding. type APIQueryRequest struct { Cluster string `json:"cluster"` Queries []APIQuery `json:"queries"` @@ -41,11 +58,25 @@ type APIQueryRequest struct { WithPadding bool `json:"with-padding"` } +// APIQueryResponse represents the response to an APIQueryRequest. +// +// Results is a 2D array where each outer element corresponds to a query, +// and each inner element corresponds to a selector within that query +// (e.g., multiple CPUs or cores). type APIQueryResponse struct { Queries []APIQuery `json:"queries,omitempty"` Results [][]APIMetricData `json:"results"` } +// APIQuery represents a single metric query with optional hierarchical selectors. 
+// +// The hierarchical selection works as follows: +// - Hostname: The node to query +// - Type + TypeIds: First level of hierarchy (e.g., "cpu" + ["0", "1", "2"]) +// - SubType + SubTypeIds: Second level of hierarchy (e.g., "core" + ["0", "1"]) +// +// If Aggregate is true, data from multiple type/subtype IDs will be aggregated according +// to the metric's aggregation strategy. Otherwise, separate results are returned for each combination. type APIQuery struct { Type *string `json:"type,omitempty"` SubType *string `json:"subtype,omitempty"` @@ -58,6 +89,11 @@ type APIQuery struct { Aggregate bool `json:"aggreg"` } +// AddStats computes and populates the Avg, Min, and Max fields from the Data array. +// +// NaN values in the data are ignored during computation. If all values are NaN, +// the statistics fields will be set to NaN. +// // TODO: Optimize this, just like the stats endpoint! func (data *APIMetricData) AddStats() { n := 0 @@ -83,6 +119,10 @@ func (data *APIMetricData) AddStats() { } } +// ScaleBy multiplies all data points and statistics by the given factor. +// +// This is commonly used for unit conversion (e.g., bytes to gigabytes). +// Scaling by 0 or 1 is a no-op for performance reasons. func (data *APIMetricData) ScaleBy(f schema.Float) { if f == 0 || f == 1 { return @@ -96,6 +136,17 @@ func (data *APIMetricData) ScaleBy(f schema.Float) { } } +// PadDataWithNull pads the beginning of the data array with NaN values if needed. +// +// This ensures that the data aligns with the requested 'from' timestamp, even if +// the metric store doesn't have data for the earliest time points. This is useful +// for maintaining consistent array indexing across multiple queries. +// +// Parameters: +// - ms: MemoryStore instance to lookup metric configuration +// - from: The requested start timestamp +// - to: The requested end timestamp (unused but kept for API consistency) +// - metric: The metric name to lookup frequency information func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metric string) { minfo, ok := ms.Metrics[metric] if !ok { @@ -115,6 +166,31 @@ func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metr } } +// FetchData executes a batch metric query request and returns the results. +// +// This is the primary API for retrieving metric data from the memory store. It supports: +// - Individual queries via req.Queries +// - Batch queries for all nodes via req.ForAllNodes +// - Hierarchical selector construction (cluster → host → type → subtype) +// - Optional statistics computation (avg, min, max) +// - Optional data scaling +// - Optional data padding with NaN values +// +// The function constructs selectors based on the query parameters and calls MemoryStore.Read() +// for each selector. If a query specifies Aggregate=false with multiple type/subtype IDs, +// separate results are returned for each combination. 
+// +// Parameters: +// - req: The query request containing queries, time range, and options +// +// Returns: +// - APIQueryResponse containing results for each query, or error if validation fails +// +// Errors: +// - ErrInvalidTimeRange if req.From > req.To +// - ErrEmptyCluster if req.ForAllNodes is used without specifying a cluster +// - Error if MemoryStore is not initialized +// - Individual query errors are stored in APIMetricData.Error field func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { if req.From > req.To { return nil, ErrInvalidTimeRange @@ -126,7 +202,7 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { req.WithData = true ms := GetMemoryStore() if ms == nil { - return nil, fmt.Errorf("memorystore not initialized") + return nil, fmt.Errorf("[METRICSTORE]> memorystore not initialized") } response := APIQueryResponse{ @@ -195,8 +271,6 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { } } - // log.Printf("query: %#v\n", query) - // log.Printf("sels: %#v\n", sels) var err error res := make([]APIMetricData, 0, len(sels)) for _, sel := range sels { diff --git a/internal/metricstore/archive.go b/internal/metricstore/archive.go index 78efadfe..68f88741 100644 --- a/internal/metricstore/archive.go +++ b/internal/metricstore/archive.go @@ -21,12 +21,36 @@ import ( cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" ) +// Worker for either Archiving or Deleting files + func Archiving(wg *sync.WaitGroup, ctx context.Context) { + if Keys.Archive != nil { + // Run as Archiver + runWorker(wg, ctx, + Keys.Archive.ArchiveInterval, + "archiving", + Keys.Archive.RootDir, + Keys.Archive.DeleteInstead, + ) + } else { + // Run as Deleter + runWorker(wg, ctx, + Keys.RetentionInMemory, + "deleting", + "", + true, + ) + } +} + +// runWorker takes simple values to configure what it does +func runWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, archiveDir string, delete bool) { go func() { defer wg.Done() - d, err := time.ParseDuration(Keys.Archive.ArchiveInterval) + + d, err := time.ParseDuration(interval) if err != nil { - cclog.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err) + cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err) } if d <= 0 { return @@ -41,14 +65,18 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) { return case <-ticker.C: t := time.Now().Add(-d) - cclog.Infof("[METRICSTORE]> start archiving checkpoints (older than %s)...", t.Format(time.RFC3339)) - n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, - Keys.Archive.RootDir, t.Unix(), Keys.Archive.DeleteInstead) + cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339)) + + n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, archiveDir, t.Unix(), delete) if err != nil { - cclog.Errorf("[METRICSTORE]> archiving failed: %s", err.Error()) + cclog.Errorf("[METRICSTORE]> %s failed: %s", mode, err.Error()) } else { - cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n) + if delete && archiveDir == "" { + cclog.Infof("[METRICSTORE]> done: %d checkpoints deleted", n) + } else { + cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n) + } } } } diff --git a/internal/metricstore/buffer.go b/internal/metricstore/buffer.go index 94d3ce76..3687c4dc 100644 --- a/internal/metricstore/buffer.go +++ b/internal/metricstore/buffer.go @@ -3,6 +3,41 @@ // Use of this source code is governed by a MIT-style // license 
that can be found in the LICENSE file. +// Package metricstore provides buffer.go: Time-series data buffer implementation. +// +// # Buffer Architecture +// +// Each metric at each hierarchical level (cluster/host/cpu/etc.) uses a linked-list +// chain of fixed-size buffers to store time-series data. This design: +// +// - Avoids reallocation/copying when growing (new links added instead) +// - Enables efficient pooling (buffers returned to sync.Pool) +// - Supports traversal back in time (via prev pointers) +// - Maintains temporal ordering (newer data in later buffers) +// +// # Buffer Chain Example +// +// [oldest buffer] <- prev -- [older] <- prev -- [newest buffer (head)] +// start=1000 start=1512 start=2024 +// data=[v0...v511] data=[v0...v511] data=[v0...v42] +// +// When the head buffer reaches capacity (BufferCap = 512), a new buffer becomes +// the new head and the old head is linked via prev. +// +// # Pooling Strategy +// +// sync.Pool reduces GC pressure for the common case (BufferCap-sized allocations). +// Non-standard capacity buffers are not pooled (e.g., from checkpoint deserialization). +// +// # Time Alignment +// +// Timestamps are aligned to measurement frequency intervals: +// +// index = (timestamp - buffer.start) / buffer.frequency +// actualTime = buffer.start + (frequency / 2) + (index * frequency) +// +// Missing data points are represented as NaN values. The read() function performs +// linear interpolation where possible. package metricstore import ( @@ -27,14 +62,30 @@ var bufferPool sync.Pool = sync.Pool{ } var ( - ErrNoData error = errors.New("[METRICSTORE]> no data for this metric/level") + // ErrNoData indicates no time-series data exists for the requested metric/level. + ErrNoData error = errors.New("[METRICSTORE]> no data for this metric/level") + + // ErrDataDoesNotAlign indicates that aggregated data from child scopes + // does not align with the parent scope's expected timestamps/intervals. ErrDataDoesNotAlign error = errors.New("[METRICSTORE]> data from lower granularities does not align") ) -// Each metric on each level has it's own buffer. -// This is where the actual values go. -// If `cap(data)` is reached, a new buffer is created and -// becomes the new head of a buffer list. +// buffer stores time-series data for a single metric at a specific hierarchical level. +// +// Buffers form doubly-linked chains ordered by time. When capacity is reached, +// a new buffer becomes the head and the old head is linked via prev/next. +// +// Fields: +// - prev: Link to older buffer in the chain (nil if this is oldest) +// - next: Link to newer buffer in the chain (nil if this is newest/head) +// - data: Time-series values (schema.Float supports NaN for missing data) +// - frequency: Measurement interval in seconds +// - start: Start timestamp (adjusted by -frequency/2 for alignment) +// - archived: True if data has been persisted to disk archive +// - closed: True if buffer is no longer accepting writes +// +// Index calculation: index = (timestamp - start) / frequency +// Actual data timestamp: start + (frequency / 2) + (index * frequency) type buffer struct { prev *buffer next *buffer @@ -57,10 +108,22 @@ func newBuffer(ts, freq int64) *buffer { return b } -// If a new buffer was created, the new head is returnd. -// Otherwise, the existing buffer is returnd. -// Normaly, only "newer" data should be written, but if the value would -// end up in the same buffer anyways it is allowed. +// write appends a timestamped value to the buffer chain. 
+// +// Returns the head buffer (which may be newly created if capacity was reached). +// Timestamps older than the buffer's start are rejected. If the calculated index +// exceeds capacity, a new buffer is allocated and linked as the new head. +// +// Missing timestamps are automatically filled with NaN values to maintain alignment. +// Overwrites are allowed if the index is already within the existing data slice. +// +// Parameters: +// - ts: Unix timestamp in seconds +// - value: Metric value (can be schema.NaN for missing data) +// +// Returns: +// - *buffer: The new head buffer (same as b if no new buffer created) +// - error: Non-nil if timestamp is before buffer start func (b *buffer) write(ts int64, value schema.Float) (*buffer, error) { if ts < b.start { return nil, errors.New("[METRICSTORE]> cannot write value to buffer from past") @@ -99,13 +162,27 @@ func (b *buffer) firstWrite() int64 { return b.start + (b.frequency / 2) } -// Return all known values from `from` to `to`. Gaps of information are represented as NaN. -// Simple linear interpolation is done between the two neighboring cells if possible. -// If values at the start or end are missing, instead of NaN values, the second and thrid -// return values contain the actual `from`/`to`. -// This function goes back the buffer chain if `from` is older than the currents buffer start. -// The loaded values are added to `data` and `data` is returned, possibly with a shorter length. -// If `data` is not long enough to hold all values, this function will panic! +// read retrieves time-series data from the buffer chain for the specified time range. +// +// Traverses the buffer chain backwards (via prev links) if 'from' precedes the current +// buffer's start. Missing data points are represented as NaN. Values are accumulated +// into the provided 'data' slice (using +=, so caller must zero-initialize if needed). +// +// The function adjusts the actual time range returned if data is unavailable at the +// boundaries (returned via adjusted from/to timestamps). +// +// Parameters: +// - from: Start timestamp (Unix seconds) +// - to: End timestamp (Unix seconds, exclusive) +// - data: Pre-allocated slice to accumulate results (must be large enough) +// +// Returns: +// - []schema.Float: Slice of data (may be shorter than input 'data' slice) +// - int64: Actual start timestamp with available data +// - int64: Actual end timestamp (exclusive) +// - error: Non-nil on failure +// +// Panics if 'data' slice is too small to hold all values in [from, to). func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int64, int64, error) { if from < b.firstWrite() { if b.prev != nil { @@ -142,7 +219,18 @@ func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int6 return data[:i], from, t, nil } -// Returns true if this buffer needs to be freed. +// free removes buffers older than the specified timestamp from the chain. +// +// Recursively traverses backwards (via prev) and unlinks buffers whose end time +// is before the retention threshold. Freed buffers are returned to the pool if +// they have the standard capacity (BufferCap). 
+// +// Parameters: +// - t: Retention threshold timestamp (Unix seconds) +// +// Returns: +// - delme: True if the current buffer itself should be deleted by caller +// - n: Number of buffers freed in this subtree func (b *buffer) free(t int64) (delme bool, n int) { if b.prev != nil { delme, m := b.prev.free(t) @@ -164,7 +252,51 @@ func (b *buffer) free(t int64) (delme bool, n int) { return false, n } -// Call `callback` on every buffer that contains data in the range from `from` to `to`. +// forceFreeOldest recursively finds the end of the linked list (the oldest buffer) +// and removes it. +// Returns: +// +// delme: true if 'b' itself is the oldest and should be removed by the caller +// n: the number of buffers freed (will be 1 or 0) +func (b *buffer) forceFreeOldest() (delme bool, n int) { + // If there is a previous buffer, recurse down to find the oldest + if b.prev != nil { + delPrev, freed := b.prev.forceFreeOldest() + + // If the previous buffer signals it should be deleted: + if delPrev { + // Unlink references + b.prev.next = nil + + // Return to pool if capacity matches + if cap(b.prev.data) == BufferCap { + bufferPool.Put(b.prev) + } + + // Remove the link from the current buffer + b.prev = nil + } + return false, freed + } + + // If b.prev is nil, THIS buffer is the oldest. + // We return true so the parent (or the Level loop) knows to delete reference to 'b'. + return true, 1 +} + +// iterFromTo invokes callback on every buffer in the chain that overlaps [from, to]. +// +// Traverses backwards (via prev) first, then processes current buffer if it overlaps +// the time range. Used for checkpoint/archive operations that need to serialize buffers +// within a specific time window. +// +// Parameters: +// - from: Start timestamp (Unix seconds, inclusive) +// - to: End timestamp (Unix seconds, inclusive) +// - callback: Function to invoke on each overlapping buffer +// +// Returns: +// - error: First error returned by callback, or nil if all succeeded func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error { if b == nil { return nil diff --git a/internal/metricstore/checkpoint.go b/internal/metricstore/checkpoint.go index b5511221..b90b1c22 100644 --- a/internal/metricstore/checkpoint.go +++ b/internal/metricstore/checkpoint.go @@ -3,6 +3,34 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// This file implements checkpoint persistence for the in-memory metric store. +// +// Checkpoints enable graceful restarts by periodically saving in-memory metric +// data to disk in either JSON or Avro format. The checkpoint system: +// +// Key Features: +// - Periodic background checkpointing via the Checkpointing() worker +// - Two formats: JSON (human-readable) and Avro (compact, efficient) +// - Parallel checkpoint creation and loading using worker pools +// - Hierarchical file organization: checkpoint_dir/cluster/host/timestamp.{json|avro} +// - Only saves unarchived data (archived data is already persisted elsewhere) +// - Automatic format detection and fallback during loading +// - GC optimization during loading to prevent excessive heap growth +// +// Checkpoint Workflow: +// 1. Init() loads checkpoints within retention window at startup +// 2. Checkpointing() worker periodically saves new data +// 3. 
Shutdown() writes final checkpoint before exit +// +// File Organization: +// +// checkpoints/ +// cluster1/ +// host001/ +// 1234567890.json (timestamp = checkpoint start time) +// 1234567950.json +// host002/ +// ... package metricstore import ( @@ -29,18 +57,21 @@ import ( ) const ( - CheckpointFilePerms = 0o644 - CheckpointDirPerms = 0o755 - GCTriggerInterval = DefaultGCTriggerInterval + CheckpointFilePerms = 0o644 // File permissions for checkpoint files + CheckpointDirPerms = 0o755 // Directory permissions for checkpoint directories + GCTriggerInterval = DefaultGCTriggerInterval // Interval for triggering GC during checkpoint loading ) -// Whenever changed, update MarshalJSON as well! +// CheckpointMetrics represents metric data in a checkpoint file. +// Whenever the structure changes, update MarshalJSON as well! type CheckpointMetrics struct { Data []schema.Float `json:"data"` Frequency int64 `json:"frequency"` Start int64 `json:"start"` } +// CheckpointFile represents the hierarchical structure of a checkpoint file. +// It mirrors the Level tree structure from the MemoryStore. type CheckpointFile struct { Metrics map[string]*CheckpointMetrics `json:"metrics"` Children map[string]*CheckpointFile `json:"children"` @@ -48,10 +79,23 @@ type CheckpointFile struct { To int64 `json:"to"` } -var lastCheckpoint time.Time +// lastCheckpoint tracks the timestamp of the last checkpoint creation. +var ( + lastCheckpoint time.Time + lastCheckpointMu sync.Mutex +) +// Checkpointing starts a background worker that periodically saves metric data to disk. +// +// The behavior depends on the configured file format: +// - JSON: Periodic checkpointing based on Keys.Checkpoints.Interval +// - Avro: Initial delay + periodic checkpointing at DefaultAvroCheckpointInterval +// +// The worker respects context cancellation and signals completion via the WaitGroup. 
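+//
+// Illustrative JSON configuration consumed by this worker (example values only,
+// not defaults; keys follow the Checkpoints struct tags):
+//
+//	"checkpoints": {
+//	    "file-format": "json",
+//	    "interval": "1h",
+//	    "directory": "./var/checkpoints"
+//	}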
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { + lastCheckpointMu.Lock() lastCheckpoint = time.Now() + lastCheckpointMu.Unlock() if Keys.Checkpoints.FileFormat == "json" { ms := GetMemoryStore() @@ -60,9 +104,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() d, err := time.ParseDuration(Keys.Checkpoints.Interval) if err != nil { - cclog.Fatal(err) + cclog.Fatalf("[METRICSTORE]> invalid checkpoint interval '%s': %s", Keys.Checkpoints.Interval, err.Error()) } if d <= 0 { + cclog.Warnf("[METRICSTORE]> checkpoint interval is zero or negative (%s), checkpointing disabled", d) return } @@ -74,15 +119,21 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", lastCheckpoint.Format(time.RFC3339)) + lastCheckpointMu.Lock() + from := lastCheckpoint + lastCheckpointMu.Unlock() + + cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339)) now := time.Now() n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, - lastCheckpoint.Unix(), now.Unix()) + from.Unix(), now.Unix()) if err != nil { cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error()) } else { cclog.Infof("[METRICSTORE]> done: %d checkpoint files created", n) + lastCheckpointMu.Lock() lastCheckpoint = now + lastCheckpointMu.Unlock() } } } @@ -113,9 +164,10 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { } } -// As `Float` implements a custom MarshalJSON() function, -// serializing an array of such types has more overhead -// than one would assume (because of extra allocations, interfaces and so on). +// MarshalJSON provides optimized JSON encoding for CheckpointMetrics. +// +// Since schema.Float has custom MarshalJSON, serializing []Float has significant overhead. +// This method manually constructs JSON to avoid allocations and interface conversions. func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) { buf := make([]byte, 0, 128+len(cm.Data)*8) buf = append(buf, `{"frequency":`...) @@ -137,13 +189,27 @@ func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) { return buf, nil } -// Metrics stored at the lowest 2 levels are not stored away (root and cluster)! -// On a per-host basis a new JSON file is created. I have no idea if this will scale. -// The good thing: Only a host at a time is locked, so this function can run -// in parallel to writes/reads. +// ToCheckpoint writes metric data to checkpoint files in parallel. +// +// Metrics at root and cluster levels are skipped. One file per host is created. +// Uses worker pool (Keys.NumWorkers) for parallel processing. Only locks one host +// at a time, allowing concurrent writes/reads to other hosts. +// +// Returns the number of checkpoint files created and any errors encountered. 
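+//
+// Minimal usage sketch (illustrative; `from` is a placeholder time.Time marking the
+// previous checkpoint, mirroring the call made by the Checkpointing worker):
+//
+//	ms := GetMemoryStore()
+//	n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
+//	if err != nil {
+//	    cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error())
+//	}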
func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { - levels := make([]*Level, 0) - selectors := make([][]string, 0) + // Pre-calculate capacity by counting cluster/host pairs + m.root.lock.RLock() + totalHosts := 0 + for _, l1 := range m.root.children { + l1.lock.RLock() + totalHosts += len(l1.children) + l1.lock.RUnlock() + } + m.root.lock.RUnlock() + + levels := make([]*Level, 0, totalHosts) + selectors := make([][]string, 0, totalHosts) + m.root.lock.RLock() for sel1, l1 := range m.root.children { l1.lock.RLock() @@ -203,6 +269,8 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { return int(n), nil } +// toCheckpointFile recursively converts a Level tree to CheckpointFile structure. +// Skips metrics that are already archived. Returns nil if no unarchived data exists. func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) { l.lock.RLock() defer l.lock.RUnlock() @@ -224,6 +292,7 @@ func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil b.iterFromTo(from, to, func(b *buffer) error { if !b.archived { allArchived = false + return fmt.Errorf("stop") // Early termination signal } return nil }) @@ -267,6 +336,8 @@ func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil return retval, nil } +// toCheckpoint writes a Level's data to a JSON checkpoint file. +// Creates directory if needed. Returns ErrNoNewArchiveData if nothing to save. func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { cf, err := l.toCheckpointFile(from, to, m) if err != nil { @@ -278,11 +349,11 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { } filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) - f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) + f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) if err != nil && os.IsNotExist(err) { err = os.MkdirAll(dir, CheckpointDirPerms) if err == nil { - f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms) + f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) } } if err != nil { @@ -298,9 +369,54 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { return bw.Flush() } +// enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs. +// Returns error if directory structure is invalid. +func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { + clustersDir, err := os.ReadDir(dir) + if err != nil { + return err + } + + gcCounter := 0 + for _, clusterDir := range clustersDir { + if !clusterDir.IsDir() { + return errors.New("[METRICSTORE]> expected only directories at first level of checkpoints/ directory") + } + + hostsDir, err := os.ReadDir(filepath.Join(dir, clusterDir.Name())) + if err != nil { + return err + } + + for _, hostDir := range hostsDir { + if !hostDir.IsDir() { + return errors.New("[METRICSTORE]> expected only directories at second level of checkpoints/ directory") + } + + gcCounter++ + if gcCounter%GCTriggerInterval == 0 { + // Forcing garbage collection runs here regulary during the loading of checkpoints + // will decrease the total heap size after loading everything back to memory is done. + // While loading data, the heap will grow fast, so the GC target size will double + // almost always. 
By forcing GCs here, we can keep it growing more slowly so that + // at the end, less memory is wasted. + runtime.GC() + } + + work <- [2]string{clusterDir.Name(), hostDir.Name()} + } + } + + return nil +} + +// FromCheckpoint loads checkpoint files from disk into memory in parallel. +// +// Uses worker pool to load cluster/host combinations. Periodically triggers GC +// to prevent excessive heap growth. Returns number of files loaded and any errors. func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) { var wg sync.WaitGroup - work := make(chan [2]string, Keys.NumWorkers) + work := make(chan [2]string, Keys.NumWorkers*4) n, errs := int32(0), int32(0) wg.Add(Keys.NumWorkers) @@ -319,40 +435,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) ( }() } - i := 0 - clustersDir, err := os.ReadDir(dir) - for _, clusterDir := range clustersDir { - if !clusterDir.IsDir() { - err = errors.New("[METRICSTORE]> expected only directories at first level of checkpoints/ directory") - goto done - } - - hostsDir, e := os.ReadDir(filepath.Join(dir, clusterDir.Name())) - if e != nil { - err = e - goto done - } - - for _, hostDir := range hostsDir { - if !hostDir.IsDir() { - err = errors.New("[METRICSTORE]> expected only directories at second level of checkpoints/ directory") - goto done - } - - i++ - if i%Keys.NumWorkers == 0 && i > GCTriggerInterval { - // Forcing garbage collection runs here regulary during the loading of checkpoints - // will decrease the total heap size after loading everything back to memory is done. - // While loading data, the heap will grow fast, so the GC target size will double - // almost always. By forcing GCs here, we can keep it growing more slowly so that - // at the end, less memory is wasted. - runtime.GC() - } - - work <- [2]string{clusterDir.Name(), hostDir.Name()} - } - } -done: + err := enqueueCheckpointHosts(dir, work) close(work) wg.Wait() @@ -366,9 +449,11 @@ done: return int(n), nil } -// Metrics stored at the lowest 2 levels are not loaded (root and cluster)! -// This function can only be called once and before the very first write or read. -// Different host's data is loaded to memory in parallel. +// FromCheckpointFiles is the main entry point for loading checkpoints at startup. +// +// Automatically detects checkpoint format (JSON vs Avro) and falls back if needed. +// Creates checkpoint directory if it doesn't exist. This function must be called +// before any writes or reads, and can only be called once. func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { if _, err := os.Stat(dir); os.IsNotExist(err) { // The directory does not exist, so create it using os.MkdirAll() @@ -411,6 +496,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { return 0, nil } +// checkFilesWithExtension walks a directory tree to check if files with the given extension exist. 
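+//
+// Assumed usage sketch (illustrative; FromCheckpointFiles applies this pattern to
+// decide whether Avro or JSON checkpoints are present):
+//
+//	found, err := checkFilesWithExtension(dir, "avro")
+//	if err == nil && !found {
+//	    // no files with the given extension, fall back to the other format
+//	}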
func checkFilesWithExtension(dir string, extension string) (bool, error) {
	found := false
@@ -730,13 +816,24 @@ func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRece
		return nums[a.Name()] < nums[b.Name()]
	})

+	if len(nums) == 0 {
+		return nil, nil
+	}
+
	filenames := make([]string, 0)
-	for i := range direntries {
-		e := direntries[i]
+
+	for i, e := range direntries {
		ts1 := nums[e.Name()]
+		// Select files in forward or backward direction:
+		// if findMoreRecentFiles is set, all files at or after
+		// the given timestamp are selected;
+		// otherwise, all files at or before
+		// the given timestamp are selected.
		if findMoreRecentFiles && t <= ts1 {
			filenames = append(filenames, e.Name())
+		} else if !findMoreRecentFiles && ts1 <= t && ts1 != 0 {
+			filenames = append(filenames, e.Name())
		}
		if i == len(direntries)-1 {
			continue
		}
@@ -749,10 +846,6 @@ func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRece
			if ts1 < t && t < ts2 {
				filenames = append(filenames, e.Name())
			}
-		} else {
-			if ts2 < t {
-				filenames = append(filenames, e.Name())
-			}
		}
	}
diff --git a/internal/metricstore/config.go b/internal/metricstore/config.go
index 9657453d..34e536d0 100644
--- a/internal/metricstore/config.go
+++ b/internal/metricstore/config.go
@@ -3,6 +3,43 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

+// Package metricstore provides config.go: Configuration structures and metric management.
+//
+// # Configuration Hierarchy
+//
+// The metricstore package uses nested configuration structures:
+//
+// MetricStoreConfig (Keys)
+// ├─ NumWorkers: Parallel checkpoint/archive workers
+// ├─ RetentionInMemory: How long to keep data in RAM
+// ├─ MemoryCap: Memory limit in gigabytes (triggers forceFree)
+// ├─ Checkpoints: Persistence configuration
+// │ ├─ FileFormat: "avro" or "json"
+// │ ├─ Interval: How often to save (e.g., "1h")
+// │ └─ RootDir: Checkpoint storage path
+// ├─ Archive: Long-term storage configuration
+// │ ├─ ArchiveInterval: How often to archive
+// │ ├─ RootDir: Archive storage path
+// │ └─ DeleteInstead: Delete old data instead of archiving
+// ├─ Debug: Development/debugging options
+// └─ Subscriptions: NATS topic subscriptions for metric ingestion
+//
+// # Metric Configuration
+//
+// Each metric (e.g., "cpu_load", "mem_used") has a MetricConfig entry in the global
+// Metrics map, defining:
+//
+// - Frequency: Measurement interval in seconds
+// - Aggregation: How to combine values (sum/avg/none) when transforming scopes
+// - offset: Internal index into Level.metrics slice (assigned during Init)
+//
+// # AggregationStrategy
+//
+// Determines how to combine metric values when aggregating from finer to coarser scopes:
+//
+// - NoAggregation: Do not combine (incompatible scopes)
+// - SumAggregation: Add values (e.g., power consumption: core→socket)
+// - AvgAggregation: Average values (e.g., temperature: core→socket)
package metricstore

import (
@@ -19,23 +56,50 @@ const (
	DefaultAvroCheckpointInterval = time.Minute
)

+// Checkpoints configures periodic persistence of in-memory metric data.
+//
+// Fields:
+// - FileFormat: "avro" (default, binary, compact) or "json" (human-readable, slower)
+// - Interval: Duration string (e.g., "1h", "30m") between checkpoint saves
+// - RootDir: Filesystem path for checkpoint files (created if missing)
type Checkpoints struct {
	FileFormat string `json:"file-format"`
	Interval   string `json:"interval"`
	RootDir    string `json:"directory"`
}

+// Debug provides development and profiling options.
+//
+// Fields:
+// - DumpToFile: Path to dump checkpoint data for inspection (empty = disabled)
+// - EnableGops: Enable gops agent for live runtime debugging (https://github.com/google/gops)
type Debug struct {
	DumpToFile string `json:"dump-to-file"`
	EnableGops bool   `json:"gops"`
}

+// Archive configures long-term storage of old metric data.
+//
+// Checkpoint data older than ArchiveInterval is zipped and moved to RootDir, or deleted if DeleteInstead is set.
+//
+// Fields:
+// - ArchiveInterval: Duration string (e.g., "24h") between archive operations
+// - RootDir: Filesystem path for archived data (created if missing)
+// - DeleteInstead: If true, delete old data instead of archiving (saves disk space)
type Archive struct {
	ArchiveInterval string `json:"interval"`
	RootDir         string `json:"directory"`
	DeleteInstead   bool   `json:"delete-instead"`
}

+// Subscriptions defines NATS topics to subscribe to for metric ingestion.
+//
+// Each subscription receives metrics via NATS messaging, enabling real-time
+// data collection from compute nodes.
+//
+// Fields:
+// - SubscribeTo: NATS subject/channel name (e.g., "metrics.compute.*")
+// - ClusterTag: Default cluster name for metrics without cluster tag (optional)
type Subscriptions []struct {
	// Channel name
	SubscribeTo string `json:"subscribe-to"`
@@ -44,6 +108,19 @@ type Subscriptions []struct {
	ClusterTag string `json:"cluster-tag"`
}

+// MetricStoreConfig defines the main configuration for the metricstore.
+//
+// Loaded from cc-backend's config.json "metricstore" section. Controls memory usage,
+// persistence, archiving, and metric ingestion.
+//
+// Fields:
+// - NumWorkers: Parallel workers for checkpoint/archive (0 = auto: min(NumCPU/2+1, 10))
+// - RetentionInMemory: Duration string (e.g., "48h") for in-memory data retention
+// - MemoryCap: Maximum in-memory buffer size in gigabytes; triggers forceFree when exceeded
+// - Checkpoints: Periodic persistence configuration
+// - Debug: Development/profiling options (nil = disabled)
+// - Archive: Long-term storage configuration (nil = disabled)
+// - Subscriptions: NATS topics for metric ingestion (nil = NATS ingestion disabled)
type MetricStoreConfig struct {
	// Number of concurrent workers for checkpoint and archive operations.
	// If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10)
@@ -56,6 +133,10 @@ type MetricStoreConfig struct {
	Subscriptions *Subscriptions `json:"nats-subscriptions"`
}

+// Keys is the global metricstore configuration instance.
+//
+// Initialized with defaults, then overwritten by cc-backend's config.json.
+// Accessed by Init(), Checkpointing(), and other lifecycle functions.
var Keys MetricStoreConfig = MetricStoreConfig{
	Checkpoints: Checkpoints{
		FileFormat: "avro",
@@ -63,15 +144,33 @@
	},
}

-// AggregationStrategy for aggregation over multiple values at different cpus/sockets/..., not time!
+// AggregationStrategy defines how to combine metric values across hierarchy levels.
+//
+// Used when transforming data from finer-grained scopes (e.g., core) to coarser scopes
+// (e.g., socket).
This is SPATIAL aggregation, not TEMPORAL (time-based) aggregation. +// +// Values: +// - NoAggregation: Do not aggregate (incompatible scopes or non-aggregatable metrics) +// - SumAggregation: Add values (e.g., power: sum core power → socket power) +// - AvgAggregation: Average values (e.g., temperature: average core temps → socket temp) type AggregationStrategy int const ( - NoAggregation AggregationStrategy = iota - SumAggregation - AvgAggregation + NoAggregation AggregationStrategy = iota // Do not aggregate + SumAggregation // Sum values (e.g., power, energy) + AvgAggregation // Average values (e.g., temperature, utilization) ) +// AssignAggregationStrategy parses a string into an AggregationStrategy value. +// +// Used when loading metric configurations from JSON/YAML files. +// +// Parameters: +// - str: "sum", "avg", or "" (empty string for NoAggregation) +// +// Returns: +// - AggregationStrategy: Parsed value +// - error: Non-nil if str is unrecognized func AssignAggregationStrategy(str string) (AggregationStrategy, error) { switch str { case "": @@ -85,6 +184,14 @@ func AssignAggregationStrategy(str string) (AggregationStrategy, error) { } } +// MetricConfig defines configuration for a single metric type. +// +// Stored in the global Metrics map, keyed by metric name (e.g., "cpu_load"). +// +// Fields: +// - Frequency: Measurement interval in seconds (e.g., 60 for 1-minute granularity) +// - Aggregation: How to combine values across hierarchy levels (sum/avg/none) +// - offset: Internal index into Level.metrics slice (assigned during Init) type MetricConfig struct { // Interval in seconds at which measurements are stored Frequency int64 @@ -96,8 +203,21 @@ type MetricConfig struct { offset int } +// Metrics is the global map of metric configurations. +// +// Keyed by metric name (e.g., "cpu_load", "mem_used"). Populated during Init() +// from cluster configuration and checkpoint restoration. Each MetricConfig.offset +// corresponds to the buffer slice index in Level.metrics. var Metrics map[string]MetricConfig +// GetMetricFrequency retrieves the measurement interval for a metric. +// +// Parameters: +// - metricName: Metric name (e.g., "cpu_load") +// +// Returns: +// - int64: Frequency in seconds +// - error: Non-nil if metric not found in Metrics map func GetMetricFrequency(metricName string) (int64, error) { if metric, ok := Metrics[metricName]; ok { return metric.Frequency, nil @@ -105,9 +225,18 @@ func GetMetricFrequency(metricName string) (int64, error) { return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) } -// AddMetric adds logic to add metrics. Redundant metrics should be updated with max frequency. -// use metric.Name to check if the metric already exists. -// if not, add it to the Metrics map. +// AddMetric registers a new metric or updates an existing one. +// +// If the metric already exists with a different frequency, uses the higher frequency +// (finer granularity). This handles cases where different clusters report the same +// metric at different intervals. 
+// +// Parameters: +// - name: Metric name (e.g., "cpu_load") +// - metric: Configuration (frequency, aggregation strategy) +// +// Returns: +// - error: Always nil (signature for future error handling) func AddMetric(name string, metric MetricConfig) error { if Metrics == nil { Metrics = make(map[string]MetricConfig, 0) diff --git a/internal/metricstore/configSchema.go b/internal/metricstore/configSchema.go index c06bc6d9..4677eca6 100644 --- a/internal/metricstore/configSchema.go +++ b/internal/metricstore/configSchema.go @@ -73,5 +73,5 @@ const configSchema = `{ } } }, - "required": ["checkpoints", "retention-in-memory"] + "required": ["checkpoints", "retention-in-memory", "memory-cap"] }` diff --git a/internal/metricstore/level.go b/internal/metricstore/level.go index 87aeefc9..bc99b884 100644 --- a/internal/metricstore/level.go +++ b/internal/metricstore/level.go @@ -3,6 +3,41 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// Package metricstore provides level.go: Hierarchical tree structure for metric storage. +// +// # Level Architecture +// +// The Level type forms a tree structure where each node represents a level in the +// ClusterCockpit hierarchy: cluster → host → socket → core → hwthread, with special +// nodes for memory domains and accelerators. +// +// Structure: +// +// Root Level (cluster="emmy") +// ├─ Level (host="node001") +// │ ├─ Level (socket="0") +// │ │ ├─ Level (core="0") [stores cpu0 metrics] +// │ │ └─ Level (core="1") [stores cpu1 metrics] +// │ └─ Level (socket="1") +// │ └─ ... +// └─ Level (host="node002") +// └─ ... +// +// Each Level can: +// - Hold data (metrics slice of buffer pointers) +// - Have child nodes (children map[string]*Level) +// - Both simultaneously (inner nodes can store aggregated metrics) +// +// # Selector Paths +// +// Selectors are hierarchical paths: []string{"cluster", "host", "component"}. +// Example: []string{"emmy", "node001", "cpu0"} navigates to the cpu0 core level. +// +// # Concurrency +// +// RWMutex protects children map and metrics slice. Read-heavy workload (metric reads) +// uses RLock. Writes (new levels, buffer updates) use Lock. Double-checked locking +// prevents races during level creation. package metricstore import ( @@ -12,20 +47,40 @@ import ( "github.com/ClusterCockpit/cc-lib/v2/util" ) -// Could also be called "node" as this forms a node in a tree structure. -// Called Level because "node" might be confusing here. -// Can be both a leaf or a inner node. In this tree structue, inner nodes can -// also hold data (in `metrics`). +// Level represents a node in the hierarchical metric storage tree. +// +// Can be both a leaf or inner node. Inner nodes hold data in 'metrics' for aggregated +// values (e.g., socket-level metrics derived from core-level data). Named "Level" +// instead of "node" to avoid confusion with cluster nodes (hosts). +// +// Fields: +// - children: Map of child level names to Level pointers (e.g., "cpu0" → Level) +// - metrics: Slice of buffer pointers (one per metric, indexed by MetricConfig.offset) +// - lock: RWMutex for concurrent access (read-heavy, write-rare) type Level struct { children map[string]*Level metrics []*buffer lock sync.RWMutex } -// Find the correct level for the given selector, creating it if -// it does not exist. Example selector in the context of the -// ClusterCockpit could be: []string{ "emmy", "host123", "cpu0" }. -// This function would probably benefit a lot from `level.children` beeing a `sync.Map`? 
+// findLevelOrCreate navigates to or creates the level specified by selector. +// +// Recursively descends the tree, creating missing levels as needed. Uses double-checked +// locking: RLock first (fast path), then Lock if creation needed (slow path), then +// re-check after acquiring Lock to handle races. +// +// Example selector: []string{"emmy", "node001", "cpu0"} +// Navigates: root → emmy → node001 → cpu0, creating levels as needed. +// +// Parameters: +// - selector: Hierarchical path (consumed recursively, decreasing depth) +// - nMetrics: Number of metric slots to allocate in new levels +// +// Returns: +// - *Level: The target level (existing or newly created) +// +// Note: sync.Map may improve performance for high-concurrency writes, but current +// approach suffices for read-heavy workload. func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { if len(selector) == 0 { return l @@ -72,6 +127,22 @@ func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { return child.findLevelOrCreate(selector[1:], nMetrics) } +// collectPaths gathers all selector paths at the specified depth in the tree. +// +// Recursively traverses children, collecting paths when currentDepth+1 == targetDepth. +// Each path is a selector that can be used with findLevel() or findBuffers(). +// +// Explicitly copies slices to avoid shared underlying arrays between siblings, preventing +// unintended mutations. +// +// Parameters: +// - currentDepth: Depth of current level (0 = root) +// - targetDepth: Depth to collect paths from +// - currentPath: Path accumulated so far +// - results: Output slice (appended to) +// +// Example: collectPaths(0, 2, []string{}, &results) collects all 2-level paths +// like []string{"emmy", "node001"}, []string{"emmy", "node002"}, etc. func (l *Level) collectPaths(currentDepth, targetDepth int, currentPath []string, results *[][]string) { l.lock.RLock() defer l.lock.RUnlock() @@ -95,6 +166,18 @@ func (l *Level) collectPaths(currentDepth, targetDepth int, currentPath []string } } +// free removes buffers older than the retention threshold from the entire subtree. +// +// Recursively frees buffers in this level's metrics and all child levels. Buffers +// with standard capacity (BufferCap) are returned to the pool. Called by the +// retention worker to enforce retention policies. +// +// Parameters: +// - t: Retention threshold timestamp (Unix seconds) +// +// Returns: +// - int: Total number of buffers freed in this subtree +// - error: Non-nil on failure (propagated from children) func (l *Level) free(t int64) (int, error) { l.lock.Lock() defer l.lock.Unlock() @@ -124,6 +207,61 @@ func (l *Level) free(t int64) (int, error) { return n, nil } +// forceFree removes the oldest buffer from each metric chain in the subtree. +// +// Unlike free(), which removes based on time threshold, this unconditionally removes +// the oldest buffer in each chain. Used by MemoryUsageTracker when memory cap is +// exceeded and time-based retention is insufficient. +// +// Recursively processes current level's metrics and all child levels. 
+// +// Returns: +// - int: Total number of buffers freed in this subtree +// - error: Non-nil on failure (propagated from children) +func (l *Level) forceFree() (int, error) { + l.lock.Lock() + defer l.lock.Unlock() + + n := 0 + + // Iterate over metrics in the current level + for i, b := range l.metrics { + if b != nil { + // Attempt to free the oldest buffer in this chain + delme, freedCount := b.forceFreeOldest() + n += freedCount + + // If delme is true, it means 'b' itself (the head) was the oldest + // and needs to be removed from the slice. + if delme { + if cap(b.data) == BufferCap { + bufferPool.Put(b) + } + l.metrics[i] = nil + } + } + } + + // Recursively traverse children + for _, child := range l.children { + m, err := child.forceFree() + n += m + if err != nil { + return n, err + } + } + + return n, nil +} + +// sizeInBytes calculates the total memory usage of all buffers in the subtree. +// +// Recursively sums buffer data sizes (count of Float values × sizeof(Float)) across +// this level's metrics and all child levels. Used by MemoryUsageTracker to enforce +// memory cap limits. +// +// Returns: +// - int64: Total bytes used by buffer data in this subtree func (l *Level) sizeInBytes() int64 { l.lock.RLock() defer l.lock.RUnlock() @@ -142,6 +280,16 @@ func (l *Level) sizeInBytes() int64 { return size } +// findLevel navigates to the level specified by selector, returning nil if not found. +// +// Read-only variant of findLevelOrCreate. Does not create missing levels. +// Recursively descends the tree following the selector path. +// +// Parameters: +// - selector: Hierarchical path (e.g., []string{"emmy", "node001", "cpu0"}) +// +// Returns: +// - *Level: The target level, or nil if any component in the path does not exist func (l *Level) findLevel(selector []string) *Level { if len(selector) == 0 { return l @@ -158,6 +306,28 @@ func (l *Level) findLevel(selector []string) *Level { return lvl.findLevel(selector[1:]) } +// findBuffers invokes callback on all buffers matching the selector pattern. +// +// Supports flexible selector patterns (from cc-lib/util.Selector): +// - Exact match: Selector element with String set (e.g., "node001") +// - Group match: Selector element with Group set (e.g., ["cpu0", "cpu2", "cpu4"]) +// - Wildcard: Selector element with Any=true (matches all children) +// +// Empty selector (len==0) matches current level's buffer at 'offset' and recursively +// all descendant buffers at the same offset (used for aggregation queries). 
+// +// Parameters: +// - selector: Pattern to match (consumed recursively) +// - offset: Metric index in metrics slice (from MetricConfig.offset) +// - f: Callback invoked on each matching buffer +// +// Returns: +// - error: First error returned by callback, or nil if all succeeded +// +// Example: +// +// // Find all cpu0 buffers across all hosts: +// findBuffers([]Selector{{Any: true}, {String: "cpu0"}}, metricOffset, callback) func (l *Level) findBuffers(selector util.Selector, offset int, f func(b *buffer) error) error { l.lock.RLock() defer l.lock.RUnlock() diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index 0d7e5f45..b1fe065d 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -42,30 +42,75 @@ var ( msInstance *MemoryStore // shutdownFunc stores the context cancellation function created in Init // and is called during Shutdown to cancel all background goroutines - shutdownFunc context.CancelFunc + shutdownFunc context.CancelFunc + shutdownFuncMu sync.Mutex // Protects shutdownFunc from concurrent access ) // NodeProvider provides information about nodes currently in use by running jobs. +// // This interface allows metricstore to query job information without directly // depending on the repository package, breaking the import cycle. +// +// Implementations should return nodes that are actively processing jobs started +// before the given timestamp. These nodes will be excluded from retention-based +// garbage collection to prevent data loss for jobs that are still running or +// recently completed. type NodeProvider interface { // GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames // that are currently in use by jobs that started before the given timestamp. + // + // Parameters: + // - ts: Unix timestamp threshold - returns nodes with jobs started before this time + // + // Returns: + // - Map of cluster names to lists of node hostnames that should be excluded from garbage collection + // - Error if the query fails GetUsedNodes(ts int64) (map[string][]string, error) } +// Metric represents a single metric data point to be written to the store. type Metric struct { - Name string - Value schema.Float + Name string + Value schema.Float + // MetricConfig contains frequency and aggregation settings for this metric. + // If Frequency is 0, configuration will be looked up from MemoryStore.Metrics during Write(). MetricConfig MetricConfig } +// MemoryStore is the main in-memory time-series metric storage implementation. +// +// It organizes metrics in a hierarchical tree structure where each level represents +// a component of the system hierarchy (e.g., cluster → host → CPU). Each level can +// store multiple metrics as time-series buffers. +// +// The store is initialized as a singleton via InitMetrics() and accessed via GetMemoryStore(). +// All public methods are safe for concurrent use. type MemoryStore struct { Metrics map[string]MetricConfig root Level - nodeProvider NodeProvider // Injected dependency for querying running jobs + nodeProvider NodeProvider } +// Init initializes the metric store from configuration and starts background workers. +// +// This function must be called exactly once before any other metricstore operations. +// It performs the following initialization steps: +// 1. Validates and decodes the metric store configuration +// 2. Configures worker pool size (defaults to NumCPU/2+1, max 10) +// 3. 
Loads metric configurations from all registered clusters +// 4. Restores checkpoints within the retention window +// 5. Starts background workers for retention, checkpointing, archiving, and monitoring +// 6. Optionally subscribes to NATS for real-time metric ingestion +// +// Parameters: +// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig) +// - wg: WaitGroup that will be incremented for each background goroutine started +// +// The function will call cclog.Fatal on critical errors during initialization. +// Use Shutdown() to cleanly stop all background workers started by Init(). +// +// Note: Signal handling must be implemented by the caller. Call Shutdown() when +// receiving termination signals to ensure checkpoint data is persisted. func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { startupTime := time.Now() @@ -143,20 +188,29 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { checkpointingGoroutines := 1 dataStagingGoroutines := 1 archivingGoroutines := 1 + memoryUsageTracker := 1 + + totalGoroutines := retentionGoroutines + + checkpointingGoroutines + + dataStagingGoroutines + + archivingGoroutines + + memoryUsageTracker - totalGoroutines := retentionGoroutines + checkpointingGoroutines + dataStagingGoroutines + archivingGoroutines wg.Add(totalGoroutines) Retention(wg, ctx) Checkpointing(wg, ctx) Archiving(wg, ctx) DataStaging(wg, ctx) + MemoryUsageTracker(wg, ctx) // Note: Signal handling has been removed from this function. // The caller is responsible for handling shutdown signals and calling // the shutdown() function when appropriate. // Store the shutdown function for later use by Shutdown() + shutdownFuncMu.Lock() shutdownFunc = shutdown + shutdownFuncMu.Unlock() if Keys.Subscriptions != nil { err = ReceiveNats(ms, 1, ctx) @@ -166,8 +220,17 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { } } -// InitMetrics creates a new, initialized instance of a MemoryStore. -// Will panic if values in the metric configurations are invalid. +// InitMetrics initializes the singleton MemoryStore instance with the given metric configurations. +// +// This function must be called before GetMemoryStore() and can only be called once due to +// the singleton pattern. It assigns each metric an internal offset for efficient buffer indexing. +// +// Parameters: +// - metrics: Map of metric names to their configurations (frequency and aggregation strategy) +// +// Panics if any metric has Frequency == 0, which indicates an invalid configuration. +// +// After this call, the global msInstance is ready for use via GetMemoryStore(). func InitMetrics(metrics map[string]MetricConfig) { singleton.Do(func() { offset := 0 @@ -194,6 +257,11 @@ func InitMetrics(metrics map[string]MetricConfig) { }) } +// GetMemoryStore returns the singleton MemoryStore instance. +// +// Returns the initialized MemoryStore singleton. Calls cclog.Fatal if InitMetrics() was not called first. +// +// This function is safe for concurrent use after initialization. func GetMemoryStore() *MemoryStore { if msInstance == nil { cclog.Fatalf("[METRICSTORE]> MemoryStore not initialized!") @@ -210,7 +278,22 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) { ms.nodeProvider = provider } +// Shutdown performs a graceful shutdown of the metric store. +// +// This function cancels all background goroutines started by Init() and writes +// a final checkpoint to disk before returning. It should be called when the +// application receives a termination signal. 
+// +// The function will: +// 1. Cancel the context to stop all background workers +// 2. Close NATS message channels if using Avro format +// 3. Write a final checkpoint to preserve in-memory data +// 4. Log any errors encountered during shutdown +// +// Note: This function blocks until the final checkpoint is written. func Shutdown() { + shutdownFuncMu.Lock() + defer shutdownFuncMu.Unlock() if shutdownFunc != nil { shutdownFunc() } @@ -237,6 +320,17 @@ func Shutdown() { cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files) } +// Retention starts a background goroutine that periodically frees old metric data. +// +// This worker runs at half the retention interval and calls Free() to remove buffers +// older than the configured retention time. It respects the NodeProvider to preserve +// data for nodes with active jobs. +// +// Parameters: +// - wg: WaitGroup to signal completion when context is cancelled +// - ctx: Context for cancellation signal +// +// The goroutine exits when ctx is cancelled. func Retention(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() @@ -276,6 +370,96 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { }() } +// MemoryUsageTracker starts a background goroutine that monitors memory usage. +// +// This worker checks memory usage every minute and force-frees buffers if memory +// exceeds the configured cap. It protects against infinite loops by limiting +// iterations and forcing garbage collection between attempts. +// +// Parameters: +// - wg: WaitGroup to signal completion when context is cancelled +// - ctx: Context for cancellation signal +// +// The goroutine exits when ctx is cancelled. +func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { + ms := GetMemoryStore() + + go func() { + defer wg.Done() + d := 1 * time.Minute + + if d <= 0 { + return + } + + ticker := time.NewTicker(d) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + memoryUsageGB := ms.SizeInGB() + cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB) + + if memoryUsageGB > float64(Keys.MemoryCap) { + cclog.Warnf("[METRICSTORE]> current memory usage is greater than the Memory Cap: %d GB\n", Keys.MemoryCap) + cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n") + + freedTotal := 0 + const maxIterations = 100 + + for range maxIterations { + memoryUsageGB = ms.SizeInGB() + if memoryUsageGB < float64(Keys.MemoryCap) { + break + } + + freed, err := ms.ForceFree() + if err != nil { + cclog.Errorf("[METRICSTORE]> error while force-freeing the buffers: %s", err) + } + if freed == 0 { + cclog.Errorf("[METRICSTORE]> 0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f GB remains higher than the memory cap of %d GB and there are no buffers left to force-free\n", freedTotal, memoryUsageGB, Keys.MemoryCap) + break + } + freedTotal += freed + + runtime.GC() + } + + if memoryUsageGB >= float64(Keys.MemoryCap) { + cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to free, current memory usage: %.2f GB\n", maxIterations, memoryUsageGB) + } else { + cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freedTotal) + cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB) + } + } + + } + } + }() +} + +// Free removes metric data older than the given time while preserving data for active nodes. 
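+//
+// The Retention worker is its main caller; conceptually (a sketch, values illustrative
+// and the actual call site may differ):
+//
+//	retention := 24 * time.Hour // the configured in-memory retention window
+//	freed, err := Free(ms, time.Now().Add(-retention))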
+// +// This function implements intelligent retention by consulting the NodeProvider (if configured) +// to determine which nodes are currently in use by running jobs. Data for these nodes is +// preserved even if older than the retention time. +// +// Parameters: +// - ms: The MemoryStore instance +// - t: Time threshold - buffers with data older than this will be freed +// +// Returns: +// - Number of buffers freed +// - Error if NodeProvider query fails +// +// Behavior: +// - If no NodeProvider is set: frees all buffers older than t +// - If NodeProvider returns empty map: frees all buffers older than t +// - Otherwise: preserves buffers for nodes returned by GetUsedNodes(), frees others func Free(ms *MemoryStore, t time.Time) (int, error) { // If no NodeProvider is configured, free all buffers older than t if ms.nodeProvider == nil { @@ -302,8 +486,17 @@ func Free(ms *MemoryStore, t time.Time) (int, error) { } } -// A function to free specific selectors. Used when we want to retain some specific nodes -// beyond the retention time. +// FreeSelected frees buffers for specific selectors while preserving others. +// +// This function is used when we want to retain some specific nodes beyond the retention time. +// It iterates through the provided selectors and frees their associated buffers. +// +// Parameters: +// - ms: The MemoryStore instance +// - selectors: List of selector paths to free (e.g., [["cluster1", "node1"], ["cluster2", "node2"]]) +// - t: Time threshold for freeing buffers +// +// Returns the total number of buffers freed and any error encountered. func FreeSelected(ms *MemoryStore, selectors [][]string, t time.Time) (int, error) { freed := 0 @@ -320,8 +513,22 @@ func FreeSelected(ms *MemoryStore, selectors [][]string, t time.Time) (int, erro return freed, nil } -// This function will populate all the second last levels - meaning nodes -// From that we can exclude the specific selectosr/node we want to retain. +// GetSelectors returns all selectors at depth 2 (cluster/node level) that are NOT in the exclusion map. +// +// This function generates a list of selectors whose buffers should be freed by excluding +// selectors that correspond to nodes currently in use by running jobs. +// +// Parameters: +// - ms: The MemoryStore instance +// - excludeSelectors: Map of cluster names to node hostnames that should NOT be freed +// +// Returns a list of selectors ([]string paths) that can be safely freed. +// +// Example: +// +// If the tree has paths ["emmy", "node001"] and ["emmy", "node002"], +// and excludeSelectors contains {"emmy": ["node001"]}, +// then only [["emmy", "node002"]] is returned. func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]string { allSelectors := ms.GetPaths(2) @@ -372,6 +579,7 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error if metric.MetricConfig.Frequency == 0 { metric.MetricConfig, ok = m.Metrics[metric.Name] if !ok { + cclog.Debugf("[METRICSTORE]> Unknown metric '%s' in Write() - skipping", metric.Name) metric.MetricConfig.Frequency = 0 } metrics[i] = metric @@ -492,6 +700,12 @@ func (m *MemoryStore) Free(selector []string, t int64) (int, error) { return m.GetLevel(selector).free(t) } +// Free releases all buffers for the selected level and all its children that +// contain only values older than `t`. 
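+//
+// ForceFree, in contrast, takes no selector and no time threshold: it is invoked by
+// the MemoryUsageTracker worker to reclaim buffers once SizeInGB() exceeds the
+// configured memory cap (Keys.MemoryCap).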
+func (m *MemoryStore) ForceFree() (int, error) { + return m.GetLevel(nil).forceFree() +} + func (m *MemoryStore) FreeAll() error { for k := range m.root.children { delete(m.root.children, k) @@ -504,6 +718,10 @@ func (m *MemoryStore) SizeInBytes() int64 { return m.root.sizeInBytes() } +func (m *MemoryStore) SizeInGB() float64 { + return float64(m.root.sizeInBytes()) / 1e9 +} + // ListChildren , given a selector, returns a list of all children of the level // selected. func (m *MemoryStore) ListChildren(selector []string) []string { diff --git a/internal/metricstore/query.go b/internal/metricstore/query.go index 78c78dd5..a1656192 100644 --- a/internal/metricstore/query.go +++ b/internal/metricstore/query.go @@ -3,6 +3,27 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// This file implements high-level query functions for loading job metric data +// with automatic scope transformation and aggregation. +// +// Key Concepts: +// +// Metric Scopes: Metrics are collected at different granularities (native scope): +// - HWThread: Per hardware thread +// - Core: Per CPU core +// - Socket: Per CPU socket +// - MemoryDomain: Per memory domain (NUMA) +// - Accelerator: Per GPU/accelerator +// - Node: Per compute node +// +// Scope Transformation: The buildQueries functions transform between native scope +// and requested scope by: +// - Aggregating finer-grained data (e.g., HWThread → Core → Socket → Node) +// - Rejecting requests for finer granularity than available +// - Handling special cases (e.g., Accelerator metrics) +// +// Query Building: Constructs APIQuery structures with proper selectors (Type, TypeIds) +// based on cluster topology and job resources. package metricstore import ( @@ -17,9 +38,33 @@ import ( "github.com/ClusterCockpit/cc-lib/v2/schema" ) -// TestLoadDataCallback allows tests to override LoadData behavior +// TestLoadDataCallback allows tests to override LoadData behavior for testing purposes. +// When set to a non-nil function, LoadData will call this function instead of the default implementation. var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) +// LoadData loads metric data for a specific job with automatic scope transformation. +// +// This is the primary function for retrieving job metric data. 
It handles: +// - Building queries with scope transformation via buildQueries +// - Fetching data from the metric store +// - Organizing results by metric and scope +// - Converting NaN statistics to 0 for JSON compatibility +// - Partial error handling (returns data for successful queries even if some fail) +// +// Parameters: +// - job: Job metadata including cluster, resources, and time range +// - metrics: List of metric names to load +// - scopes: Requested metric scopes (will be transformed to match native scopes) +// - ctx: Context for cancellation (currently unused but reserved for future use) +// - resolution: Data resolution in seconds (0 for native resolution) +// +// Returns: +// - JobData: Map of metric → scope → JobMetric with time-series data and statistics +// - Error: Returns error if query building or fetching fails, or partial error listing failed hosts +// +// Example: +// +// jobData, err := LoadData(job, []string{"cpu_load", "mem_used"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 60) func LoadData( job *schema.Job, metrics []string, @@ -91,12 +136,7 @@ func LoadData( *id = query.TypeIds[ndx] } - if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { - // "schema.Float()" because regular float64 can not be JSONed when NaN. - res.Avg = schema.Float(0) - res.Min = schema.Float(0) - res.Max = schema.Float(0) - } + sanitizeStats(&res) jobMetric.Series = append(jobMetric.Series, schema.Series{ Hostname: query.Hostname, @@ -126,6 +166,10 @@ func LoadData( return jobData, nil } +// Pre-converted scope strings avoid repeated string(MetricScope) allocations during +// query construction. These are used in APIQuery.Type field throughout buildQueries +// and buildNodeQueries functions. Converting once at package initialization improves +// performance for high-volume query building. var ( hwthreadString = string(schema.MetricScopeHWThread) coreString = string(schema.MetricScopeCore) @@ -134,12 +178,41 @@ var ( acceleratorString = string(schema.MetricScopeAccelerator) ) +// buildQueries constructs APIQuery structures with automatic scope transformation for a job. +// +// This function implements the core scope transformation logic, handling all combinations of +// native metric scopes and requested scopes. It uses the cluster topology to determine which +// hardware IDs to include in each query. 
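+//
+// For example, requesting core-level data for a metric collected per hardware thread
+// (metric name illustrative):
+//
+//	queries, assigned, err := buildQueries(job, []string{"flops_any"},
+//		[]schema.MetricScope{schema.MetricScopeCore}, 60)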
+// +// Scope Transformation Rules: +// - If native scope >= requested scope: Aggregates data (Aggregate=true in APIQuery) +// - If native scope < requested scope: Returns error (cannot increase granularity) +// - Special handling for Accelerator scope (independent of CPU hierarchy) +// +// The function generates one or more APIQuery per (metric, scope, host) combination: +// - For non-aggregated queries: One query with all relevant IDs +// - For aggregated queries: May generate multiple queries (e.g., one per socket/core) +// +// Parameters: +// - job: Job metadata including cluster, subcluster, and resource allocation +// - metrics: List of metrics to query +// - scopes: Requested scopes for each metric +// - resolution: Data resolution in seconds +// +// Returns: +// - []APIQuery: List of queries to execute +// - []schema.MetricScope: Assigned scope for each query (after transformation) +// - error: Returns error if topology lookup fails or unhandled scope combination encountered func buildQueries( job *schema.Job, metrics []string, scopes []schema.MetricScope, resolution int64, ) ([]APIQuery, []schema.MetricScope, error) { + if len(job.Resources) == 0 { + return nil, nil, fmt.Errorf("METRICDATA/CCMS > no resources allocated for job %d", job.JobID) + } + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) assignedScope := []schema.MetricScope{} @@ -152,7 +225,6 @@ func buildQueries( for _, metric := range metrics { mc := archive.GetMetricConfig(job.Cluster, metric) if mc == nil { - // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster) cclog.Infof("metric '%s' is not specified for cluster '%s'", metric, job.Cluster) continue } @@ -171,10 +243,9 @@ func buildQueries( } } - // Avoid duplicates... - handledScopes := make([]schema.MetricScope, 0, 3) + // Avoid duplicates using map for O(1) lookup + handledScopes := make(map[schema.MetricScope]bool, 3) - scopesLoop: for _, requestedScope := range scopes { nativeScope := mc.Scope if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == 0 { @@ -182,12 +253,10 @@ func buildQueries( } scope := nativeScope.Max(requestedScope) - for _, s := range handledScopes { - if scope == s { - continue scopesLoop - } + if handledScopes[scope] { + continue } - handledScopes = append(handledScopes, scope) + handledScopes[scope] = true for _, host := range job.Resources { hwthreads := host.HWThreads @@ -232,7 +301,7 @@ func buildQueries( continue } - // HWThread -> HWThead + // HWThread -> HWThread if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { queries = append(queries, APIQuery{ Metric: metric, @@ -356,7 +425,7 @@ func buildQueries( continue } - // MemoryDoman -> Node + // MemoryDomain -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) queries = append(queries, APIQuery{ @@ -420,12 +489,26 @@ func buildQueries( return queries, assignedScope, nil } +// LoadStats loads only metric statistics (avg/min/max) for a job at node scope. +// +// This is an optimized version of LoadData that fetches only statistics without +// time-series data, reducing bandwidth and memory usage. Always queries at node scope. 
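+//
+// Typical use (values illustrative):
+//
+//	stats, err := LoadStats(job, []string{"cpu_load"}, ctx)
+//	// stats["cpu_load"]["node001"] then holds the avg/min/max for that node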
+// +// Parameters: +// - job: Job metadata +// - metrics: List of metric names +// - ctx: Context (currently unused) +// +// Returns: +// - Map of metric → hostname → statistics +// - Error on query building or fetching failure func LoadStats( job *schema.Job, metrics []string, ctx context.Context, ) (map[string]map[string]schema.MetricStatistics, error) { - queries, _, err := buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? + // TODO(#166): Add scope parameter for analysis view accelerator normalization + queries, _, err := buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) if err != nil { cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) return nil, err @@ -477,6 +560,20 @@ func LoadStats( return stats, nil } +// LoadScopedStats loads metric statistics for a job with scope-aware grouping. +// +// Similar to LoadStats but supports multiple scopes and returns statistics grouped +// by scope with hardware IDs (e.g., per-core, per-socket statistics). +// +// Parameters: +// - job: Job metadata +// - metrics: List of metric names +// - scopes: Requested metric scopes +// - ctx: Context (currently unused) +// +// Returns: +// - ScopedJobStats: Map of metric → scope → []ScopedStats (with hostname and ID) +// - Error or partial error listing failed queries func LoadScopedStats( job *schema.Job, metrics []string, @@ -533,12 +630,7 @@ func LoadScopedStats( *id = query.TypeIds[ndx] } - if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { - // "schema.Float()" because regular float64 can not be JSONed when NaN. - res.Avg = schema.Float(0) - res.Min = schema.Float(0) - res.Max = schema.Float(0) - } + sanitizeStats(&res) scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ Hostname: query.Hostname, @@ -567,6 +659,22 @@ func LoadScopedStats( return scopedJobStats, nil } +// LoadNodeData loads metric data for specific nodes in a cluster over a time range. +// +// Unlike LoadData which operates on job resources, this function queries arbitrary nodes +// directly. Useful for system monitoring and node status views. +// +// Parameters: +// - cluster: Cluster name +// - metrics: List of metric names +// - nodes: List of node hostnames (nil = all nodes in cluster via ForAllNodes) +// - scopes: Requested metric scopes (currently unused - always node scope) +// - from, to: Time range +// - ctx: Context (currently unused) +// +// Returns: +// - Map of hostname → metric → []JobMetric +// - Error or partial error listing failed queries func LoadNodeData( cluster string, metrics, nodes []string, @@ -615,14 +723,10 @@ func LoadNodeData( metric := query.Metric qdata := res[0] if qdata.Error != nil { - /* Build list for "partial errors", if any */ errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error)) } - if qdata.Avg.IsNaN() || qdata.Min.IsNaN() || qdata.Max.IsNaN() { - // return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN") - qdata.Avg, qdata.Min, qdata.Max = 0., 0., 0. - } + sanitizeStats(&qdata) hostdata, ok := data[query.Hostname] if !ok { @@ -656,6 +760,24 @@ func LoadNodeData( return data, nil } +// LoadNodeListData loads metric data for a list of nodes with full scope transformation support. 
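+//
+// The result is keyed by hostname first, e.g. (shape sketch, assuming the usual
+// schema.JobData layout):
+//
+//	data["node001"]["cpu_load"][schema.MetricScopeNode].Series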
+// +// This is the most flexible node data loading function, supporting arbitrary scopes and +// resolution. Uses buildNodeQueries for proper scope transformation based on topology. +// +// Parameters: +// - cluster: Cluster name +// - subCluster: SubCluster name (empty string to infer from node names) +// - nodes: List of node hostnames +// - metrics: List of metric names +// - scopes: Requested metric scopes +// - resolution: Data resolution in seconds +// - from, to: Time range +// - ctx: Context (currently unused) +// +// Returns: +// - Map of hostname → JobData (metric → scope → JobMetric) +// - Error or partial error listing failed queries func LoadNodeListData( cluster, subCluster string, nodes []string, @@ -696,7 +818,7 @@ func LoadNodeListData( } else { query = req.Queries[i] } - // qdata := res[0] + metric := query.Metric scope := assignedScope[i] mc := archive.GetMetricConfig(cluster, metric) @@ -742,12 +864,7 @@ func LoadNodeListData( *id = query.TypeIds[ndx] } - if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { - // "schema.Float()" because regular float64 can not be JSONed when NaN. - res.Avg = schema.Float(0) - res.Min = schema.Float(0) - res.Max = schema.Float(0) - } + sanitizeStats(&res) scopeData.Series = append(scopeData.Series, schema.Series{ Hostname: query.Hostname, @@ -770,6 +887,23 @@ func LoadNodeListData( return data, nil } +// buildNodeQueries constructs APIQuery structures for node-based queries with scope transformation. +// +// Similar to buildQueries but operates on node lists rather than job resources. +// Supports dynamic subcluster lookup when subCluster parameter is empty. +// +// Parameters: +// - cluster: Cluster name +// - subCluster: SubCluster name (empty = infer from node hostnames) +// - nodes: List of node hostnames +// - metrics: List of metric names +// - scopes: Requested metric scopes +// - resolution: Data resolution in seconds +// +// Returns: +// - []APIQuery: List of queries to execute +// - []schema.MetricScope: Assigned scope for each query +// - error: Returns error if topology lookup fails or unhandled scope combination func buildNodeQueries( cluster string, subCluster string, @@ -778,6 +912,10 @@ func buildNodeQueries( scopes []schema.MetricScope, resolution int64, ) ([]APIQuery, []schema.MetricScope, error) { + if len(nodes) == 0 { + return nil, nil, fmt.Errorf("METRICDATA/CCMS > no nodes specified for query") + } + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) assignedScope := []schema.MetricScope{} @@ -795,7 +933,6 @@ func buildNodeQueries( for _, metric := range metrics { mc := archive.GetMetricConfig(cluster, metric) if mc == nil { - // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster) cclog.Warnf("metric '%s' is not specified for cluster '%s'", metric, cluster) continue } @@ -814,20 +951,17 @@ func buildNodeQueries( } } - // Avoid duplicates... 
- handledScopes := make([]schema.MetricScope, 0, 3) + // Avoid duplicates using map for O(1) lookup + handledScopes := make(map[schema.MetricScope]bool, 3) - scopesLoop: for _, requestedScope := range scopes { nativeScope := mc.Scope scope := nativeScope.Max(requestedScope) - for _, s := range handledScopes { - if scope == s { - continue scopesLoop - } + if handledScopes[scope] { + continue } - handledScopes = append(handledScopes, scope) + handledScopes[scope] = true for _, hostname := range nodes { @@ -850,7 +984,7 @@ func buildNodeQueries( // Moved check here if metric matches hardware specs if nativeScope == schema.MetricScopeAccelerator && len(acceleratorIds) == 0 { - continue scopesLoop + continue } // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) @@ -890,7 +1024,7 @@ func buildNodeQueries( continue } - // HWThread -> HWThead + // HWThread -> HWThread if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { queries = append(queries, APIQuery{ Metric: metric, @@ -1014,7 +1148,7 @@ func buildNodeQueries( continue } - // MemoryDoman -> Node + // MemoryDomain -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) queries = append(queries, APIQuery{ @@ -1078,10 +1212,37 @@ func buildNodeQueries( return queries, assignedScope, nil } +// sanitizeStats converts NaN statistics to zero for JSON compatibility. +// +// schema.Float with NaN values cannot be properly JSON-encoded, so we convert +// NaN to 0. This loses the distinction between "no data" and "zero value", +// but maintains API compatibility. +func sanitizeStats(data *APIMetricData) { + if data.Avg.IsNaN() { + data.Avg = schema.Float(0) + } + if data.Min.IsNaN() { + data.Min = schema.Float(0) + } + if data.Max.IsNaN() { + data.Max = schema.Float(0) + } +} + +// intToStringSlice converts a slice of integers to a slice of strings. +// Used to convert hardware thread/core/socket IDs from topology (int) to APIQuery TypeIds (string). +// +// Optimized to reuse a byte buffer for string conversion, reducing allocations. func intToStringSlice(is []int) []string { + if len(is) == 0 { + return nil + } + ss := make([]string, len(is)) + buf := make([]byte, 0, 16) // Reusable buffer for integer conversion for i, x := range is { - ss[i] = strconv.Itoa(x) + buf = strconv.AppendInt(buf[:0], int64(x), 10) + ss[i] = string(buf) } return ss }