From bc43c844fc7db12212964cd2833bcaeae3895c92 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 20 Oct 2025 10:22:40 +0200 Subject: [PATCH] Fix memoryStore Init and move MetricConfig init --- cmd/cc-backend/main.go | 4 +-- internal/api/cluster.go | 7 ++--- internal/api/rest.go | 6 ++++- internal/memorystore/memorystore.go | 42 ++++++++++++++++++++++++----- pkg/archive/clusterConfig.go | 14 ---------- 5 files changed, 45 insertions(+), 28 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 659998d..aeace97 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -248,12 +248,10 @@ func main() { // Metric Store starts after all flags have been processes if memorystore.InternalCCMSFlag { if mscfg := ccconf.GetPackageConfig("metric-store"); mscfg != nil { - memorystore.InitMetricStore(mscfg) + memorystore.Init(mscfg, &wg) } else { cclog.Abort("Metric Store configuration must be present") } - - memorystore.Init(&wg) } archiver.Start(repository.GetJobRepository()) diff --git a/internal/api/cluster.go b/internal/api/cluster.go index 0a11d9d..2576067 100644 --- a/internal/api/cluster.go +++ b/internal/api/cluster.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package api import ( @@ -15,8 +16,8 @@ import ( "github.com/ClusterCockpit/cc-lib/schema" ) -// GetClustersApiResponse model -type GetClustersApiResponse struct { +// GetClustersAPIResponse model +type GetClustersAPIResponse struct { Clusters []*schema.Cluster `json:"clusters"` // Array of clusters } @@ -59,7 +60,7 @@ func (api *RestApi) getClusters(rw http.ResponseWriter, r *http.Request) { clusters = archive.Clusters } - payload := GetClustersApiResponse{ + payload := GetClustersAPIResponse{ Clusters: clusters, } diff --git a/internal/api/rest.go b/internal/api/rest.go index 1c6b601..907e737 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -97,11 +97,15 @@ func (api *RestApi) MountUserApiRoutes(r *mux.Router) { func (api *RestApi) MountMetricStoreApiRoutes(r *mux.Router) { // REST API Uses TokenAuth - // Refactor ?? r.HandleFunc("/api/free", freeMetrics).Methods(http.MethodPost) r.HandleFunc("/api/write", writeMetrics).Methods(http.MethodPost) r.HandleFunc("/api/debug", debugMetrics).Methods(http.MethodGet) r.HandleFunc("/api/healthcheck", metricsHealth).Methods(http.MethodGet) + // Same endpoints but with trailing slash + r.HandleFunc("/api/free/", freeMetrics).Methods(http.MethodPost) + r.HandleFunc("/api/write/", writeMetrics).Methods(http.MethodPost) + r.HandleFunc("/api/debug/", debugMetrics).Methods(http.MethodGet) + r.HandleFunc("/api/healthcheck/", metricsHealth).Methods(http.MethodGet) } func (api *RestApi) MountConfigApiRoutes(r *mux.Router) { diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index ed0b4c8..d76b83b 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -18,6 +18,7 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/resampler" "github.com/ClusterCockpit/cc-lib/runtimeEnv" @@ -30,8 +31,6 @@ var ( msInstance *MemoryStore ) -var Clusters = make([]string, 0) - var NumWorkers int = 4 func init() { @@ -62,6 +61,34 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { } } + for _, c := range archive.Clusters { + for _, mc := range c.MetricConfig { + agg, err := AssignAggregationStratergy(mc.Aggregation) + if err != nil { + cclog.Warnf("Could not find aggregation stratergy for metric config '%s': %s", mc.Name, err.Error()) + } + + AddMetric(mc.Name, MetricConfig{ + Frequency: int64(mc.Timestep), + Aggregation: agg, + }) + } + + for _, sc := range c.SubClusters { + for _, mc := range sc.MetricConfig { + agg, err := AssignAggregationStratergy(mc.Aggregation) + if err != nil { + cclog.Warnf("Could not find aggregation stratergy for metric config '%s': %s", mc.Name, err.Error()) + } + + AddMetric(mc.Name, MetricConfig{ + Frequency: int64(mc.Timestep), + Aggregation: agg, + }) + } + } + } + // Pass the config.MetricStoreKeys InitMetrics(Metrics) @@ -312,7 +339,7 @@ func (m *MemoryStore) GetLevel(selector []string) *Level { return m.root.findLevelOrCreate(selector, len(m.Metrics)) } -// Assumes that `minfo` in `metrics` is filled in! +// WriteToLevel assumes that `minfo` in `metrics` is filled in func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metrics []Metric) error { l = l.findLevelOrCreate(selector, len(m.Metrics)) l.lock.Lock() @@ -343,7 +370,7 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric return nil } -// Returns all values for metric `metric` from `from` to `to` for the selected level(s). +// Read returns all values for metric `metric` from `from` to `to` for the selected level(s). // If the level does not hold the metric itself, the data will be aggregated recursively from the children. // The second and third return value are the actual from/to for the data. Those can be different from // the range asked for if no data was available. @@ -413,8 +440,8 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso return data, from, to, resolution, nil } -// Release all buffers for the selected level and all its children that contain only -// values older than `t`. +// Free releases all buffers for the selected level and all its children that +// contain only values older than `t`. func (m *MemoryStore) Free(selector []string, t int64) (int, error) { return m.GetLevel(selector).free(t) } @@ -431,7 +458,8 @@ func (m *MemoryStore) SizeInBytes() int64 { return m.root.sizeInBytes() } -// Given a selector, return a list of all children of the level selected. +// ListChildren , given a selector, returns a list of all children of the level +// selected. func (m *MemoryStore) ListChildren(selector []string) []string { lvl := &m.root for lvl != nil && len(selector) != 0 { diff --git a/pkg/archive/clusterConfig.go b/pkg/archive/clusterConfig.go index 9e490c5..13890c9 100644 --- a/pkg/archive/clusterConfig.go +++ b/pkg/archive/clusterConfig.go @@ -9,8 +9,6 @@ import ( "errors" "fmt" - "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/memorystore" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" ) @@ -34,8 +32,6 @@ func initClusterConfig() error { return err } - memorystore.Clusters = append(memorystore.Clusters, cluster.Name) - if len(cluster.Name) == 0 || len(cluster.MetricConfig) == 0 || len(cluster.SubClusters) == 0 { @@ -127,16 +123,6 @@ func initClusterConfig() error { } ml.Availability = append(metricLookup[mc.Name].Availability, availability) metricLookup[mc.Name] = ml - - agg, err := config.AssignAggregationStratergy(mc.Aggregation) - if err != nil { - return fmt.Errorf("ARCHIVE/CLUSTERCONFIG > in %s/cluster.json: %w", cluster.Name, err) - } - - config.AddMetric(mc.Name, config.MetricConfig{ - Frequency: int64(mc.Timestep), - Aggregation: agg, - }) } Clusters = append(Clusters, cluster)