Merge pull request #477 from ClusterCockpit/dev

Dev
This commit is contained in:
Jan Eitzinger
2026-01-28 11:24:10 +01:00
committed by GitHub
14 changed files with 282 additions and 260 deletions

View File

@@ -321,7 +321,8 @@ func runServer(ctx context.Context) error {
haveMetricstore := false haveMetricstore := false
mscfg := ccconf.GetPackageConfig("metric-store") mscfg := ccconf.GetPackageConfig("metric-store")
if mscfg != nil { if mscfg != nil {
metricstore.Init(mscfg, &wg) metrics := metricstore.BuildMetricList()
metricstore.Init(mscfg, metrics, &wg)
// Inject repository as NodeProvider to break import cycle // Inject repository as NodeProvider to break import cycle
ms := metricstore.GetMemoryStore() ms := metricstore.GetMemoryStore()
@@ -398,7 +399,7 @@ func runServer(ctx context.Context) error {
// Set GC percent if not configured // Set GC percent if not configured
if os.Getenv(envGOGC) == "" { if os.Getenv(envGOGC) == "" {
debug.SetGCPercent(25) debug.SetGCPercent(15)
} }
runtime.SystemdNotify(true, "running") runtime.SystemdNotify(true, "running")

View File

@@ -455,4 +455,38 @@ func TestRestApi(t *testing.T) {
if !ok { if !ok {
t.Fatal("subtest failed") t.Fatal("subtest failed")
} }
t.Run("GetUsedNodesNoRunning", func(t *testing.T) {
contextUserValue := &schema.User{
Username: "testuser",
Projects: make([]string, 0),
Roles: []string{"api"},
AuthType: 0,
AuthSource: 2,
}
req := httptest.NewRequest(http.MethodGet, "/jobs/used_nodes?ts=123456790", nil)
recorder := httptest.NewRecorder()
ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue)
r.ServeHTTP(recorder, req.WithContext(ctx))
response := recorder.Result()
if response.StatusCode != http.StatusOK {
t.Fatal(response.Status, recorder.Body.String())
}
var result api.GetUsedNodesAPIResponse
if err := json.NewDecoder(response.Body).Decode(&result); err != nil {
t.Fatal(err)
}
if result.UsedNodes == nil {
t.Fatal("expected usedNodes to be non-nil")
}
if len(result.UsedNodes) != 0 {
t.Fatalf("expected no used nodes for stopped jobs, got: %v", result.UsedNodes)
}
})
} }

View File

@@ -1021,3 +1021,57 @@ func (api *RestAPI) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
cclog.Errorf("Failed to encode response: %v", err) cclog.Errorf("Failed to encode response: %v", err)
} }
} }
// GetUsedNodesAPIResponse is the JSON body produced by the used-nodes
// endpoint.
type GetUsedNodesAPIResponse struct {
	// UsedNodes maps a cluster name to the list of used node hostnames
	// on that cluster. It is serialized under the key "usedNodes".
	UsedNodes map[string][]string `json:"usedNodes"`
}
// getUsedNodes godoc
// @summary Lists used nodes by cluster
// @tags Job query
// @description Get a map of cluster names to lists of unique hostnames that are currently in use by running jobs that started before the specified timestamp.
// @produce json
// @param ts query int true "Unix timestamp to filter jobs (jobs with start_time < ts)"
// @success 200 {object} api.GetUsedNodesAPIResponse "Map of cluster names to hostname lists"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /api/jobs/used_nodes [get]
func (api *RestAPI) getUsedNodes(rw http.ResponseWriter, r *http.Request) {
	// A request whose context carries no user passes this check; a user
	// that is present must hold the API role.
	user := repository.GetUserFromContext(r.Context())
	if user != nil && !user.HasRole(schema.RoleApi) {
		handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw)
		return
	}

	// The "ts" query parameter is mandatory and must be a base-10 int64.
	rawTS := r.URL.Query().Get("ts")
	if rawTS == "" {
		handleError(fmt.Errorf("missing required query parameter: ts"), http.StatusBadRequest, rw)
		return
	}

	ts, parseErr := strconv.ParseInt(rawTS, 10, 64)
	if parseErr != nil {
		handleError(fmt.Errorf("invalid timestamp format: %w", parseErr), http.StatusBadRequest, rw)
		return
	}

	// Delegate the lookup to the job repository.
	nodesByCluster, lookupErr := api.JobRepository.GetUsedNodes(ts)
	if lookupErr != nil {
		handleError(fmt.Errorf("failed to get used nodes: %w", lookupErr), http.StatusInternalServerError, rw)
		return
	}

	rw.Header().Add("Content-Type", "application/json")

	response := GetUsedNodesAPIResponse{UsedNodes: nodesByCluster}
	if encodeErr := json.NewEncoder(rw).Encode(response); encodeErr != nil {
		handleError(encodeErr, http.StatusInternalServerError, rw)
	}
}

View File

@@ -89,8 +89,7 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) {
r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut)
} }
r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost) r.HandleFunc("/jobs/used_nodes", api.getUsedNodes).Methods(http.MethodGet)
r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet)
r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch)
r.HandleFunc("/jobs/tag_job/{id}", api.removeTagJob).Methods(http.MethodDelete) r.HandleFunc("/jobs/tag_job/{id}", api.removeTagJob).Methods(http.MethodDelete)
r.HandleFunc("/jobs/edit_meta/{id}", api.editMeta).Methods(http.MethodPost, http.MethodPatch) r.HandleFunc("/jobs/edit_meta/{id}", api.editMeta).Methods(http.MethodPost, http.MethodPatch)
@@ -98,6 +97,8 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) {
r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete)
r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobByID).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobByID).Methods(http.MethodDelete)
r.HandleFunc("/jobs/delete_job_before/{ts}", api.deleteJobBefore).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job_before/{ts}", api.deleteJobBefore).Methods(http.MethodDelete)
r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost)
r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet)
r.HandleFunc("/tags/", api.removeTags).Methods(http.MethodDelete) r.HandleFunc("/tags/", api.removeTags).Methods(http.MethodDelete)

View File

@@ -905,26 +905,32 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr
for _, metrics := range data { for _, metrics := range data {
clusterMetrics.NodeCount += 1 clusterMetrics.NodeCount += 1
for metric, scopedMetrics := range metrics { for metric, scopedMetrics := range metrics {
_, ok := collectorData[metric] for _, scopedMetric := range scopedMetrics {
if !ok { // Collect Info Once
collectorData[metric] = make([]schema.Float, 0) _, okTimestep := collectorTimestep[metric]
for _, scopedMetric := range scopedMetrics { if !okTimestep {
// Collect Info
collectorTimestep[metric] = scopedMetric.Timestep collectorTimestep[metric] = scopedMetric.Timestep
collectorUnit[metric] = scopedMetric.Unit
// Collect Initial Data
for _, ser := range scopedMetric.Series {
collectorData[metric] = append(collectorData[metric], ser.Data...)
}
} }
} else { _, okUnit := collectorUnit[metric]
// Sum up values by index if !okUnit {
for _, scopedMetric := range scopedMetrics { collectorUnit[metric] = scopedMetric.Unit
// For This Purpose (Cluster_Wide-Sum of Node Metrics) OK }
for _, ser := range scopedMetric.Series { // Collect Data
for _, ser := range scopedMetric.Series {
_, okData := collectorData[metric]
// Init With Datasize > 0
if !okData && len(ser.Data) != 0 {
collectorData[metric] = make([]schema.Float, len(ser.Data))
} else if !okData {
cclog.Debugf("ClusterMetrics Skip Init: No Data -> %s at %s; Size %d", metric, ser.Hostname, len(ser.Data))
}
// Sum if init'd and matching size
if okData && len(ser.Data) == len(collectorData[metric]) {
for i, val := range ser.Data { for i, val := range ser.Data {
collectorData[metric][i] += val collectorData[metric][i] += val
} }
} else if okData {
cclog.Debugf("ClusterMetrics Skip Sum: Data Diff -> %s at %s; Want Size %d, Have Size %d", metric, ser.Hostname, len(collectorData[metric]), len(ser.Data))
} }
} }
} }

View File

@@ -49,6 +49,7 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
// cleanUpWorker takes simple values to configure what it does // cleanUpWorker takes simple values to configure what it does
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()

View File

@@ -203,6 +203,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to open existing avro file: %v", err) return fmt.Errorf("failed to open existing avro file: %v", err)
} }
defer f.Close()
br := bufio.NewReader(f) br := bufio.NewReader(f)
@@ -212,8 +213,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
} }
codec = reader.Codec() codec = reader.Codec()
schema = codec.Schema() schema = codec.Schema()
f.Close()
} }
timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix() timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix()
@@ -249,31 +248,35 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
return fmt.Errorf("failed to compare read and generated schema: %v", err) return fmt.Errorf("failed to compare read and generated schema: %v", err)
} }
if flag && readFlag && !errors.Is(err_, os.ErrNotExist) { if flag && readFlag && !errors.Is(err_, os.ErrNotExist) {
// Use closure to ensure file is closed even on error
f.Close() err := func() error {
f2, err := os.Open(filePath)
f, err = os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open Avro file: %v", err)
}
br := bufio.NewReader(f)
ocfReader, err := goavro.NewOCFReader(br)
if err != nil {
return fmt.Errorf("failed to create OCF reader while changing schema: %v", err)
}
for ocfReader.Scan() {
record, err := ocfReader.Read()
if err != nil { if err != nil {
return fmt.Errorf("failed to read record: %v", err) return fmt.Errorf("failed to open Avro file: %v", err)
}
defer f2.Close()
br := bufio.NewReader(f2)
ocfReader, err := goavro.NewOCFReader(br)
if err != nil {
return fmt.Errorf("failed to create OCF reader while changing schema: %v", err)
} }
recordList = append(recordList, record.(map[string]any)) for ocfReader.Scan() {
} record, err := ocfReader.Read()
if err != nil {
return fmt.Errorf("failed to read record: %v", err)
}
f.Close() recordList = append(recordList, record.(map[string]any))
}
return nil
}()
if err != nil {
return err
}
err = os.Remove(filePath) err = os.Remove(filePath)
if err != nil { if err != nil {
@@ -300,6 +303,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to append new avro file: %v", err) return fmt.Errorf("failed to append new avro file: %v", err)
} }
defer f.Close()
// fmt.Printf("Codec : %#v\n", codec) // fmt.Printf("Codec : %#v\n", codec)
@@ -317,8 +321,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
return fmt.Errorf("failed to append record: %v", err) return fmt.Errorf("failed to append record: %v", err)
} }
f.Close()
return nil return nil
} }

View File

@@ -15,15 +15,15 @@ import (
) )
func DataStaging(wg *sync.WaitGroup, ctx context.Context) { func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
// AvroPool is a pool of Avro writers. wg.Add(1)
go func() { go func() {
if Keys.Checkpoints.FileFormat == "json" {
wg.Done() // Mark this goroutine as done
return // Exit the goroutine
}
defer wg.Done() defer wg.Done()
if Keys.Checkpoints.FileFormat == "json" {
return
}
ms := GetMemoryStore()
var avroLevel *AvroLevel var avroLevel *AvroLevel
oldSelector := make([]string, 0) oldSelector := make([]string, 0)
@@ -39,7 +39,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
return return
} }
// Process remaining message // Process remaining message
freq, err := GetMetricFrequency(val.MetricName) freq, err := ms.GetMetricFrequency(val.MetricName)
if err != nil { if err != nil {
continue continue
} }
@@ -76,7 +76,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
} }
// Fetch the frequency of the metric from the global configuration // Fetch the frequency of the metric from the global configuration
freq, err := GetMetricFrequency(val.MetricName) freq, err := ms.GetMetricFrequency(val.MetricName)
if err != nil { if err != nil {
cclog.Errorf("Error fetching metric frequency: %s\n", err) cclog.Errorf("Error fetching metric frequency: %s\n", err)
continue continue

View File

@@ -43,7 +43,6 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@@ -100,6 +99,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Checkpoints.FileFormat == "json" { if Keys.Checkpoints.FileFormat == "json" {
ms := GetMemoryStore() ms := GetMemoryStore()
wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
d, err := time.ParseDuration(Keys.Checkpoints.Interval) d, err := time.ParseDuration(Keys.Checkpoints.Interval)
@@ -139,6 +139,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
} }
}() }()
} else { } else {
wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
@@ -394,14 +395,14 @@ func enqueueCheckpointHosts(dir string, work chan<- [2]string) error {
} }
gcCounter++ gcCounter++
if gcCounter%GCTriggerInterval == 0 { // if gcCounter%GCTriggerInterval == 0 {
// Forcing garbage collection runs here regularly during the loading of checkpoints // Forcing garbage collection runs here regularly during the loading of checkpoints
// will decrease the total heap size after loading everything back to memory is done. // will decrease the total heap size after loading everything back to memory is done.
// While loading data, the heap will grow fast, so the GC target size will double // While loading data, the heap will grow fast, so the GC target size will double
// almost always. By forcing GCs here, we can keep it growing more slowly so that // almost always. By forcing GCs here, we can keep it growing more slowly so that
// at the end, less memory is wasted. // at the end, less memory is wasted.
runtime.GC() // runtime.GC()
} // }
work <- [2]string{clusterDir.Name(), hostDir.Name()} work <- [2]string{clusterDir.Name(), hostDir.Name()}
} }

View File

@@ -45,6 +45,10 @@ package metricstore
import ( import (
"fmt" "fmt"
"time" "time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
) )
const ( const (
@@ -207,55 +211,51 @@ type MetricConfig struct {
offset int offset int
} }
// Metrics is the global map of metric configurations. func BuildMetricList() map[string]MetricConfig {
// var metrics map[string]MetricConfig = make(map[string]MetricConfig)
// Keyed by metric name (e.g., "cpu_load", "mem_used"). Populated during Init()
// from cluster configuration and checkpoint restoration. Each MetricConfig.offset
// corresponds to the buffer slice index in Level.metrics.
var Metrics map[string]MetricConfig
// GetMetricFrequency retrieves the measurement interval for a metric. addMetric := func(name string, metric MetricConfig) error {
// if metrics == nil {
// Parameters: metrics = make(map[string]MetricConfig, 0)
// - metricName: Metric name (e.g., "cpu_load") }
//
// Returns:
// - int64: Frequency in seconds
// - error: Non-nil if metric not found in Metrics map
func GetMetricFrequency(metricName string) (int64, error) {
if metric, ok := Metrics[metricName]; ok {
return metric.Frequency, nil
}
return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName)
}
// AddMetric registers a new metric or updates an existing one. if existingMetric, ok := metrics[name]; ok {
// if existingMetric.Frequency != metric.Frequency {
// If the metric already exists with a different frequency, uses the higher frequency if existingMetric.Frequency < metric.Frequency {
// (finer granularity). This handles cases where different clusters report the same existingMetric.Frequency = metric.Frequency
// metric at different intervals. metrics[name] = existingMetric
// }
// Parameters: }
// - name: Metric name (e.g., "cpu_load") } else {
// - metric: Configuration (frequency, aggregation strategy) metrics[name] = metric
// }
// Returns:
// - error: Always nil (signature for future error handling) return nil
func AddMetric(name string, metric MetricConfig) error {
if Metrics == nil {
Metrics = make(map[string]MetricConfig, 0)
} }
if existingMetric, ok := Metrics[name]; ok { // Helper function to add metric configuration
if existingMetric.Frequency != metric.Frequency { addMetricConfig := func(mc *schema.MetricConfig) {
if existingMetric.Frequency < metric.Frequency { agg, err := AssignAggregationStrategy(mc.Aggregation)
existingMetric.Frequency = metric.Frequency if err != nil {
Metrics[name] = existingMetric cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error())
}
addMetric(mc.Name, MetricConfig{
Frequency: int64(mc.Timestep),
Aggregation: agg,
})
}
for _, c := range archive.Clusters {
for _, mc := range c.MetricConfig {
addMetricConfig(mc)
}
for _, sc := range c.SubClusters {
for _, mc := range sc.MetricConfig {
addMetricConfig(mc)
} }
} }
} else {
Metrics[name] = metric
} }
return nil return metrics
} }

View File

@@ -24,13 +24,14 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"runtime" "runtime"
"runtime/debug"
"slices" "slices"
"sync" "sync"
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/resampler" "github.com/ClusterCockpit/cc-lib/v2/resampler"
"github.com/ClusterCockpit/cc-lib/v2/schema" "github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -120,7 +121,7 @@ type MemoryStore struct {
// //
// Note: Signal handling must be implemented by the caller. Call Shutdown() when // Note: Signal handling must be implemented by the caller. Call Shutdown() when
// receiving termination signals to ensure checkpoint data is persisted. // receiving termination signals to ensure checkpoint data is persisted.
func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.WaitGroup) {
startupTime := time.Now() startupTime := time.Now()
if rawConfig != nil { if rawConfig != nil {
@@ -138,33 +139,8 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
} }
cclog.Debugf("[METRICSTORE]> Using %d workers for checkpoint/archive operations\n", Keys.NumWorkers) cclog.Debugf("[METRICSTORE]> Using %d workers for checkpoint/archive operations\n", Keys.NumWorkers)
// Helper function to add metric configuration
addMetricConfig := func(mc *schema.MetricConfig) {
agg, err := AssignAggregationStrategy(mc.Aggregation)
if err != nil {
cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error())
}
AddMetric(mc.Name, MetricConfig{
Frequency: int64(mc.Timestep),
Aggregation: agg,
})
}
for _, c := range archive.Clusters {
for _, mc := range c.MetricConfig {
addMetricConfig(mc)
}
for _, sc := range c.SubClusters {
for _, mc := range sc.MetricConfig {
addMetricConfig(mc)
}
}
}
// Pass the config.MetricStoreKeys // Pass the config.MetricStoreKeys
InitMetrics(Metrics) InitMetrics(metrics)
ms := GetMemoryStore() ms := GetMemoryStore()
@@ -189,24 +165,10 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
// previously active heap, a GC is triggered. // previously active heap, a GC is triggered.
// Forcing a GC here will set the "previously active heap" // Forcing a GC here will set the "previously active heap"
// to a minimum. // to a minimum.
runtime.GC() // runtime.GC()
ctx, shutdown := context.WithCancel(context.Background()) ctx, shutdown := context.WithCancel(context.Background())
retentionGoroutines := 1
checkpointingGoroutines := 1
dataStagingGoroutines := 1
archivingGoroutines := 1
memoryUsageTracker := 1
totalGoroutines := retentionGoroutines +
checkpointingGoroutines +
dataStagingGoroutines +
archivingGoroutines +
memoryUsageTracker
wg.Add(totalGoroutines)
Retention(wg, ctx) Retention(wg, ctx)
Checkpointing(wg, ctx) Checkpointing(wg, ctx)
CleanUp(wg, ctx) CleanUp(wg, ctx)
@@ -279,6 +241,13 @@ func GetMemoryStore() *MemoryStore {
return msInstance return msInstance
} }
// GetMetricFrequency looks up metricName in ms.Metrics and returns the
// Frequency stored in that metric's configuration. If the store holds no
// entry under that name, it returns 0 and a non-nil error.
func (ms *MemoryStore) GetMetricFrequency(metricName string) (int64, error) {
	cfg, found := ms.Metrics[metricName]
	if !found {
		return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName)
	}
	return cfg.Frequency, nil
}
// SetNodeProvider sets the NodeProvider implementation for the MemoryStore. // SetNodeProvider sets the NodeProvider implementation for the MemoryStore.
// This must be called during initialization to provide job state information // This must be called during initialization to provide job state information
// for selective buffer retention during Free operations. // for selective buffer retention during Free operations.
@@ -343,6 +312,7 @@ func Shutdown() {
func Retention(wg *sync.WaitGroup, ctx context.Context) { func Retention(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore() ms := GetMemoryStore()
wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
d, err := time.ParseDuration(Keys.RetentionInMemory) d, err := time.ParseDuration(Keys.RetentionInMemory)
@@ -388,9 +358,13 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
// MemoryUsageTracker starts a background goroutine that monitors memory usage. // MemoryUsageTracker starts a background goroutine that monitors memory usage.
// //
// This worker checks memory usage every minute and force-frees buffers if memory // This worker checks actual process memory usage (via runtime.MemStats) periodically
// exceeds the configured cap. It protects against infinite loops by limiting // and force-frees buffers if memory exceeds the configured cap. It uses FreeOSMemory()
// iterations and forcing garbage collection between attempts. // to return memory to the OS after freeing buffers, avoiding aggressive GC that causes
// performance issues.
//
// The tracker logs both actual memory usage (heap allocated) and metric data size for
// visibility into memory overhead from Go runtime structures and allocations.
// //
// Parameters: // Parameters:
// - wg: WaitGroup to signal completion when context is cancelled // - wg: WaitGroup to signal completion when context is cancelled
@@ -400,6 +374,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore() ms := GetMemoryStore()
wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
d := DefaultMemoryUsageTrackerInterval d := DefaultMemoryUsageTrackerInterval
@@ -416,65 +391,75 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-ticker.C: case <-ticker.C:
state.mu.RLock() var mem runtime.MemStats
runtime.ReadMemStats(&mem)
actualMemoryGB := float64(mem.Alloc) / 1e9
metricDataGB := ms.SizeInGB()
cclog.Infof("[METRICSTORE]> memory usage: %.2f GB actual (%.2f GB metric data)", actualMemoryGB, metricDataGB)
memoryUsageGB := ms.SizeInGB() freedExcluded := 0
cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB) freedEmergency := 0
freedTotal := 0
var err error var err error
// First force-free all the checkpoints that were state.mu.RLock()
if state.lastRetentionTime != 0 && state.selectorsExcluded { lastRetention := state.lastRetentionTime
freedTotal, err = ms.Free(nil, state.lastRetentionTime) selectorsExcluded := state.selectorsExcluded
state.mu.RUnlock()
if lastRetention != 0 && selectorsExcluded {
freedExcluded, err = ms.Free(nil, lastRetention)
if err != nil { if err != nil {
cclog.Errorf("[METRICSTORE]> error while force-freeing the excluded buffers: %s", err) cclog.Errorf("[METRICSTORE]> error while force-freeing the excluded buffers: %s", err)
} }
// Calling runtime.GC() twice in succession tp completely empty a bufferPool (sync.Pool) if freedExcluded > 0 {
runtime.GC() debug.FreeOSMemory()
runtime.GC() cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed", freedExcluded)
}
cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed\n", freedTotal)
} }
state.mu.RUnlock() runtime.ReadMemStats(&mem)
actualMemoryGB = float64(mem.Alloc) / 1e9
memoryUsageGB = ms.SizeInGB() if actualMemoryGB > float64(Keys.MemoryCap) {
cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", actualMemoryGB, Keys.MemoryCap)
if memoryUsageGB > float64(Keys.MemoryCap) {
cclog.Warnf("[METRICSTORE]> memory usage is still greater than the Memory Cap: %d GB\n", Keys.MemoryCap)
cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n")
const maxIterations = 100 const maxIterations = 100
for range maxIterations { for i := range maxIterations {
memoryUsageGB = ms.SizeInGB() if actualMemoryGB < float64(Keys.MemoryCap) {
if memoryUsageGB < float64(Keys.MemoryCap) {
break break
} }
freed, err := ms.ForceFree() freed, err := ms.ForceFree()
if err != nil { if err != nil {
cclog.Errorf("[METRICSTORE]> error while force-freeing the buffers: %s", err) cclog.Errorf("[METRICSTORE]> error while force-freeing buffers: %s", err)
} }
if freed == 0 { if freed == 0 {
cclog.Errorf("[METRICSTORE]> 0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f GB remains higher than the memory cap of %d GB and there are no buffers left to force-free\n", freedTotal, memoryUsageGB, Keys.MemoryCap) cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap)
break break
} }
freedTotal += freed freedEmergency += freed
runtime.GC() if i%10 == 0 && freedEmergency > 0 {
runtime.ReadMemStats(&mem)
actualMemoryGB = float64(mem.Alloc) / 1e9
}
} }
if memoryUsageGB >= float64(Keys.MemoryCap) { // if freedEmergency > 0 {
cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to free, current memory usage: %.2f GB\n", maxIterations, memoryUsageGB) // debug.FreeOSMemory()
// }
runtime.ReadMemStats(&mem)
actualMemoryGB = float64(mem.Alloc) / 1e9
if actualMemoryGB >= float64(Keys.MemoryCap) {
cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap)
} else { } else {
cclog.Infof("[METRICSTORE]> done: %d buffers force-freed\n", freedTotal) cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, actualMemoryGB)
cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB)
} }
} }
} }
} }
}() }()

View File

@@ -38,72 +38,6 @@ func TestAssignAggregationStrategy(t *testing.T) {
} }
} }
func TestAddMetric(t *testing.T) {
// Reset Metrics before test
Metrics = make(map[string]MetricConfig)
err := AddMetric("test_metric", MetricConfig{
Frequency: 60,
Aggregation: SumAggregation,
})
if err != nil {
t.Errorf("AddMetric() error = %v", err)
}
if _, ok := Metrics["test_metric"]; !ok {
t.Error("AddMetric() did not add metric to Metrics map")
}
// Test updating with higher frequency
err = AddMetric("test_metric", MetricConfig{
Frequency: 120,
Aggregation: SumAggregation,
})
if err != nil {
t.Errorf("AddMetric() error = %v", err)
}
if Metrics["test_metric"].Frequency != 120 {
t.Errorf("AddMetric() frequency = %d, want 120", Metrics["test_metric"].Frequency)
}
// Test updating with lower frequency (should not update)
err = AddMetric("test_metric", MetricConfig{
Frequency: 30,
Aggregation: SumAggregation,
})
if err != nil {
t.Errorf("AddMetric() error = %v", err)
}
if Metrics["test_metric"].Frequency != 120 {
t.Errorf("AddMetric() frequency = %d, want 120 (should not downgrade)", Metrics["test_metric"].Frequency)
}
}
func TestGetMetricFrequency(t *testing.T) {
// Reset Metrics before test
Metrics = map[string]MetricConfig{
"test_metric": {
Frequency: 60,
Aggregation: SumAggregation,
},
}
freq, err := GetMetricFrequency("test_metric")
if err != nil {
t.Errorf("GetMetricFrequency() error = %v", err)
}
if freq != 60 {
t.Errorf("GetMetricFrequency() = %d, want 60", freq)
}
_, err = GetMetricFrequency("nonexistent")
if err == nil {
t.Error("GetMetricFrequency() expected error for nonexistent metric")
}
}
func TestBufferWrite(t *testing.T) { func TestBufferWrite(t *testing.T) {
b := newBuffer(100, 10) b := newBuffer(100, 10)

View File

@@ -286,6 +286,8 @@
sort((a, b) => b.count - a.count) sort((a, b) => b.count - a.count)
}); });
const sortedClusterMetrics = $derived($statusQuery?.data?.clusterMetrics?.metrics.sort((a, b) => b.name.localeCompare(a.name)));
/* Functions */ /* Functions */
function transformNodesStatsToData(subclusterData) { function transformNodesStatsToData(subclusterData) {
let data = null let data = null
@@ -516,10 +518,10 @@
<h5 class="mt-2 mb-0"> <h5 class="mt-2 mb-0">
Cluster Utilization ( Cluster Utilization (
<span style="color: #0000ff;"> <span style="color: #0000ff;">
{`${$statusQuery?.data?.clusterMetrics?.metrics[0]?.name} (${$statusQuery?.data?.clusterMetrics?.metrics[0]?.unit?.prefix}${$statusQuery?.data?.clusterMetrics?.metrics[0]?.unit?.base})`} {`${sortedClusterMetrics[0]?.name} (${sortedClusterMetrics[0]?.unit?.prefix}${sortedClusterMetrics[0]?.unit?.base})`}
</span>, </span>,
<span style="color: #ff0000;"> <span style="color: #ff0000;">
{`${$statusQuery?.data?.clusterMetrics?.metrics[1]?.name} (${$statusQuery?.data?.clusterMetrics?.metrics[1]?.unit?.prefix}${$statusQuery?.data?.clusterMetrics?.metrics[1]?.unit?.base})`} {`${sortedClusterMetrics[1]?.name} (${sortedClusterMetrics[1]?.unit?.prefix}${sortedClusterMetrics[1]?.unit?.base})`}
</span> </span>
) )
</h5> </h5>
@@ -528,7 +530,7 @@
<DoubleMetric <DoubleMetric
timestep={$statusQuery?.data?.clusterMetrics[0]?.timestep || 60} timestep={$statusQuery?.data?.clusterMetrics[0]?.timestep || 60}
numNodes={$statusQuery?.data?.clusterMetrics?.nodeCount || 0} numNodes={$statusQuery?.data?.clusterMetrics?.nodeCount || 0}
metricData={$statusQuery?.data?.clusterMetrics?.metrics || []} metricData={sortedClusterMetrics || []}
height={250} height={250}
publicMode publicMode
/> />

View File

@@ -4,7 +4,7 @@
Only width/height should change reactively. Only width/height should change reactively.
Properties: Properties:
- `metricData [Data]`: Two series of metric data including unit info - `metricData [Data]`: Two series of metric data including unit info, unsorted
- `timestep Number`: Data timestep - `timestep Number`: Data timestep
- `numNodes Number`: Number of nodes from which metric data is aggregated - `numNodes Number`: Number of nodes from which metric data is aggregated
- `cluster String`: Cluster name of the parent job / data [Default: ""] - `cluster String`: Cluster name of the parent job / data [Default: ""]
@@ -46,10 +46,11 @@
let uplot = $state(null); let uplot = $state(null);
/* Derived */ /* Derived */
const sortedMetricData = $derived(publicMode ? [...metricData] : metricData.sort((a, b) => b.name.localeCompare(a.name))); // PublicMode: Presorted
const maxX = $derived(longestSeries * timestep); const maxX = $derived(longestSeries * timestep);
const lineWidth = $derived(publicMode ? 2 : clusterCockpitConfig.plotConfiguration_lineWidth / window.devicePixelRatio); const lineWidth = $derived(publicMode ? 2 : clusterCockpitConfig.plotConfiguration_lineWidth / window.devicePixelRatio);
const longestSeries = $derived.by(() => { const longestSeries = $derived.by(() => {
return metricData.reduce((n, m) => Math.max(n, m.data.length), 0); return sortedMetricData.reduce((n, m) => Math.max(n, m.data.length), 0);
}); });
// Derive Plot Params // Derive Plot Params
@@ -68,8 +69,8 @@
}; };
}; };
// Y // Y
for (let i = 0; i < metricData.length; i++) { for (let i = 0; i < sortedMetricData.length; i++) {
pendingData.push(metricData[i]?.data); pendingData.push(sortedMetricData[i]?.data);
}; };
return pendingData; return pendingData;
}) })
@@ -84,9 +85,9 @@
} }
]; ];
// Y // Y
for (let i = 0; i < metricData.length; i++) { for (let i = 0; i < sortedMetricData.length; i++) {
pendingSeries.push({ pendingSeries.push({
label: publicMode ? null : `${metricData[i]?.name} (${metricData[i]?.unit?.prefix}${metricData[i]?.unit?.base})`, label: publicMode ? null : `${sortedMetricData[i]?.name} (${sortedMetricData[i]?.unit?.prefix}${sortedMetricData[i]?.unit?.base})`,
scale: `y${i+1}`, scale: `y${i+1}`,
width: lineWidth, width: lineWidth,
stroke: fixedLineColors[i], stroke: fixedLineColors[i],
@@ -156,9 +157,9 @@
// X // X
baseOpts.axes[0].label = 'Time'; baseOpts.axes[0].label = 'Time';
// Y1 // Y1
baseOpts.axes[1].label = `${metricData[0]?.name} (${metricData[0]?.unit?.prefix}${metricData[0]?.unit?.base})`; baseOpts.axes[1].label = `${sortedMetricData[0]?.name} (${sortedMetricData[0]?.unit?.prefix}${sortedMetricData[0]?.unit?.base})`;
// Y2 // Y2
baseOpts.axes[2].label = `${metricData[1]?.name} (${metricData[1]?.unit?.prefix}${metricData[1]?.unit?.base})`; baseOpts.axes[2].label = `${sortedMetricData[1]?.name} (${sortedMetricData[1]?.unit?.prefix}${sortedMetricData[1]?.unit?.base})`;
baseOpts.hooks.draw = [ baseOpts.hooks.draw = [
(u) => { (u) => {
// Draw plot type label: // Draw plot type label:
@@ -212,7 +213,7 @@
style = { backgroundColor: "rgba(255, 249, 196, 0.92)", color: "black" }, style = { backgroundColor: "rgba(255, 249, 196, 0.92)", color: "black" },
} = {}) { } = {}) {
let legendEl; let legendEl;
const dataSize = metricData.length; const dataSize = sortedMetricData.length;
function init(u, opts) { function init(u, opts) {
legendEl = u.root.querySelector(".u-legend"); legendEl = u.root.querySelector(".u-legend");
@@ -311,7 +312,7 @@
</script> </script>
<!-- Define $width Wrapper and NoData Card --> <!-- Define $width Wrapper and NoData Card -->
{#if metricData[0]?.data && metricData[0]?.data?.length > 0} {#if sortedMetricData[0]?.data && sortedMetricData[0]?.data?.length > 0}
<div bind:this={plotWrapper} bind:clientWidth={width} <div bind:this={plotWrapper} bind:clientWidth={width}
class={forNode ? 'py-2 rounded' : 'rounded'} class={forNode ? 'py-2 rounded' : 'rounded'}
></div> ></div>