Fix memoryStore Init and move MetricConfig init

This commit is contained in:
2025-10-20 10:22:40 +02:00
parent 67be9aa27b
commit bc43c844fc
5 changed files with 45 additions and 28 deletions

View File

@@ -248,12 +248,10 @@ func main() {
// Metric Store starts after all flags have been processes
if memorystore.InternalCCMSFlag {
if mscfg := ccconf.GetPackageConfig("metric-store"); mscfg != nil {
memorystore.InitMetricStore(mscfg)
memorystore.Init(mscfg, &wg)
} else {
cclog.Abort("Metric Store configuration must be present")
}
memorystore.Init(&wg)
}
archiver.Start(repository.GetJobRepository())

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package api
import (
@@ -15,8 +16,8 @@ import (
"github.com/ClusterCockpit/cc-lib/schema"
)
// GetClustersApiResponse model
type GetClustersApiResponse struct {
// GetClustersAPIResponse model
type GetClustersAPIResponse struct {
Clusters []*schema.Cluster `json:"clusters"` // Array of clusters
}
@@ -59,7 +60,7 @@ func (api *RestApi) getClusters(rw http.ResponseWriter, r *http.Request) {
clusters = archive.Clusters
}
payload := GetClustersApiResponse{
payload := GetClustersAPIResponse{
Clusters: clusters,
}

View File

@@ -97,11 +97,15 @@ func (api *RestApi) MountUserApiRoutes(r *mux.Router) {
func (api *RestApi) MountMetricStoreApiRoutes(r *mux.Router) {
// REST API Uses TokenAuth
// Refactor ??
r.HandleFunc("/api/free", freeMetrics).Methods(http.MethodPost)
r.HandleFunc("/api/write", writeMetrics).Methods(http.MethodPost)
r.HandleFunc("/api/debug", debugMetrics).Methods(http.MethodGet)
r.HandleFunc("/api/healthcheck", metricsHealth).Methods(http.MethodGet)
// Same endpoints but with trailing slash
r.HandleFunc("/api/free/", freeMetrics).Methods(http.MethodPost)
r.HandleFunc("/api/write/", writeMetrics).Methods(http.MethodPost)
r.HandleFunc("/api/debug/", debugMetrics).Methods(http.MethodGet)
r.HandleFunc("/api/healthcheck/", metricsHealth).Methods(http.MethodGet)
}
func (api *RestApi) MountConfigApiRoutes(r *mux.Router) {

View File

@@ -18,6 +18,7 @@ import (
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/resampler"
"github.com/ClusterCockpit/cc-lib/runtimeEnv"
@@ -30,8 +31,6 @@ var (
msInstance *MemoryStore
)
var Clusters = make([]string, 0)
var NumWorkers int = 4
func init() {
@@ -62,6 +61,34 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
}
}
for _, c := range archive.Clusters {
for _, mc := range c.MetricConfig {
agg, err := AssignAggregationStratergy(mc.Aggregation)
if err != nil {
cclog.Warnf("Could not find aggregation stratergy for metric config '%s': %s", mc.Name, err.Error())
}
AddMetric(mc.Name, MetricConfig{
Frequency: int64(mc.Timestep),
Aggregation: agg,
})
}
for _, sc := range c.SubClusters {
for _, mc := range sc.MetricConfig {
agg, err := AssignAggregationStratergy(mc.Aggregation)
if err != nil {
cclog.Warnf("Could not find aggregation stratergy for metric config '%s': %s", mc.Name, err.Error())
}
AddMetric(mc.Name, MetricConfig{
Frequency: int64(mc.Timestep),
Aggregation: agg,
})
}
}
}
// Pass the config.MetricStoreKeys
InitMetrics(Metrics)
@@ -312,7 +339,7 @@ func (m *MemoryStore) GetLevel(selector []string) *Level {
return m.root.findLevelOrCreate(selector, len(m.Metrics))
}
// Assumes that `minfo` in `metrics` is filled in!
// WriteToLevel assumes that `minfo` in `metrics` is filled in
func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metrics []Metric) error {
l = l.findLevelOrCreate(selector, len(m.Metrics))
l.lock.Lock()
@@ -343,7 +370,7 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric
return nil
}
// Returns all values for metric `metric` from `from` to `to` for the selected level(s).
// Read returns all values for metric `metric` from `from` to `to` for the selected level(s).
// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
// The second and third return value are the actual from/to for the data. Those can be different from
// the range asked for if no data was available.
@@ -413,8 +440,8 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
return data, from, to, resolution, nil
}
// Release all buffers for the selected level and all its children that contain only
// values older than `t`.
// Free releases all buffers for the selected level and all its children that
// contain only values older than `t`.
func (m *MemoryStore) Free(selector []string, t int64) (int, error) {
return m.GetLevel(selector).free(t)
}
@@ -431,7 +458,8 @@ func (m *MemoryStore) SizeInBytes() int64 {
return m.root.sizeInBytes()
}
// Given a selector, return a list of all children of the level selected.
// ListChildren , given a selector, returns a list of all children of the level
// selected.
func (m *MemoryStore) ListChildren(selector []string) []string {
lvl := &m.root
for lvl != nil && len(selector) != 0 {

View File

@@ -9,8 +9,6 @@ import (
"errors"
"fmt"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
)
@@ -34,8 +32,6 @@ func initClusterConfig() error {
return err
}
memorystore.Clusters = append(memorystore.Clusters, cluster.Name)
if len(cluster.Name) == 0 ||
len(cluster.MetricConfig) == 0 ||
len(cluster.SubClusters) == 0 {
@@ -127,16 +123,6 @@ func initClusterConfig() error {
}
ml.Availability = append(metricLookup[mc.Name].Availability, availability)
metricLookup[mc.Name] = ml
agg, err := config.AssignAggregationStratergy(mc.Aggregation)
if err != nil {
return fmt.Errorf("ARCHIVE/CLUSTERCONFIG > in %s/cluster.json: %w", cluster.Name, err)
}
config.AddMetric(mc.Name, config.MetricConfig{
Frequency: int64(mc.Timestep),
Aggregation: agg,
})
}
Clusters = append(Clusters, cluster)