diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 9ded95ba..3c70a960 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -321,7 +321,8 @@ func runServer(ctx context.Context) error { haveMetricstore := false mscfg := ccconf.GetPackageConfig("metric-store") if mscfg != nil { - metricstore.Init(mscfg, &wg) + metrics := metricstore.BuildMetricList() + metricstore.Init(mscfg, metrics, &wg) // Inject repository as NodeProvider to break import cycle ms := metricstore.GetMemoryStore() @@ -398,7 +399,7 @@ func runServer(ctx context.Context) error { // Set GC percent if not configured if os.Getenv(envGOGC) == "" { - debug.SetGCPercent(25) + debug.SetGCPercent(15) } runtime.SystemdNotify(true, "running") diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 4a7fc07c..8cbf95d7 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -455,4 +455,38 @@ func TestRestApi(t *testing.T) { if !ok { t.Fatal("subtest failed") } + + t.Run("GetUsedNodesNoRunning", func(t *testing.T) { + contextUserValue := &schema.User{ + Username: "testuser", + Projects: make([]string, 0), + Roles: []string{"api"}, + AuthType: 0, + AuthSource: 2, + } + + req := httptest.NewRequest(http.MethodGet, "/jobs/used_nodes?ts=123456790", nil) + recorder := httptest.NewRecorder() + + ctx := context.WithValue(req.Context(), contextUserKey, contextUserValue) + + r.ServeHTTP(recorder, req.WithContext(ctx)) + response := recorder.Result() + if response.StatusCode != http.StatusOK { + t.Fatal(response.Status, recorder.Body.String()) + } + + var result api.GetUsedNodesAPIResponse + if err := json.NewDecoder(response.Body).Decode(&result); err != nil { + t.Fatal(err) + } + + if result.UsedNodes == nil { + t.Fatal("expected usedNodes to be non-nil") + } + + if len(result.UsedNodes) != 0 { + t.Fatalf("expected no used nodes for stopped jobs, got: %v", result.UsedNodes) + } + }) } diff --git a/internal/api/job.go b/internal/api/job.go index 1b1e05d6..64f6a92c 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -1021,3 +1021,57 @@ func (api *RestAPI) getJobMetrics(rw http.ResponseWriter, r *http.Request) { cclog.Errorf("Failed to encode response: %v", err) } } + +// GetUsedNodesAPIResponse model +type GetUsedNodesAPIResponse struct { + UsedNodes map[string][]string `json:"usedNodes"` // Map of cluster names to lists of used node hostnames +} + +// getUsedNodes godoc +// @summary Lists used nodes by cluster +// @tags Job query +// @description Get a map of cluster names to lists of unique hostnames that are currently in use by running jobs that started before the specified timestamp. 
+// @produce json +// @param ts query int true "Unix timestamp to filter jobs (jobs with start_time < ts)" +// @success 200 {object} api.GetUsedNodesAPIResponse "Map of cluster names to hostname lists" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/used_nodes [get] +func (api *RestAPI) getUsedNodes(rw http.ResponseWriter, r *http.Request) { + if user := repository.GetUserFromContext(r.Context()); user != nil && + !user.HasRole(schema.RoleApi) { + handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw) + return + } + + tsStr := r.URL.Query().Get("ts") + if tsStr == "" { + handleError(fmt.Errorf("missing required query parameter: ts"), http.StatusBadRequest, rw) + return + } + + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil { + handleError(fmt.Errorf("invalid timestamp format: %w", err), http.StatusBadRequest, rw) + return + } + + usedNodes, err := api.JobRepository.GetUsedNodes(ts) + if err != nil { + handleError(fmt.Errorf("failed to get used nodes: %w", err), http.StatusInternalServerError, rw) + return + } + + rw.Header().Add("Content-Type", "application/json") + payload := GetUsedNodesAPIResponse{ + UsedNodes: usedNodes, + } + + if err := json.NewEncoder(rw).Encode(payload); err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + } +} diff --git a/internal/api/rest.go b/internal/api/rest.go index c0fa7c2a..0d52742e 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -89,8 +89,7 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) { r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut) } r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) - r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost) - r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet) + r.HandleFunc("/jobs/used_nodes", api.getUsedNodes).Methods(http.MethodGet) r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) r.HandleFunc("/jobs/tag_job/{id}", api.removeTagJob).Methods(http.MethodDelete) r.HandleFunc("/jobs/edit_meta/{id}", api.editMeta).Methods(http.MethodPost, http.MethodPatch) @@ -98,6 +97,8 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) { r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobByID).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job_before/{ts}", api.deleteJobBefore).Methods(http.MethodDelete) + r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost) + r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet) r.HandleFunc("/tags/", api.removeTags).Methods(http.MethodDelete) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 3e142f9a..9bc8811d 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -905,26 +905,32 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr for _, metrics := range data { clusterMetrics.NodeCount += 1 for metric, scopedMetrics := range metrics { - _, ok := collectorData[metric] - if !ok { - collectorData[metric] = make([]schema.Float, 0) - for _, scopedMetric := range scopedMetrics { - // Collect 
Info + for _, scopedMetric := range scopedMetrics { + // Collect Info Once + _, okTimestep := collectorTimestep[metric] + if !okTimestep { collectorTimestep[metric] = scopedMetric.Timestep - collectorUnit[metric] = scopedMetric.Unit - // Collect Initial Data - for _, ser := range scopedMetric.Series { - collectorData[metric] = append(collectorData[metric], ser.Data...) - } } - } else { - // Sum up values by index - for _, scopedMetric := range scopedMetrics { - // For This Purpose (Cluster_Wide-Sum of Node Metrics) OK - for _, ser := range scopedMetric.Series { + _, okUnit := collectorUnit[metric] + if !okUnit { + collectorUnit[metric] = scopedMetric.Unit + } + // Collect Data + for _, ser := range scopedMetric.Series { + _, okData := collectorData[metric] + // Init With Datasize > 0 + if !okData && len(ser.Data) != 0 { + collectorData[metric] = make([]schema.Float, len(ser.Data)) + } else if !okData { + cclog.Debugf("ClusterMetrics Skip Init: No Data -> %s at %s; Size %d", metric, ser.Hostname, len(ser.Data)) + } + // Sum if init'd and matching size + if okData && len(ser.Data) == len(collectorData[metric]) { for i, val := range ser.Data { collectorData[metric][i] += val } + } else if okData { + cclog.Debugf("ClusterMetrics Skip Sum: Data Diff -> %s at %s; Want Size %d, Have Size %d", metric, ser.Hostname, len(collectorData[metric]), len(ser.Data)) } } } diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 6abcb183..cab4c24f 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -49,6 +49,7 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) { // runWorker takes simple values to configure what it does func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { + wg.Add(1) go func() { defer wg.Done() diff --git a/pkg/metricstore/avroCheckpoint.go b/pkg/metricstore/avroCheckpoint.go index aa14ce5a..14898186 100644 --- a/pkg/metricstore/avroCheckpoint.go +++ b/pkg/metricstore/avroCheckpoint.go @@ -203,6 +203,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { if err != nil { return fmt.Errorf("failed to open existing avro file: %v", err) } + defer f.Close() br := bufio.NewReader(f) @@ -212,8 +213,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { } codec = reader.Codec() schema = codec.Schema() - - f.Close() } timeRef := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix() @@ -249,31 +248,35 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { return fmt.Errorf("failed to compare read and generated schema: %v", err) } if flag && readFlag && !errors.Is(err_, os.ErrNotExist) { - - f.Close() - - f, err = os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open Avro file: %v", err) - } - - br := bufio.NewReader(f) - - ocfReader, err := goavro.NewOCFReader(br) - if err != nil { - return fmt.Errorf("failed to create OCF reader while changing schema: %v", err) - } - - for ocfReader.Scan() { - record, err := ocfReader.Read() + // Use closure to ensure file is closed even on error + err := func() error { + f2, err := os.Open(filePath) if err != nil { - return fmt.Errorf("failed to read record: %v", err) + return fmt.Errorf("failed to open Avro file: %v", err) + } + defer f2.Close() + + br := bufio.NewReader(f2) + + ocfReader, err := goavro.NewOCFReader(br) + if err != nil { + return fmt.Errorf("failed to create OCF reader while changing schema: %v", err) } - 
recordList = append(recordList, record.(map[string]any)) - } + for ocfReader.Scan() { + record, err := ocfReader.Read() + if err != nil { + return fmt.Errorf("failed to read record: %v", err) + } - f.Close() + recordList = append(recordList, record.(map[string]any)) + } + + return nil + }() + if err != nil { + return err + } err = os.Remove(filePath) if err != nil { @@ -300,6 +303,7 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { if err != nil { return fmt.Errorf("failed to append new avro file: %v", err) } + defer f.Close() // fmt.Printf("Codec : %#v\n", codec) @@ -317,8 +321,6 @@ func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error { return fmt.Errorf("failed to append record: %v", err) } - f.Close() - return nil } diff --git a/pkg/metricstore/avroHelper.go b/pkg/metricstore/avroHelper.go index 985fdd78..62827afd 100644 --- a/pkg/metricstore/avroHelper.go +++ b/pkg/metricstore/avroHelper.go @@ -15,15 +15,15 @@ import ( ) func DataStaging(wg *sync.WaitGroup, ctx context.Context) { - // AvroPool is a pool of Avro writers. + wg.Add(1) go func() { - if Keys.Checkpoints.FileFormat == "json" { - wg.Done() // Mark this goroutine as done - return // Exit the goroutine - } - defer wg.Done() + if Keys.Checkpoints.FileFormat == "json" { + return + } + + ms := GetMemoryStore() var avroLevel *AvroLevel oldSelector := make([]string, 0) @@ -39,7 +39,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { return } // Process remaining message - freq, err := GetMetricFrequency(val.MetricName) + freq, err := ms.GetMetricFrequency(val.MetricName) if err != nil { continue } @@ -76,7 +76,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { } // Fetch the frequency of the metric from the global configuration - freq, err := GetMetricFrequency(val.MetricName) + freq, err := ms.GetMetricFrequency(val.MetricName) if err != nil { cclog.Errorf("Error fetching metric frequency: %s\n", err) continue diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index b90b1c22..715566e4 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -43,7 +43,6 @@ import ( "os" "path" "path/filepath" - "runtime" "sort" "strconv" "strings" @@ -100,6 +99,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { if Keys.Checkpoints.FileFormat == "json" { ms := GetMemoryStore() + wg.Add(1) go func() { defer wg.Done() d, err := time.ParseDuration(Keys.Checkpoints.Interval) @@ -139,6 +139,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { } }() } else { + wg.Add(1) go func() { defer wg.Done() @@ -394,14 +395,14 @@ func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { } gcCounter++ - if gcCounter%GCTriggerInterval == 0 { - // Forcing garbage collection runs here regulary during the loading of checkpoints - // will decrease the total heap size after loading everything back to memory is done. - // While loading data, the heap will grow fast, so the GC target size will double - // almost always. By forcing GCs here, we can keep it growing more slowly so that - // at the end, less memory is wasted. - runtime.GC() - } + // if gcCounter%GCTriggerInterval == 0 { + // Forcing garbage collection runs here regulary during the loading of checkpoints + // will decrease the total heap size after loading everything back to memory is done. + // While loading data, the heap will grow fast, so the GC target size will double + // almost always. 
By forcing GCs here, we can keep it growing more slowly so that + // at the end, less memory is wasted. + // runtime.GC() + // } work <- [2]string{clusterDir.Name(), hostDir.Name()} } diff --git a/pkg/metricstore/config.go b/pkg/metricstore/config.go index 44a24f7d..69ee3563 100644 --- a/pkg/metricstore/config.go +++ b/pkg/metricstore/config.go @@ -45,6 +45,10 @@ package metricstore import ( "fmt" "time" + + "github.com/ClusterCockpit/cc-backend/pkg/archive" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" ) const ( @@ -207,55 +211,51 @@ type MetricConfig struct { offset int } -// Metrics is the global map of metric configurations. -// -// Keyed by metric name (e.g., "cpu_load", "mem_used"). Populated during Init() -// from cluster configuration and checkpoint restoration. Each MetricConfig.offset -// corresponds to the buffer slice index in Level.metrics. -var Metrics map[string]MetricConfig +func BuildMetricList() map[string]MetricConfig { + var metrics map[string]MetricConfig = make(map[string]MetricConfig) -// GetMetricFrequency retrieves the measurement interval for a metric. -// -// Parameters: -// - metricName: Metric name (e.g., "cpu_load") -// -// Returns: -// - int64: Frequency in seconds -// - error: Non-nil if metric not found in Metrics map -func GetMetricFrequency(metricName string) (int64, error) { - if metric, ok := Metrics[metricName]; ok { - return metric.Frequency, nil - } - return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) -} + addMetric := func(name string, metric MetricConfig) error { + if metrics == nil { + metrics = make(map[string]MetricConfig, 0) + } -// AddMetric registers a new metric or updates an existing one. -// -// If the metric already exists with a different frequency, uses the higher frequency -// (finer granularity). This handles cases where different clusters report the same -// metric at different intervals. 
-// -// Parameters: -// - name: Metric name (e.g., "cpu_load") -// - metric: Configuration (frequency, aggregation strategy) -// -// Returns: -// - error: Always nil (signature for future error handling) -func AddMetric(name string, metric MetricConfig) error { - if Metrics == nil { - Metrics = make(map[string]MetricConfig, 0) + if existingMetric, ok := metrics[name]; ok { + if existingMetric.Frequency != metric.Frequency { + if existingMetric.Frequency < metric.Frequency { + existingMetric.Frequency = metric.Frequency + metrics[name] = existingMetric + } + } + } else { + metrics[name] = metric + } + + return nil } - if existingMetric, ok := Metrics[name]; ok { - if existingMetric.Frequency != metric.Frequency { - if existingMetric.Frequency < metric.Frequency { - existingMetric.Frequency = metric.Frequency - Metrics[name] = existingMetric + // Helper function to add metric configuration + addMetricConfig := func(mc *schema.MetricConfig) { + agg, err := AssignAggregationStrategy(mc.Aggregation) + if err != nil { + cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error()) + } + + addMetric(mc.Name, MetricConfig{ + Frequency: int64(mc.Timestep), + Aggregation: agg, + }) + } + for _, c := range archive.Clusters { + for _, mc := range c.MetricConfig { + addMetricConfig(mc) + } + + for _, sc := range c.SubClusters { + for _, mc := range sc.MetricConfig { + addMetricConfig(mc) } } - } else { - Metrics[name] = metric } - return nil + return metrics } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 0d1f19c9..617e945e 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -24,13 +24,14 @@ import ( "context" "encoding/json" "errors" + "fmt" "runtime" + "runtime/debug" "slices" "sync" "time" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/resampler" "github.com/ClusterCockpit/cc-lib/v2/schema" @@ -120,7 +121,7 @@ type MemoryStore struct { // // Note: Signal handling must be implemented by the caller. Call Shutdown() when // receiving termination signals to ensure checkpoint data is persisted. -func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { +func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.WaitGroup) { startupTime := time.Now() if rawConfig != nil { @@ -138,33 +139,8 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { } cclog.Debugf("[METRICSTORE]> Using %d workers for checkpoint/archive operations\n", Keys.NumWorkers) - // Helper function to add metric configuration - addMetricConfig := func(mc *schema.MetricConfig) { - agg, err := AssignAggregationStrategy(mc.Aggregation) - if err != nil { - cclog.Warnf("Could not find aggregation strategy for metric config '%s': %s", mc.Name, err.Error()) - } - - AddMetric(mc.Name, MetricConfig{ - Frequency: int64(mc.Timestep), - Aggregation: agg, - }) - } - - for _, c := range archive.Clusters { - for _, mc := range c.MetricConfig { - addMetricConfig(mc) - } - - for _, sc := range c.SubClusters { - for _, mc := range sc.MetricConfig { - addMetricConfig(mc) - } - } - } - // Pass the config.MetricStoreKeys - InitMetrics(Metrics) + InitMetrics(metrics) ms := GetMemoryStore() @@ -189,24 +165,10 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { // previously active heap, a GC is triggered. 
// Forcing a GC here will set the "previously active heap" // to a minumum. - runtime.GC() + // runtime.GC() ctx, shutdown := context.WithCancel(context.Background()) - retentionGoroutines := 1 - checkpointingGoroutines := 1 - dataStagingGoroutines := 1 - archivingGoroutines := 1 - memoryUsageTracker := 1 - - totalGoroutines := retentionGoroutines + - checkpointingGoroutines + - dataStagingGoroutines + - archivingGoroutines + - memoryUsageTracker - - wg.Add(totalGoroutines) - Retention(wg, ctx) Checkpointing(wg, ctx) CleanUp(wg, ctx) @@ -279,6 +241,13 @@ func GetMemoryStore() *MemoryStore { return msInstance } +func (ms *MemoryStore) GetMetricFrequency(metricName string) (int64, error) { + if metric, ok := ms.Metrics[metricName]; ok { + return metric.Frequency, nil + } + return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) +} + // SetNodeProvider sets the NodeProvider implementation for the MemoryStore. // This must be called during initialization to provide job state information // for selective buffer retention during Free operations. @@ -343,6 +312,7 @@ func Shutdown() { func Retention(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() + wg.Add(1) go func() { defer wg.Done() d, err := time.ParseDuration(Keys.RetentionInMemory) @@ -388,9 +358,13 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { // MemoryUsageTracker starts a background goroutine that monitors memory usage. // -// This worker checks memory usage every minute and force-frees buffers if memory -// exceeds the configured cap. It protects against infinite loops by limiting -// iterations and forcing garbage collection between attempts. +// This worker checks actual process memory usage (via runtime.MemStats) periodically +// and force-frees buffers if memory exceeds the configured cap. It uses FreeOSMemory() +// to return memory to the OS after freeing buffers, avoiding aggressive GC that causes +// performance issues. +// +// The tracker logs both actual memory usage (heap allocated) and metric data size for +// visibility into memory overhead from Go runtime structures and allocations. 
// // Parameters: // - wg: WaitGroup to signal completion when context is cancelled @@ -400,6 +374,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() + wg.Add(1) go func() { defer wg.Done() d := DefaultMemoryUsageTrackerInterval @@ -416,65 +391,75 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - state.mu.RLock() + var mem runtime.MemStats + runtime.ReadMemStats(&mem) + actualMemoryGB := float64(mem.Alloc) / 1e9 + metricDataGB := ms.SizeInGB() + cclog.Infof("[METRICSTORE]> memory usage: %.2f GB actual (%.2f GB metric data)", actualMemoryGB, metricDataGB) - memoryUsageGB := ms.SizeInGB() - cclog.Infof("[METRICSTORE]> current memory usage: %.2f GB\n", memoryUsageGB) - - freedTotal := 0 + freedExcluded := 0 + freedEmergency := 0 var err error - // First force-free all the checkpoints that were - if state.lastRetentionTime != 0 && state.selectorsExcluded { - freedTotal, err = ms.Free(nil, state.lastRetentionTime) + state.mu.RLock() + lastRetention := state.lastRetentionTime + selectorsExcluded := state.selectorsExcluded + state.mu.RUnlock() + + if lastRetention != 0 && selectorsExcluded { + freedExcluded, err = ms.Free(nil, lastRetention) if err != nil { cclog.Errorf("[METRICSTORE]> error while force-freeing the excluded buffers: %s", err) } - // Calling runtime.GC() twice in succession tp completely empty a bufferPool (sync.Pool) - runtime.GC() - runtime.GC() - - cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed\n", freedTotal) + if freedExcluded > 0 { + debug.FreeOSMemory() + cclog.Infof("[METRICSTORE]> done: %d excluded buffers force-freed", freedExcluded) + } } - state.mu.RUnlock() + runtime.ReadMemStats(&mem) + actualMemoryGB = float64(mem.Alloc) / 1e9 - memoryUsageGB = ms.SizeInGB() - - if memoryUsageGB > float64(Keys.MemoryCap) { - cclog.Warnf("[METRICSTORE]> memory usage is still greater than the Memory Cap: %d GB\n", Keys.MemoryCap) - cclog.Warnf("[METRICSTORE]> starting to force-free the buffers from the Metric Store\n") + if actualMemoryGB > float64(Keys.MemoryCap) { + cclog.Warnf("[METRICSTORE]> memory usage %.2f GB exceeds cap %d GB, starting emergency buffer freeing", actualMemoryGB, Keys.MemoryCap) const maxIterations = 100 - for range maxIterations { - memoryUsageGB = ms.SizeInGB() - if memoryUsageGB < float64(Keys.MemoryCap) { + for i := range maxIterations { + if actualMemoryGB < float64(Keys.MemoryCap) { break } freed, err := ms.ForceFree() if err != nil { - cclog.Errorf("[METRICSTORE]> error while force-freeing the buffers: %s", err) + cclog.Errorf("[METRICSTORE]> error while force-freeing buffers: %s", err) } if freed == 0 { - cclog.Errorf("[METRICSTORE]> 0 buffers force-freed in last try, %d total buffers force-freed, memory usage of %.2f GB remains higher than the memory cap of %d GB and there are no buffers left to force-free\n", freedTotal, memoryUsageGB, Keys.MemoryCap) + cclog.Errorf("[METRICSTORE]> no more buffers to free after %d emergency frees, memory usage %.2f GB still exceeds cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap) break } - freedTotal += freed + freedEmergency += freed - runtime.GC() + if i%10 == 0 && freedEmergency > 0 { + runtime.ReadMemStats(&mem) + actualMemoryGB = float64(mem.Alloc) / 1e9 + } } - if memoryUsageGB >= float64(Keys.MemoryCap) { - cclog.Errorf("[METRICSTORE]> reached maximum iterations (%d) or no more buffers to free, current memory usage: 
%.2f GB\n", maxIterations, memoryUsageGB) + // if freedEmergency > 0 { + // debug.FreeOSMemory() + // } + + runtime.ReadMemStats(&mem) + actualMemoryGB = float64(mem.Alloc) / 1e9 + + if actualMemoryGB >= float64(Keys.MemoryCap) { + cclog.Errorf("[METRICSTORE]> after %d emergency frees, memory usage %.2f GB still at/above cap %d GB", freedEmergency, actualMemoryGB, Keys.MemoryCap) } else { - cclog.Infof("[METRICSTORE]> done: %d buffers force-freed\n", freedTotal) - cclog.Infof("[METRICSTORE]> current memory usage after force-freeing the buffers: %.2f GB\n", memoryUsageGB) + cclog.Infof("[METRICSTORE]> emergency freeing complete: %d buffers freed, memory now %.2f GB", freedEmergency, actualMemoryGB) } } - } } }() diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index fd7c963f..90cec2bd 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -38,72 +38,6 @@ func TestAssignAggregationStrategy(t *testing.T) { } } -func TestAddMetric(t *testing.T) { - // Reset Metrics before test - Metrics = make(map[string]MetricConfig) - - err := AddMetric("test_metric", MetricConfig{ - Frequency: 60, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if _, ok := Metrics["test_metric"]; !ok { - t.Error("AddMetric() did not add metric to Metrics map") - } - - // Test updating with higher frequency - err = AddMetric("test_metric", MetricConfig{ - Frequency: 120, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if Metrics["test_metric"].Frequency != 120 { - t.Errorf("AddMetric() frequency = %d, want 120", Metrics["test_metric"].Frequency) - } - - // Test updating with lower frequency (should not update) - err = AddMetric("test_metric", MetricConfig{ - Frequency: 30, - Aggregation: SumAggregation, - }) - if err != nil { - t.Errorf("AddMetric() error = %v", err) - } - - if Metrics["test_metric"].Frequency != 120 { - t.Errorf("AddMetric() frequency = %d, want 120 (should not downgrade)", Metrics["test_metric"].Frequency) - } -} - -func TestGetMetricFrequency(t *testing.T) { - // Reset Metrics before test - Metrics = map[string]MetricConfig{ - "test_metric": { - Frequency: 60, - Aggregation: SumAggregation, - }, - } - - freq, err := GetMetricFrequency("test_metric") - if err != nil { - t.Errorf("GetMetricFrequency() error = %v", err) - } - if freq != 60 { - t.Errorf("GetMetricFrequency() = %d, want 60", freq) - } - - _, err = GetMetricFrequency("nonexistent") - if err == nil { - t.Error("GetMetricFrequency() expected error for nonexistent metric") - } -} - func TestBufferWrite(t *testing.T) { b := newBuffer(100, 10) diff --git a/web/frontend/src/DashPublic.root.svelte b/web/frontend/src/DashPublic.root.svelte index 91f4664c..9c17e7d8 100644 --- a/web/frontend/src/DashPublic.root.svelte +++ b/web/frontend/src/DashPublic.root.svelte @@ -286,6 +286,8 @@ sort((a, b) => b.count - a.count) }); + const sortedClusterMetrics = $derived($statusQuery?.data?.clusterMetrics?.metrics.sort((a, b) => b.name.localeCompare(a.name))); + /* Functions */ function transformNodesStatsToData(subclusterData) { let data = null @@ -516,10 +518,10 @@
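---
Notes on selected hunks. The sketches below are illustrative only; anything not named in the hunks above (hostnames, header names, helper functions) is an assumption, not part of the change.

1. Calling the new endpoint. GET /api/jobs/used_nodes returns a map of cluster names to the hostnames occupied by jobs still running at the given Unix timestamp, and requires a token carrying the api role. A minimal client sketch — the base URL and the X-Auth-Token header name are assumptions about the deployment, not taken from this patch:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// Mirrors api.GetUsedNodesAPIResponse from the patch.
type getUsedNodesResponse struct {
	UsedNodes map[string][]string `json:"usedNodes"`
}

func main() {
	url := fmt.Sprintf("http://localhost:8080/api/jobs/used_nodes?ts=%d", time.Now().Unix())
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("X-Auth-Token", "<JWT with the api role>") // header name is an assumption

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		panic(resp.Status)
	}

	var payload getUsedNodesResponse
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		panic(err)
	}
	for cluster, hosts := range payload.UsedNodes {
		fmt.Printf("%s: %d nodes in use\n", cluster, len(hosts))
	}
}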
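2. Goroutine accounting. Retention, Checkpointing, CleanUp, DataStaging, and MemoryUsageTracker now each call wg.Add(1) themselves, replacing the caller-side wg.Add(totalGoroutines) bookkeeping removed from Init. The Add happens in the spawning function, before the go statement; doing it inside the goroutine would race with a concurrent wg.Wait. A minimal sketch of the pattern, with a hypothetical worker:

package sketch

import (
	"context"
	"sync"
	"time"
)

// startWorker owns both wg.Add and wg.Done, so callers can add or drop
// workers without maintaining a separate goroutine count.
func startWorker(wg *sync.WaitGroup, ctx context.Context) {
	wg.Add(1) // before the go statement, never inside the goroutine
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// periodic work goes here
			}
		}
	}()
}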
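3. Memory-cap enforcement. The tracker now compares runtime.MemStats.Alloc (live heap, including Go runtime overhead) against Keys.MemoryCap instead of ms.SizeInGB() (metric payload bytes only), and returns memory to the OS with debug.FreeOSMemory() after freeing the excluded buffers, in place of back-to-back runtime.GC() calls. A condensed sketch of the measure/free/re-measure loop — freeSomeBuffers is a hypothetical stand-in for ms.ForceFree, and as simplifications this version re-reads MemStats every iteration (the patch samples only every 10 frees, since ReadMemStats briefly stops the world) and calls FreeOSMemory unconditionally (the patch leaves it commented out on the emergency path):

package sketch

import (
	"runtime"
	"runtime/debug"
)

func enforceCap(capGB float64, freeSomeBuffers func() int) {
	var mem runtime.MemStats
	readGB := func() float64 {
		runtime.ReadMemStats(&mem)
		return float64(mem.Alloc) / 1e9 // live heap in GB
	}
	for i := 0; i < 100 && readGB() >= capGB; i++ { // bounded iterations, as in the patch
		if freeSomeBuffers() == 0 {
			break // nothing left to free; report instead of spinning
		}
	}
	debug.FreeOSMemory() // one forced GC plus scavenge back to the OS
}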
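4. File-handle lifetime in toCheckpoint. The re-read of an existing Avro container is now wrapped in a function literal so that defer f2.Close() runs on every early error return — previously an error between Open and the explicit Close leaked the descriptor — and the closure also guarantees the file is closed before the os.Remove that follows. The shape of that pattern, reduced to a generic line reader under assumed names:

package sketch

import (
	"bufio"
	"fmt"
	"os"
)

// drainAndRemove reads a file inside a closure so the deferred Close fires
// before os.Remove, on success and on every early error return alike.
func drainAndRemove(path string, handle func(string) error) error {
	readRecords := func() error {
		f, err := os.Open(path)
		if err != nil {
			return fmt.Errorf("open %s: %w", path, err)
		}
		defer f.Close() // closed as soon as this closure returns

		sc := bufio.NewScanner(f)
		for sc.Scan() {
			if err := handle(sc.Text()); err != nil {
				return err
			}
		}
		return sc.Err()
	}
	if err := readRecords(); err != nil {
		return err
	}
	return os.Remove(path) // safe: the descriptor is already closed
}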