From 11ec2267daaf67cbc6602e3ba395a16ede96503e Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 25 Dec 2025 08:42:54 +0100 Subject: [PATCH] Major refactor of metric data handling - make the internal memory store required and default - Rename memorystore to metricstore - Rename metricDataDispatcher to metricdispatch - Remove metricdata package - Introduce metricsync package for upstream metric data pull --- CLAUDE.md | 10 +- README.md | 8 +- cmd/cc-backend/main.go | 21 +- cmd/cc-backend/server.go | 10 +- internal/api/api_test.go | 12 +- internal/api/job.go | 6 +- .../api/{memorystore.go => metricstore.go} | 12 +- internal/api/nats_test.go | 9 +- internal/archiver/README.md | 4 +- internal/archiver/archiver.go | 4 +- internal/graph/schema.resolvers.go | 16 +- internal/graph/util.go | 6 +- internal/metricDataDispatcher/dataLoader.go | 381 ----- internal/metricdata/cc-metric-store.go | 1226 ----------------- internal/metricdata/metricdata.go | 88 -- internal/metricdata/prometheus.go | 587 -------- internal/metricdata/utils.go | 118 -- internal/metricdispatch/dataLoader.go | 490 +++++++ internal/metricdispatch/dataLoader_test.go | 125 ++ internal/{memorystore => metricstore}/api.go | 6 +- .../{memorystore => metricstore}/archive.go | 2 +- .../avroCheckpoint.go | 2 +- .../avroHelper.go | 2 +- .../avroStruct.go | 2 +- .../{memorystore => metricstore}/buffer.go | 2 +- .../checkpoint.go | 2 +- .../{memorystore => metricstore}/config.go | 4 +- .../configSchema.go | 2 +- .../{memorystore => metricstore}/debug.go | 2 +- .../healthcheck.go | 2 +- .../{memorystore => metricstore}/level.go | 2 +- .../lineprotocol.go | 2 +- .../memorystore.go | 4 +- .../memorystore_test.go | 2 +- .../query.go} | 146 +- .../{memorystore => metricstore}/stats.go | 2 +- internal/metricsync/metricdata.go | 60 + internal/repository/stats.go | 4 +- .../taskmanager/updateFootprintService.go | 10 +- 39 files changed, 815 insertions(+), 2578 deletions(-) rename internal/api/{memorystore.go => 
metricstore.go} (95%) delete mode 100644 internal/metricDataDispatcher/dataLoader.go delete mode 100644 internal/metricdata/cc-metric-store.go delete mode 100644 internal/metricdata/metricdata.go delete mode 100644 internal/metricdata/prometheus.go delete mode 100644 internal/metricdata/utils.go create mode 100644 internal/metricdispatch/dataLoader.go create mode 100644 internal/metricdispatch/dataLoader_test.go rename internal/{memorystore => metricstore}/api.go (98%) rename internal/{memorystore => metricstore}/archive.go (99%) rename internal/{memorystore => metricstore}/avroCheckpoint.go (99%) rename internal/{memorystore => metricstore}/avroHelper.go (99%) rename internal/{memorystore => metricstore}/avroStruct.go (99%) rename internal/{memorystore => metricstore}/buffer.go (99%) rename internal/{memorystore => metricstore}/checkpoint.go (99%) rename internal/{memorystore => metricstore}/config.go (98%) rename internal/{memorystore => metricstore}/configSchema.go (99%) rename internal/{memorystore => metricstore}/debug.go (99%) rename internal/{memorystore => metricstore}/healthcheck.go (99%) rename internal/{memorystore => metricstore}/level.go (99%) rename internal/{memorystore => metricstore}/lineprotocol.go (99%) rename internal/{memorystore => metricstore}/memorystore.go (99%) rename internal/{memorystore => metricstore}/memorystore_test.go (99%) rename internal/{metricdata/cc-metric-store-internal.go => metricstore/query.go} (87%) rename internal/{memorystore => metricstore}/stats.go (99%) create mode 100644 internal/metricsync/metricdata.go diff --git a/CLAUDE.md b/CLAUDE.md index 67412a76..f30c3923 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -96,9 +96,9 @@ The backend follows a layered architecture with clear separation of concerns: - **internal/auth**: Authentication layer - Supports local accounts, LDAP, OIDC, and JWT tokens - Implements rate limiting for login attempts -- **internal/metricdata**: Metric data repository abstraction - - Pluggable 
backends: cc-metric-store, Prometheus, InfluxDB - - Each cluster can have a different metric data backend +- **internal/metricstore**: Metric store with data loading API + - In-memory metric storage with checkpointing + - Query API for loading job metric data - **internal/archiver**: Job archiving to file-based archive - **internal/api/nats.go**: NATS-based API for job and node operations - Subscribes to NATS subjects for job events (start/stop) @@ -209,8 +209,8 @@ applied automatically on startup. Version tracking in `version` table. ### Adding a new metric data backend -1. Implement `MetricDataRepository` interface in `internal/metricdata/` -2. Register in `metricdata.Init()` switch statement +1. Implement metric loading functions in `internal/metricstore/query.go` +2. Add cluster configuration to metric store initialization 3. Update config.json schema documentation ### Modifying database schema diff --git a/README.md b/README.md index 468a12ad..00bcb119 100644 --- a/README.md +++ b/README.md @@ -163,11 +163,9 @@ ln -s ./var/job-archive GraphQL schema and resolvers - [`importer`](https://github.com/ClusterCockpit/cc-backend/tree/master/internal/importer) Job data import and database initialization - - [`memorystore`](https://github.com/ClusterCockpit/cc-backend/tree/master/internal/memorystore) - In-memory metric data store with checkpointing - - [`metricdata`](https://github.com/ClusterCockpit/cc-backend/tree/master/internal/metricdata) - Metric data repository implementations (cc-metric-store, Prometheus) - - [`metricDataDispatcher`](https://github.com/ClusterCockpit/cc-backend/tree/master/internal/metricDataDispatcher) + - [`metricstore`](https://github.com/ClusterCockpit/cc-backend/tree/master/internal/metricstore) + In-memory metric data store with checkpointing and metric loading + - [`metricdispatch`](https://github.com/ClusterCockpit/cc-backend/tree/master/internal/metricdispatch) Dispatches metric data loading to appropriate backends - 
[`repository`](https://github.com/ClusterCockpit/cc-backend/tree/master/internal/repository) Database repository layer for jobs and metadata diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index f8b4aea1..331df4f6 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -24,8 +24,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/importer" - "github.com/ClusterCockpit/cc-backend/internal/memorystore" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricstore" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/tagger" "github.com/ClusterCockpit/cc-backend/internal/taskmanager" @@ -283,10 +282,7 @@ func initSubsystems() error { return fmt.Errorf("initializing archive: %w", err) } - // Initialize metricdata - if err := metricdata.Init(); err != nil { - return fmt.Errorf("initializing metricdata repository: %w", err) - } + // Note: metricstore.Init() is called later in runServer() with proper configuration // Handle database re-initialization if flagReinitDB { @@ -322,13 +318,12 @@ func initSubsystems() error { func runServer(ctx context.Context) error { var wg sync.WaitGroup - // Start metric store if enabled - if memorystore.InternalCCMSFlag { - mscfg := ccconf.GetPackageConfig("metric-store") - if mscfg == nil { - return fmt.Errorf("metric store configuration must be present") - } - memorystore.Init(mscfg, &wg) + // Initialize metric store if configuration is provided + mscfg := ccconf.GetPackageConfig("metric-store") + if mscfg != nil { + metricstore.Init(mscfg, &wg) + } else { + cclog.Debug("Metric store configuration not found, skipping metricstore initialization") } // Start archiver and task manager diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index 53e24c88..8d700823 100644 --- 
a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -29,7 +29,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" - "github.com/ClusterCockpit/cc-backend/internal/memorystore" + "github.com/ClusterCockpit/cc-backend/internal/metricstore" "github.com/ClusterCockpit/cc-backend/internal/routerConfig" "github.com/ClusterCockpit/cc-backend/pkg/nats" "github.com/ClusterCockpit/cc-backend/web" @@ -253,9 +253,7 @@ func (s *Server) init() error { } } - if memorystore.InternalCCMSFlag { - s.restAPIHandle.MountMetricStoreAPIRoutes(metricstoreapi) - } + s.restAPIHandle.MountMetricStoreAPIRoutes(metricstoreapi) if config.Keys.EmbedStaticFiles { if i, err := os.Stat("./var/img"); err == nil { @@ -383,9 +381,7 @@ func (s *Server) Shutdown(ctx context.Context) { } // Archive all the metric store data - if memorystore.InternalCCMSFlag { - memorystore.Shutdown() - } + metricstore.Shutdown() // Shutdown archiver with 10 second timeout for fast shutdown if err := archiver.Shutdown(10 * time.Second); err != nil { diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 50605f7b..a2283013 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -23,8 +23,8 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" + "github.com/ClusterCockpit/cc-backend/internal/metricstore" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig" @@ -173,9 +173,7 @@ func setup(t *testing.T) *api.RestAPI { t.Fatal(err) } - 
if err := metricdata.Init(); err != nil { - t.Fatal(err) - } + // metricstore initialization removed - it's initialized via callback in tests archiver.Start(repository.GetJobRepository(), context.Background()) @@ -221,7 +219,7 @@ func TestRestApi(t *testing.T) { }, } - metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { + metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { return testData, nil } @@ -366,7 +364,7 @@ func TestRestApi(t *testing.T) { } t.Run("CheckArchive", func(t *testing.T) { - data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60) + data, err := metricdispatch.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60) if err != nil { t.Fatal(err) } diff --git a/internal/api/job.go b/internal/api/job.go index 9b970c2e..09f7b22c 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -22,7 +22,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/importer" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" @@ -293,7 +293,7 @@ func (api *RestAPI) getCompleteJobByID(rw http.ResponseWriter, r *http.Request) } if r.URL.Query().Get("all-metrics") == "true" { - data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution) + data, err = metricdispatch.LoadData(job, nil, scopes, r.Context(), 
resolution) if err != nil { cclog.Warnf("REST: error while loading all-metrics job data for JobID %d on %s", job.JobID, job.Cluster) return @@ -389,7 +389,7 @@ func (api *RestAPI) getJobByID(rw http.ResponseWriter, r *http.Request) { resolution = max(resolution, mc.Timestep) } - data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution) + data, err := metricdispatch.LoadData(job, metrics, scopes, r.Context(), resolution) if err != nil { cclog.Warnf("REST: error while loading job data for JobID %d on %s", job.JobID, job.Cluster) return diff --git a/internal/api/memorystore.go b/internal/api/metricstore.go similarity index 95% rename from internal/api/memorystore.go rename to internal/api/metricstore.go index 56c396e2..d4ab1dfe 100644 --- a/internal/api/memorystore.go +++ b/internal/api/metricstore.go @@ -15,7 +15,7 @@ import ( "strconv" "strings" - "github.com/ClusterCockpit/cc-backend/internal/memorystore" + "github.com/ClusterCockpit/cc-backend/internal/metricstore" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/influxdata/line-protocol/v2/lineprotocol" @@ -58,7 +58,7 @@ func freeMetrics(rw http.ResponseWriter, r *http.Request) { return } - ms := memorystore.GetMemoryStore() + ms := metricstore.GetMemoryStore() n := 0 for _, sel := range selectors { bn, err := ms.Free(sel, to) @@ -97,9 +97,9 @@ func writeMetrics(rw http.ResponseWriter, r *http.Request) { return } - ms := memorystore.GetMemoryStore() + ms := metricstore.GetMemoryStore() dec := lineprotocol.NewDecoderWithBytes(bytes) - if err := memorystore.DecodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil { + if err := metricstore.DecodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil { cclog.Errorf("/api/write error: %s", err.Error()) handleError(err, http.StatusBadRequest, rw) return @@ -129,7 +129,7 @@ func debugMetrics(rw http.ResponseWriter, r *http.Request) { selector = strings.Split(raw, ":") } - ms := memorystore.GetMemoryStore() + ms := 
metricstore.GetMemoryStore() if err := ms.DebugDump(bufio.NewWriter(rw), selector); err != nil { handleError(err, http.StatusBadRequest, rw) return @@ -162,7 +162,7 @@ func metricsHealth(rw http.ResponseWriter, r *http.Request) { selector := []string{rawCluster, rawNode} - ms := memorystore.GetMemoryStore() + ms := metricstore.GetMemoryStore() if err := ms.HealthCheck(bufio.NewWriter(rw), selector); err != nil { handleError(err, http.StatusBadRequest, rw) return diff --git a/internal/api/nats_test.go b/internal/api/nats_test.go index c9415afc..9e1fa2b5 100644 --- a/internal/api/nats_test.go +++ b/internal/api/nats_test.go @@ -18,7 +18,8 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/importer" + "github.com/ClusterCockpit/cc-backend/internal/metricstore" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig" @@ -167,9 +168,7 @@ func setupNatsTest(t *testing.T) *NatsAPI { t.Fatal(err) } - if err := metricdata.Init(); err != nil { - t.Fatal(err) - } + // metricstore initialization removed - it's initialized via callback in tests archiver.Start(repository.GetJobRepository(), context.Background()) @@ -564,7 +563,7 @@ func TestNatsHandleStopJob(t *testing.T) { }, } - metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { + metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { return testData, nil } diff --git a/internal/archiver/README.md b/internal/archiver/README.md index 0fae04ea..48aed797 100644 --- 
a/internal/archiver/README.md +++ b/internal/archiver/README.md @@ -106,7 +106,7 @@ Data is archived at the highest available resolution (typically 60s intervals). ```go // In archiver.go ArchiveJob() function -jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 300) +jobData, err := metricdispatch.LoadData(job, allMetrics, scopes, ctx, 300) // 0 = highest resolution // 300 = 5-minute resolution ``` @@ -185,6 +185,6 @@ Internal state is protected by: ## Dependencies - `internal/repository`: Database operations for job metadata -- `internal/metricDataDispatcher`: Loading metric data from various backends +- `internal/metricdispatch`: Loading metric data from various backends - `pkg/archive`: Archive backend abstraction (filesystem, S3, SQLite) - `cc-lib/schema`: Job and metric data structures diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 46ce8126..4e0b6473 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -10,7 +10,7 @@ import ( "math" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" @@ -60,7 +60,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.Job, error) { scopes = append(scopes, schema.MetricScopeAccelerator) } - jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resulotion-Value retrieves highest res (60s) + jobData, err := metricdispatch.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resulotion-Value retrieves highest res (60s) if err != nil { cclog.Error("Error wile loading job data for archiving") return nil, err diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 32499b8c..34bbf393 100644 --- 
a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -19,7 +19,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" @@ -484,7 +484,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str return nil, err } - data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx, *resolution) + data, err := metricdispatch.LoadData(job, metrics, scopes, ctx, *resolution) if err != nil { cclog.Warn("Error while loading job data") return nil, err @@ -512,7 +512,7 @@ func (r *queryResolver) JobStats(ctx context.Context, id string, metrics []strin return nil, err } - data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx) + data, err := metricdispatch.LoadJobStats(job, metrics, ctx) if err != nil { cclog.Warnf("Error while loading jobStats data for job id %s", id) return nil, err @@ -537,7 +537,7 @@ func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics [ return nil, err } - data, err := metricDataDispatcher.LoadScopedJobStats(job, metrics, scopes, ctx) + data, err := metricdispatch.LoadScopedJobStats(job, metrics, scopes, ctx) if err != nil { cclog.Warnf("Error while loading scopedJobStats data for job id %s", id) return nil, err @@ -702,7 +702,7 @@ func (r *queryResolver) JobsMetricStats(ctx context.Context, filter []*model.Job res := []*model.JobStats{} for _, job := range jobs { - data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx) + data, err := metricdispatch.LoadJobStats(job, metrics, ctx) if err != nil { cclog.Warnf("Error while 
loading comparison jobStats data for job id %d", job.JobID) continue @@ -759,7 +759,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [ } } - data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) + data, err := metricdispatch.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) if err != nil { cclog.Warn("error while loading node data") return nil, err @@ -825,7 +825,7 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub } } - data, err := metricDataDispatcher.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx) + data, err := metricdispatch.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx) if err != nil { cclog.Warn("error while loading node data (Resolver.NodeMetricsList") return nil, err @@ -880,7 +880,7 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr // 'nodes' == nil -> Defaults to all nodes of cluster for existing query workflow scopes := []schema.MetricScope{"node"} - data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nil, scopes, from, to, ctx) + data, err := metricdispatch.LoadNodeData(cluster, metrics, nil, scopes, from, to, ctx) if err != nil { cclog.Warn("error while loading node data") return nil, err diff --git a/internal/graph/util.go b/internal/graph/util.go index 42a1d2fb..4135ca72 100644 --- a/internal/graph/util.go +++ b/internal/graph/util.go @@ -13,7 +13,7 @@ import ( "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" ) @@ -55,7 +55,7 @@ func (r *queryResolver) rooflineHeatmap( // resolution = max(resolution, mc.Timestep) // } - 
jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0) + jobdata, err := metricdispatch.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0) if err != nil { cclog.Errorf("Error while loading roofline metrics for job %d", job.ID) return nil, err @@ -128,7 +128,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF continue } - if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil { + if err := metricdispatch.LoadAverages(job, metrics, avgs, ctx); err != nil { cclog.Error("Error while loading averages for footprint") return nil, err } diff --git a/internal/metricDataDispatcher/dataLoader.go b/internal/metricDataDispatcher/dataLoader.go deleted file mode 100644 index 6d1338fa..00000000 --- a/internal/metricDataDispatcher/dataLoader.go +++ /dev/null @@ -1,381 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. 
-package metricDataDispatcher - -import ( - "context" - "fmt" - "math" - "time" - - "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" - "github.com/ClusterCockpit/cc-backend/pkg/archive" - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" - "github.com/ClusterCockpit/cc-lib/v2/lrucache" - "github.com/ClusterCockpit/cc-lib/v2/resampler" - "github.com/ClusterCockpit/cc-lib/v2/schema" -) - -var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) - -func cacheKey( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - resolution int, -) string { - // Duration and StartTime do not need to be in the cache key as StartTime is less unique than - // job.ID and the TTL of the cache entry makes sure it does not stay there forever. - return fmt.Sprintf("%d(%s):[%v],[%v]-%d", - job.ID, job.State, metrics, scopes, resolution) -} - -// Fetches the metric data for a job. -func LoadData(job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, - resolution int, -) (schema.JobData, error) { - data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ any, ttl time.Duration, size int) { - var jd schema.JobData - var err error - - if job.State == schema.JobStateRunning || - job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || - config.Keys.DisableArchive { - - repo, err := metricdata.GetMetricDataRepo(job.Cluster) - if err != nil { - return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 - } - - if scopes == nil { - scopes = append(scopes, schema.MetricScopeNode) - } - - if metrics == nil { - cluster := archive.GetCluster(job.Cluster) - for _, mc := range cluster.MetricConfig { - metrics = append(metrics, mc.Name) - } - } - - jd, err = repo.LoadData(job, metrics, scopes, ctx, resolution) - if err != nil { - if len(jd) != 0 { - cclog.Warnf("partial error: %s", err.Error()) - // 
return err, 0, 0 // Reactivating will block archiving on one partial error - } else { - cclog.Error("Error while loading job data from metric repository") - return err, 0, 0 - } - } - size = jd.Size() - } else { - var jd_temp schema.JobData - jd_temp, err = archive.GetHandle().LoadJobData(job) - if err != nil { - cclog.Error("Error while loading job data from archive") - return err, 0, 0 - } - - // Deep copy the cached archive hashmap - jd = metricdata.DeepCopy(jd_temp) - - // Resampling for archived data. - // Pass the resolution from frontend here. - for _, v := range jd { - for _, v_ := range v { - timestep := int64(0) - for i := 0; i < len(v_.Series); i += 1 { - v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, int64(v_.Timestep), int64(resolution)) - if err != nil { - return err, 0, 0 - } - } - v_.Timestep = int(timestep) - } - } - - // Avoid sending unrequested data to the client: - if metrics != nil || scopes != nil { - if metrics == nil { - metrics = make([]string, 0, len(jd)) - for k := range jd { - metrics = append(metrics, k) - } - } - - res := schema.JobData{} - for _, metric := range metrics { - if perscope, ok := jd[metric]; ok { - if len(perscope) > 1 { - subset := make(map[schema.MetricScope]*schema.JobMetric) - for _, scope := range scopes { - if jm, ok := perscope[scope]; ok { - subset[scope] = jm - } - } - - if len(subset) > 0 { - perscope = subset - } - } - - res[metric] = perscope - } - } - jd = res - } - size = jd.Size() - } - - ttl = 5 * time.Hour - if job.State == schema.JobStateRunning { - ttl = 2 * time.Minute - } - - // FIXME: Review: Is this really necessary or correct. - // Note: Lines 147-170 formerly known as prepareJobData(jobData, scopes) - // For /monitoring/job/ and some other places, flops_any and mem_bw need - // to be available at the scope 'node'. 
If a job has a lot of nodes, - // statisticsSeries should be available so that a min/median/max Graph can be - // used instead of a lot of single lines. - // NOTE: New StatsSeries will always be calculated as 'min/median/max' - // Existing (archived) StatsSeries can be 'min/mean/max'! - const maxSeriesSize int = 15 - for _, scopes := range jd { - for _, jm := range scopes { - if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize { - continue - } - - jm.AddStatisticsSeries() - } - } - - nodeScopeRequested := false - for _, scope := range scopes { - if scope == schema.MetricScopeNode { - nodeScopeRequested = true - } - } - - if nodeScopeRequested { - jd.AddNodeScope("flops_any") - jd.AddNodeScope("mem_bw") - } - - // Round Resulting Stat Values - jd.RoundMetricStats() - - return jd, ttl, size - }) - - if err, ok := data.(error); ok { - cclog.Error("Error in returned dataset") - return nil, err - } - - return data.(schema.JobData), nil -} - -// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. -func LoadAverages( - job *schema.Job, - metrics []string, - data [][]schema.Float, - ctx context.Context, -) error { - if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { - return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here? - } - - repo, err := metricdata.GetMetricDataRepo(job.Cluster) - if err != nil { - return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster) - } - - stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalizazion? 
- if err != nil { - cclog.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project) - return err - } - - for i, m := range metrics { - nodes, ok := stats[m] - if !ok { - data[i] = append(data[i], schema.NaN) - continue - } - - sum := 0.0 - for _, node := range nodes { - sum += node.Avg - } - data[i] = append(data[i], schema.Float(sum)) - } - - return nil -} - -// Used for statsTable in frontend: Return scoped statistics by metric. -func LoadScopedJobStats( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, -) (schema.ScopedJobStats, error) { - if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { - return archive.LoadScopedStatsFromArchive(job, metrics, scopes) - } - - repo, err := metricdata.GetMetricDataRepo(job.Cluster) - if err != nil { - return nil, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster) - } - - scopedStats, err := repo.LoadScopedStats(job, metrics, scopes, ctx) - if err != nil { - cclog.Errorf("error while loading scoped statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project) - return nil, err - } - - return scopedStats, nil -} - -// Used for polar plots in frontend: Aggregates statistics for all nodes to single values for job per metric. 
-func LoadJobStats( - job *schema.Job, - metrics []string, - ctx context.Context, -) (map[string]schema.MetricStatistics, error) { - if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { - return archive.LoadStatsFromArchive(job, metrics) - } - - data := make(map[string]schema.MetricStatistics, len(metrics)) - repo, err := metricdata.GetMetricDataRepo(job.Cluster) - if err != nil { - return data, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster) - } - - stats, err := repo.LoadStats(job, metrics, ctx) - if err != nil { - cclog.Errorf("error while loading statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project) - return data, err - } - - for _, m := range metrics { - sum, avg, min, max := 0.0, 0.0, 0.0, 0.0 - nodes, ok := stats[m] - if !ok { - data[m] = schema.MetricStatistics{Min: min, Avg: avg, Max: max} - continue - } - - for _, node := range nodes { - sum += node.Avg - min = math.Min(min, node.Min) - max = math.Max(max, node.Max) - } - - data[m] = schema.MetricStatistics{ - Avg: (math.Round((sum/float64(job.NumNodes))*100) / 100), - Min: (math.Round(min*100) / 100), - Max: (math.Round(max*100) / 100), - } - } - - return data, nil -} - -// Used for the classic node/system view. Returns a map of nodes to a map of metrics. 
-func LoadNodeData( - cluster string, - metrics, nodes []string, - scopes []schema.MetricScope, - from, to time.Time, - ctx context.Context, -) (map[string]map[string][]*schema.JobMetric, error) { - repo, err := metricdata.GetMetricDataRepo(cluster) - if err != nil { - return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) - } - - if metrics == nil { - for _, m := range archive.GetCluster(cluster).MetricConfig { - metrics = append(metrics, m.Name) - } - } - - data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) - if err != nil { - if len(data) != 0 { - cclog.Warnf("partial error: %s", err.Error()) - } else { - cclog.Error("Error while loading node data from metric repository") - return nil, err - } - } - - if data == nil { - return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster) - } - - return data, nil -} - -func LoadNodeListData( - cluster, subCluster string, - nodes []string, - metrics []string, - scopes []schema.MetricScope, - resolution int, - from, to time.Time, - ctx context.Context, -) (map[string]schema.JobData, error) { - repo, err := metricdata.GetMetricDataRepo(cluster) - if err != nil { - return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) - } - - if metrics == nil { - for _, m := range archive.GetCluster(cluster).MetricConfig { - metrics = append(metrics, m.Name) - } - } - - data, err := repo.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx) - if err != nil { - if len(data) != 0 { - cclog.Warnf("partial error: %s", err.Error()) - } else { - cclog.Error("Error while loading node data from metric repository") - return nil, err - } - } - - // NOTE: New StatsSeries will always be calculated as 'min/median/max' - const maxSeriesSize int = 8 - for _, jd := range data { - for _, scopes := range jd { - for _, jm := range scopes 
{ - if jm.StatisticsSeries != nil || len(jm.Series) < maxSeriesSize { - continue - } - jm.AddStatisticsSeries() - } - } - } - - if data == nil { - return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster) - } - - return data, nil -} diff --git a/internal/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go deleted file mode 100644 index 6c146f22..00000000 --- a/internal/metricdata/cc-metric-store.go +++ /dev/null @@ -1,1226 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package metricdata - -import ( - "bufio" - "bytes" - "context" - "encoding/json" - "fmt" - "net/http" - "strings" - "time" - - "github.com/ClusterCockpit/cc-backend/pkg/archive" - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" - "github.com/ClusterCockpit/cc-lib/v2/schema" -) - -type CCMetricStoreConfig struct { - Kind string `json:"kind"` - Url string `json:"url"` - Token string `json:"token"` - - // If metrics are known to this MetricDataRepository under a different - // name than in the `metricConfig` section of the 'cluster.json', - // provide this optional mapping of local to remote name for this metric. 
- Renamings map[string]string `json:"metricRenamings"` -} - -type CCMetricStore struct { - here2there map[string]string - there2here map[string]string - client http.Client - jwt string - url string - queryEndpoint string -} - -type ApiQueryRequest struct { - Cluster string `json:"cluster"` - Queries []ApiQuery `json:"queries"` - ForAllNodes []string `json:"for-all-nodes"` - From int64 `json:"from"` - To int64 `json:"to"` - WithStats bool `json:"with-stats"` - WithData bool `json:"with-data"` -} - -type ApiQuery struct { - Type *string `json:"type,omitempty"` - SubType *string `json:"subtype,omitempty"` - Metric string `json:"metric"` - Hostname string `json:"host"` - Resolution int `json:"resolution"` - TypeIds []string `json:"type-ids,omitempty"` - SubTypeIds []string `json:"subtype-ids,omitempty"` - Aggregate bool `json:"aggreg"` -} - -type ApiQueryResponse struct { - Queries []ApiQuery `json:"queries,omitempty"` - Results [][]ApiMetricData `json:"results"` -} - -type ApiMetricData struct { - Error *string `json:"error"` - Data []schema.Float `json:"data"` - From int64 `json:"from"` - To int64 `json:"to"` - Resolution int `json:"resolution"` - Avg schema.Float `json:"avg"` - Min schema.Float `json:"min"` - Max schema.Float `json:"max"` -} - -func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error { - var config CCMetricStoreConfig - if err := json.Unmarshal(rawConfig, &config); err != nil { - cclog.Warn("Error while unmarshaling raw json config") - return err - } - - ccms.url = config.Url - ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url) - ccms.jwt = config.Token - ccms.client = http.Client{ - Timeout: 10 * time.Second, - } - - if config.Renamings != nil { - ccms.here2there = config.Renamings - ccms.there2here = make(map[string]string, len(config.Renamings)) - for k, v := range ccms.here2there { - ccms.there2here[v] = k - } - } else { - ccms.here2there = make(map[string]string) - ccms.there2here = make(map[string]string) - } - - return nil 
-} - -func (ccms *CCMetricStore) toRemoteName(metric string) string { - if renamed, ok := ccms.here2there[metric]; ok { - return renamed - } - - return metric -} - -func (ccms *CCMetricStore) toLocalName(metric string) string { - if renamed, ok := ccms.there2here[metric]; ok { - return renamed - } - - return metric -} - -func (ccms *CCMetricStore) doRequest( - ctx context.Context, - body *ApiQueryRequest, -) (*ApiQueryResponse, error) { - buf := &bytes.Buffer{} - if err := json.NewEncoder(buf).Encode(body); err != nil { - cclog.Errorf("Error while encoding request body: %s", err.Error()) - return nil, err - } - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf) - if err != nil { - cclog.Errorf("Error while building request body: %s", err.Error()) - return nil, err - } - if ccms.jwt != "" { - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt)) - } - - // versioning the cc-metric-store query API. - // v2 = data with resampling - // v1 = data without resampling - q := req.URL.Query() - q.Add("version", "v2") - req.URL.RawQuery = q.Encode() - - res, err := ccms.client.Do(req) - if err != nil { - cclog.Errorf("Error while performing request: %s", err.Error()) - return nil, err - } - - if res.StatusCode != http.StatusOK { - return nil, fmt.Errorf("'%s': HTTP Status: %s", ccms.queryEndpoint, res.Status) - } - - var resBody ApiQueryResponse - if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil { - cclog.Errorf("Error while decoding result body: %s", err.Error()) - return nil, err - } - - return &resBody, nil -} - -func (ccms *CCMetricStore) LoadData( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, - resolution int, -) (schema.JobData, error) { - queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution) - if err != nil { - cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, 
metrics, scopes, err.Error()) - return nil, err - } - - req := ApiQueryRequest{ - Cluster: job.Cluster, - From: job.StartTime, - To: job.StartTime + int64(job.Duration), - Queries: queries, - WithStats: true, - WithData: true, - } - - resBody, err := ccms.doRequest(ctx, &req) - if err != nil { - cclog.Errorf("Error while performing request: %s", err.Error()) - return nil, err - } - - var errors []string - jobData := make(schema.JobData) - for i, row := range resBody.Results { - query := req.Queries[i] - metric := ccms.toLocalName(query.Metric) - scope := assignedScope[i] - mc := archive.GetMetricConfig(job.Cluster, metric) - if _, ok := jobData[metric]; !ok { - jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric) - } - - res := mc.Timestep - if len(row) > 0 { - res = row[0].Resolution - } - - jobMetric, ok := jobData[metric][scope] - if !ok { - jobMetric = &schema.JobMetric{ - Unit: mc.Unit, - Timestep: res, - Series: make([]schema.Series, 0), - } - jobData[metric][scope] = jobMetric - } - - for ndx, res := range row { - if res.Error != nil { - /* Build list for "partial errors", if any */ - errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error)) - continue - } - - id := (*string)(nil) - if query.Type != nil { - id = new(string) - *id = query.TypeIds[ndx] - } - - if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { - // "schema.Float()" because regular float64 can not be JSONed when NaN. 
- res.Avg = schema.Float(0) - res.Min = schema.Float(0) - res.Max = schema.Float(0) - } - - jobMetric.Series = append(jobMetric.Series, schema.Series{ - Hostname: query.Hostname, - Id: id, - Statistics: schema.MetricStatistics{ - Avg: float64(res.Avg), - Min: float64(res.Min), - Max: float64(res.Max), - }, - Data: res.Data, - }) - } - - // So that one can later check len(jobData): - if len(jobMetric.Series) == 0 { - delete(jobData[metric], scope) - if len(jobData[metric]) == 0 { - delete(jobData, metric) - } - } - } - - if len(errors) != 0 { - /* Returns list for "partial errors" */ - return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) - } - return jobData, nil -} - -func (ccms *CCMetricStore) buildQueries( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - resolution int, -) ([]ApiQuery, []schema.MetricScope, error) { - queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) - assignedScope := []schema.MetricScope{} - - subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster) - if scerr != nil { - return nil, nil, scerr - } - topology := subcluster.Topology - - for _, metric := range metrics { - remoteName := ccms.toRemoteName(metric) - mc := archive.GetMetricConfig(job.Cluster, metric) - if mc == nil { - // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, job.Cluster) - cclog.Infof("metric '%s' is not specified for cluster '%s'", metric, job.Cluster) - continue - } - - // Skip if metric is removed for subcluster - if len(mc.SubClusters) != 0 { - isRemoved := false - for _, scConfig := range mc.SubClusters { - if scConfig.Name == job.SubCluster && scConfig.Remove { - isRemoved = true - break - } - } - if isRemoved { - continue - } - } - - // Avoid duplicates... 
- handledScopes := make([]schema.MetricScope, 0, 3) - - scopesLoop: - for _, requestedScope := range scopes { - nativeScope := mc.Scope - if nativeScope == schema.MetricScopeAccelerator && job.NumAcc == 0 { - continue - } - - scope := nativeScope.Max(requestedScope) - for _, s := range handledScopes { - if scope == s { - continue scopesLoop - } - } - handledScopes = append(handledScopes, scope) - - for _, host := range job.Resources { - hwthreads := host.HWThreads - if hwthreads == nil { - hwthreads = topology.Node - } - - // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) - if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { - if scope != schema.MetricScopeAccelerator { - // Skip all other catched cases - continue - } - - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &acceleratorString, - TypeIds: host.Accelerators, - Resolution: resolution, - }) - assignedScope = append(assignedScope, schema.MetricScopeAccelerator) - continue - } - - // Accelerator -> Node - if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { - if len(host.Accelerators) == 0 { - continue - } - - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &acceleratorString, - TypeIds: host.Accelerators, - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // HWThread -> HWThead - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // HWThread -> Core - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore 
{ - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - for _, core := range cores { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Core[core]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Socket - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - for _, socket := range sockets { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Node - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Core - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Socket - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromCores(hwthreads) - for _, socket := range sockets { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &coreString, - TypeIds: 
intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // Core -> Node - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { - cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDomain -> MemoryDomain - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDoman -> Node - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Socket -> Socket - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Socket -> Node - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { - 
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Node -> Node - if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) - } - } - } - - return queries, assignedScope, nil -} - -func (ccms *CCMetricStore) LoadStats( - job *schema.Job, - metrics []string, - ctx context.Context, -) (map[string]map[string]schema.MetricStatistics, error) { - - queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? 
- if err != nil { - cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) - return nil, err - } - - req := ApiQueryRequest{ - Cluster: job.Cluster, - From: job.StartTime, - To: job.StartTime + int64(job.Duration), - Queries: queries, - WithStats: true, - WithData: false, - } - - resBody, err := ccms.doRequest(ctx, &req) - if err != nil { - cclog.Errorf("Error while performing request: %s", err.Error()) - return nil, err - } - - stats := make(map[string]map[string]schema.MetricStatistics, len(metrics)) - for i, res := range resBody.Results { - query := req.Queries[i] - metric := ccms.toLocalName(query.Metric) - data := res[0] - if data.Error != nil { - cclog.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error) - continue - } - - metricdata, ok := stats[metric] - if !ok { - metricdata = make(map[string]schema.MetricStatistics, job.NumNodes) - stats[metric] = metricdata - } - - if data.Avg.IsNaN() || data.Min.IsNaN() || data.Max.IsNaN() { - cclog.Warnf("fetching %s for node %s failed: one of avg/min/max is NaN", metric, query.Hostname) - continue - } - - metricdata[query.Hostname] = schema.MetricStatistics{ - Avg: float64(data.Avg), - Min: float64(data.Min), - Max: float64(data.Max), - } - } - - return stats, nil -} - -// Used for Job-View Statistics Table -func (ccms *CCMetricStore) LoadScopedStats( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, -) (schema.ScopedJobStats, error) { - queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, 0) - if err != nil { - cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) - return nil, err - } - - req := ApiQueryRequest{ - Cluster: job.Cluster, - From: job.StartTime, - To: job.StartTime + int64(job.Duration), - Queries: queries, - WithStats: true, - WithData: false, - } - - resBody, err := ccms.doRequest(ctx, &req) 
- if err != nil { - cclog.Errorf("Error while performing request: %s", err.Error()) - return nil, err - } - - var errors []string - scopedJobStats := make(schema.ScopedJobStats) - - for i, row := range resBody.Results { - query := req.Queries[i] - metric := ccms.toLocalName(query.Metric) - scope := assignedScope[i] - - if _, ok := scopedJobStats[metric]; !ok { - scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats) - } - - if _, ok := scopedJobStats[metric][scope]; !ok { - scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0) - } - - for ndx, res := range row { - if res.Error != nil { - /* Build list for "partial errors", if any */ - errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error)) - continue - } - - id := (*string)(nil) - if query.Type != nil { - id = new(string) - *id = query.TypeIds[ndx] - } - - if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { - // "schema.Float()" because regular float64 can not be JSONed when NaN. 
- res.Avg = schema.Float(0) - res.Min = schema.Float(0) - res.Max = schema.Float(0) - } - - scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ - Hostname: query.Hostname, - Id: id, - Data: &schema.MetricStatistics{ - Avg: float64(res.Avg), - Min: float64(res.Min), - Max: float64(res.Max), - }, - }) - } - - // So that one can later check len(scopedJobStats[metric][scope]): Remove from map if empty - if len(scopedJobStats[metric][scope]) == 0 { - delete(scopedJobStats[metric], scope) - if len(scopedJobStats[metric]) == 0 { - delete(scopedJobStats, metric) - } - } - } - - if len(errors) != 0 { - /* Returns list for "partial errors" */ - return scopedJobStats, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) - } - return scopedJobStats, nil -} - -// Used for Systems-View Node-Overview -func (ccms *CCMetricStore) LoadNodeData( - cluster string, - metrics, nodes []string, - scopes []schema.MetricScope, - from, to time.Time, - ctx context.Context, -) (map[string]map[string][]*schema.JobMetric, error) { - req := ApiQueryRequest{ - Cluster: cluster, - From: from.Unix(), - To: to.Unix(), - WithStats: true, - WithData: true, - } - - if nodes == nil { - for _, metric := range metrics { - req.ForAllNodes = append(req.ForAllNodes, ccms.toRemoteName(metric)) - } - } else { - for _, node := range nodes { - for _, metric := range metrics { - req.Queries = append(req.Queries, ApiQuery{ - Hostname: node, - Metric: ccms.toRemoteName(metric), - Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution - }) - } - } - } - - resBody, err := ccms.doRequest(ctx, &req) - if err != nil { - cclog.Errorf("Error while performing request: %s", err.Error()) - return nil, err - } - - var errors []string - data := make(map[string]map[string][]*schema.JobMetric) - for i, res := range resBody.Results { - var query ApiQuery - if resBody.Queries != nil { - query = resBody.Queries[i] - } else { - query = 
req.Queries[i] - } - - metric := ccms.toLocalName(query.Metric) - qdata := res[0] - if qdata.Error != nil { - /* Build list for "partial errors", if any */ - errors = append(errors, fmt.Sprintf("fetching %s for node %s failed: %s", metric, query.Hostname, *qdata.Error)) - } - - if qdata.Avg.IsNaN() || qdata.Min.IsNaN() || qdata.Max.IsNaN() { - // return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN") - qdata.Avg, qdata.Min, qdata.Max = 0., 0., 0. - } - - hostdata, ok := data[query.Hostname] - if !ok { - hostdata = make(map[string][]*schema.JobMetric) - data[query.Hostname] = hostdata - } - - mc := archive.GetMetricConfig(cluster, metric) - if mc != nil { - hostdata[metric] = append(hostdata[metric], &schema.JobMetric{ - Unit: mc.Unit, - Timestep: mc.Timestep, - Series: []schema.Series{ - { - Hostname: query.Hostname, - Data: qdata.Data, - Statistics: schema.MetricStatistics{ - Avg: float64(qdata.Avg), - Min: float64(qdata.Min), - Max: float64(qdata.Max), - }, - }, - }, - }) - } else { - cclog.Warnf("Metric '%s' not configured for cluster '%s': Skipped in LoadNodeData() Return!", metric, cluster) - } - } - - if len(errors) != 0 { - /* Returns list of "partial errors" */ - return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) - } - - return data, nil -} - -// Used for Systems-View Node-List -func (ccms *CCMetricStore) LoadNodeListData( - cluster, subCluster string, - nodes []string, - metrics []string, - scopes []schema.MetricScope, - resolution int, - from, to time.Time, - ctx context.Context, -) (map[string]schema.JobData, error) { - - // Note: Order of node data is not guaranteed after this point - queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution) - if err != nil { - cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, 
scopes, err.Error()) - return nil, err - } - - req := ApiQueryRequest{ - Cluster: cluster, - Queries: queries, - From: from.Unix(), - To: to.Unix(), - WithStats: true, - WithData: true, - } - - resBody, err := ccms.doRequest(ctx, &req) - if err != nil { - cclog.Errorf("Error while performing request: %s", err.Error()) - return nil, err - } - - var errors []string - data := make(map[string]schema.JobData) - for i, row := range resBody.Results { - var query ApiQuery - if resBody.Queries != nil { - query = resBody.Queries[i] - } else { - query = req.Queries[i] - } - // qdata := res[0] - metric := ccms.toLocalName(query.Metric) - scope := assignedScope[i] - mc := archive.GetMetricConfig(cluster, metric) - - res := mc.Timestep - if len(row) > 0 { - res = row[0].Resolution - } - - // Init Nested Map Data Structures If Not Found - hostData, ok := data[query.Hostname] - if !ok { - hostData = make(schema.JobData) - data[query.Hostname] = hostData - } - - metricData, ok := hostData[metric] - if !ok { - metricData = make(map[schema.MetricScope]*schema.JobMetric) - data[query.Hostname][metric] = metricData - } - - scopeData, ok := metricData[scope] - if !ok { - scopeData = &schema.JobMetric{ - Unit: mc.Unit, - Timestep: res, - Series: make([]schema.Series, 0), - } - data[query.Hostname][metric][scope] = scopeData - } - - for ndx, res := range row { - if res.Error != nil { - /* Build list for "partial errors", if any */ - errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error)) - continue - } - - id := (*string)(nil) - if query.Type != nil { - id = new(string) - *id = query.TypeIds[ndx] - } - - if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { - // "schema.Float()" because regular float64 can not be JSONed when NaN. 
- res.Avg = schema.Float(0) - res.Min = schema.Float(0) - res.Max = schema.Float(0) - } - - scopeData.Series = append(scopeData.Series, schema.Series{ - Hostname: query.Hostname, - Id: id, - Statistics: schema.MetricStatistics{ - Avg: float64(res.Avg), - Min: float64(res.Min), - Max: float64(res.Max), - }, - Data: res.Data, - }) - } - } - - if len(errors) != 0 { - /* Returns list of "partial errors" */ - return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) - } - - return data, nil -} - -func (ccms *CCMetricStore) buildNodeQueries( - cluster string, - subCluster string, - nodes []string, - metrics []string, - scopes []schema.MetricScope, - resolution int, -) ([]ApiQuery, []schema.MetricScope, error) { - - queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(nodes)) - assignedScope := []schema.MetricScope{} - - // Get Topol before loop if subCluster given - var subClusterTopol *schema.SubCluster - var scterr error - if subCluster != "" { - subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster) - if scterr != nil { - cclog.Errorf("could not load cluster %s subCluster %s topology: %s", cluster, subCluster, scterr.Error()) - return nil, nil, scterr - } - } - - for _, metric := range metrics { - remoteName := ccms.toRemoteName(metric) - mc := archive.GetMetricConfig(cluster, metric) - if mc == nil { - // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster) - cclog.Warnf("metric '%s' is not specified for cluster '%s'", metric, cluster) - continue - } - - // Skip if metric is removed for subcluster - if mc.SubClusters != nil { - isRemoved := false - for _, scConfig := range mc.SubClusters { - if scConfig.Name == subCluster && scConfig.Remove { - isRemoved = true - break - } - } - if isRemoved { - continue - } - } - - // Avoid duplicates... 
- handledScopes := make([]schema.MetricScope, 0, 3) - - scopesLoop: - for _, requestedScope := range scopes { - nativeScope := mc.Scope - - scope := nativeScope.Max(requestedScope) - for _, s := range handledScopes { - if scope == s { - continue scopesLoop - } - } - handledScopes = append(handledScopes, scope) - - for _, hostname := range nodes { - - // If no subCluster given, get it by node - if subCluster == "" { - subClusterName, scnerr := archive.GetSubClusterByNode(cluster, hostname) - if scnerr != nil { - return nil, nil, scnerr - } - subClusterTopol, scterr = archive.GetSubCluster(cluster, subClusterName) - if scterr != nil { - return nil, nil, scterr - } - } - - // Always full node hwthread id list, no partial queries expected -> Use "topology.Node" directly where applicable - // Always full accelerator id list, no partial queries expected -> Use "acceleratorIds" directly where applicable - topology := subClusterTopol.Topology - acceleratorIds := topology.GetAcceleratorIDs() - - // Moved check here if metric matches hardware specs - if nativeScope == schema.MetricScopeAccelerator && len(acceleratorIds) == 0 { - continue scopesLoop - } - - // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) - if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { - if scope != schema.MetricScopeAccelerator { - // Skip all other catched cases - continue - } - - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: false, - Type: &acceleratorString, - TypeIds: acceleratorIds, - Resolution: resolution, - }) - assignedScope = append(assignedScope, schema.MetricScopeAccelerator) - continue - } - - // Accelerator -> Node - if nativeScope == schema.MetricScopeAccelerator && scope == schema.MetricScopeNode { - if len(acceleratorIds) == 0 { - continue - } - - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: 
&acceleratorString, - TypeIds: acceleratorIds, - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // HWThread -> HWThead - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: false, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Node), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // HWThread -> Core - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { - cores, _ := topology.GetCoresFromHWThreads(topology.Node) - for _, core := range cores { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Core[core]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Socket - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - for _, socket := range sockets { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // HWThread -> Node - if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Node), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Core - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { - cores, _ := 
topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: false, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Core -> Socket - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromCores(topology.Node) - for _, socket := range sockets { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(topology.Socket[socket]), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - } - continue - } - - // Core -> Node - if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { - cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(cores), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDomain -> MemoryDomain - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: false, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // MemoryDoman -> Node - if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { - sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), - 
Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Socket -> Socket - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { - sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: false, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Socket -> Node - if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { - sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Aggregate: true, - Type: &socketString, - TypeIds: intToStringSlice(sockets), - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - // Node -> Node - if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: hostname, - Resolution: resolution, - }) - assignedScope = append(assignedScope, scope) - continue - } - - return nil, nil, fmt.Errorf("METRICDATA/CCMS > TODO: unhandled case: native-scope=%s, requested-scope=%s", nativeScope, requestedScope) - } - } - } - - return queries, assignedScope, nil -} diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go deleted file mode 100644 index ab0e19fb..00000000 --- a/internal/metricdata/metricdata.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. 
- -package metricdata - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/memorystore" - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" - "github.com/ClusterCockpit/cc-lib/v2/schema" -) - -type MetricDataRepository interface { - // Initialize this MetricDataRepository. One instance of - // this interface will only ever be responsible for one cluster. - Init(rawConfig json.RawMessage) error - - // Return the JobData for the given job, only with the requested metrics. - LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) - - // Return a map of metrics to a map of nodes to the metric statistics of the job. node scope only. - LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) - - // Return a map of metrics to a map of scopes to the scoped metric statistics of the job. - LoadScopedStats(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.ScopedJobStats, error) - - // Return a map of hosts to a map of metrics at the requested scopes (currently only node) for that node. - LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) - - // Return a map of hosts to a map of metrics to a map of scopes for multiple nodes. 
- LoadNodeListData(cluster, subCluster string, nodes, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, ctx context.Context) (map[string]schema.JobData, error) -} - -var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} - -func Init() error { - for _, cluster := range config.Clusters { - if cluster.MetricDataRepository != nil { - var kind struct { - Kind string `json:"kind"` - } - if err := json.Unmarshal(cluster.MetricDataRepository, &kind); err != nil { - cclog.Warn("Error while unmarshaling raw json MetricDataRepository") - return err - } - - var mdr MetricDataRepository - switch kind.Kind { - case "cc-metric-store": - mdr = &CCMetricStore{} - case "cc-metric-store-internal": - mdr = &CCMetricStoreInternal{} - memorystore.InternalCCMSFlag = true - case "prometheus": - mdr = &PrometheusDataRepository{} - case "test": - mdr = &TestMetricDataRepository{} - default: - return fmt.Errorf("METRICDATA/METRICDATA > Unknown MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name) - } - - if err := mdr.Init(cluster.MetricDataRepository); err != nil { - cclog.Errorf("Error initializing MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name) - return err - } - metricDataRepos[cluster.Name] = mdr - } - } - return nil -} - -func GetMetricDataRepo(cluster string) (MetricDataRepository, error) { - var err error - repo, ok := metricDataRepos[cluster] - - if !ok { - err = fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) - } - - return repo, err -} diff --git a/internal/metricdata/prometheus.go b/internal/metricdata/prometheus.go deleted file mode 100644 index 3fb94d51..00000000 --- a/internal/metricdata/prometheus.go +++ /dev/null @@ -1,587 +0,0 @@ -// Copyright (C) 2022 DKRZ -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. 
-package metricdata - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "math" - "net/http" - "os" - "regexp" - "sort" - "strings" - "sync" - "text/template" - "time" - - "github.com/ClusterCockpit/cc-backend/pkg/archive" - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" - "github.com/ClusterCockpit/cc-lib/v2/schema" - promapi "github.com/prometheus/client_golang/api" - promv1 "github.com/prometheus/client_golang/api/prometheus/v1" - promcfg "github.com/prometheus/common/config" - promm "github.com/prometheus/common/model" -) - -type PrometheusDataRepositoryConfig struct { - Url string `json:"url"` - Username string `json:"username,omitempty"` - Suffix string `json:"suffix,omitempty"` - Templates map[string]string `json:"query-templates"` -} - -type PrometheusDataRepository struct { - client promapi.Client - queryClient promv1.API - suffix string - templates map[string]*template.Template -} - -type PromQLArgs struct { - Nodes string -} - -type Trie map[rune]Trie - -var logOnce sync.Once - -func contains(s []schema.MetricScope, str schema.MetricScope) bool { - for _, v := range s { - if v == str { - return true - } - } - return false -} - -func MinMaxMean(data []schema.Float) (float64, float64, float64) { - if len(data) == 0 { - return 0.0, 0.0, 0.0 - } - min := math.MaxFloat64 - max := -math.MaxFloat64 - var sum float64 - var n float64 - for _, val := range data { - if val.IsNaN() { - continue - } - sum += float64(val) - n += 1 - if float64(val) > max { - max = float64(val) - } - if float64(val) < min { - min = float64(val) - } - } - return min, max, sum / n -} - -// Rewritten from -// https://github.com/ermanh/trieregex/blob/master/trieregex/trieregex.py -func nodeRegex(nodes []string) string { - root := Trie{} - // add runes of each compute node to trie - for _, node := range nodes { - _trie := root - for _, c := range node { - if _, ok := _trie[c]; !ok { - _trie[c] = Trie{} - } - _trie = _trie[c] - } - _trie['*'] = Trie{} - } - // 
recursively build regex from rune trie - var trieRegex func(trie Trie, reset bool) string - trieRegex = func(trie Trie, reset bool) string { - if reset == true { - trie = root - } - if len(trie) == 0 { - return "" - } - if len(trie) == 1 { - for key, _trie := range trie { - if key == '*' { - return "" - } - return regexp.QuoteMeta(string(key)) + trieRegex(_trie, false) - } - } else { - sequences := []string{} - for key, _trie := range trie { - if key != '*' { - sequences = append(sequences, regexp.QuoteMeta(string(key))+trieRegex(_trie, false)) - } - } - sort.Slice(sequences, func(i, j int) bool { - return (-len(sequences[i]) < -len(sequences[j])) || (sequences[i] < sequences[j]) - }) - var result string - // single edge from this tree node - if len(sequences) == 1 { - result = sequences[0] - if len(result) > 1 { - result = "(?:" + result + ")" - } - // multiple edges, each length 1 - } else if s := strings.Join(sequences, ""); len(s) == len(sequences) { - // char or numeric range - if len(s)-1 == int(s[len(s)-1])-int(s[0]) { - result = fmt.Sprintf("[%c-%c]", s[0], s[len(s)-1]) - // char or numeric set - } else { - result = "[" + s + "]" - } - // multiple edges of different lengths - } else { - result = "(?:" + strings.Join(sequences, "|") + ")" - } - if _, ok := trie['*']; ok { - result += "?" 
- } - return result - } - return "" - } - return trieRegex(root, true) -} - -func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error { - var config PrometheusDataRepositoryConfig - // parse config - if err := json.Unmarshal(rawConfig, &config); err != nil { - cclog.Warn("Error while unmarshaling raw json config") - return err - } - // support basic authentication - var rt http.RoundTripper = nil - if prom_pw := os.Getenv("PROMETHEUS_PASSWORD"); prom_pw != "" && config.Username != "" { - prom_pw := promcfg.Secret(prom_pw) - rt = promcfg.NewBasicAuthRoundTripper(promcfg.NewInlineSecret(config.Username), promcfg.NewInlineSecret(string(prom_pw)), promapi.DefaultRoundTripper) - } else { - if config.Username != "" { - return errors.New("METRICDATA/PROMETHEUS > Prometheus username provided, but PROMETHEUS_PASSWORD not set") - } - } - // init client - client, err := promapi.NewClient(promapi.Config{ - Address: config.Url, - RoundTripper: rt, - }) - if err != nil { - cclog.Error("Error while initializing new prometheus client") - return err - } - // init query client - pdb.client = client - pdb.queryClient = promv1.NewAPI(pdb.client) - // site config - pdb.suffix = config.Suffix - // init query templates - pdb.templates = make(map[string]*template.Template) - for metric, templ := range config.Templates { - pdb.templates[metric], err = template.New(metric).Parse(templ) - if err == nil { - cclog.Debugf("Added PromQL template for %s: %s", metric, templ) - } else { - cclog.Warnf("Failed to parse PromQL template %s for metric %s", templ, metric) - } - } - return nil -} - -// TODO: respect scope argument -func (pdb *PrometheusDataRepository) FormatQuery( - metric string, - scope schema.MetricScope, - nodes []string, - cluster string, -) (string, error) { - args := PromQLArgs{} - if len(nodes) > 0 { - args.Nodes = fmt.Sprintf("(%s)%s", nodeRegex(nodes), pdb.suffix) - } else { - args.Nodes = fmt.Sprintf(".*%s", pdb.suffix) - } - - buf := &bytes.Buffer{} - if 
templ, ok := pdb.templates[metric]; ok { - err := templ.Execute(buf, args) - if err != nil { - return "", errors.New(fmt.Sprintf("METRICDATA/PROMETHEUS > Error compiling template %v", templ)) - } else { - query := buf.String() - cclog.Debugf("PromQL: %s", query) - return query, nil - } - } else { - return "", errors.New(fmt.Sprintf("METRICDATA/PROMETHEUS > No PromQL for metric %s configured.", metric)) - } -} - -// Convert PromAPI row to CC schema.Series -func (pdb *PrometheusDataRepository) RowToSeries( - from time.Time, - step int64, - steps int64, - row *promm.SampleStream, -) schema.Series { - ts := from.Unix() - hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix) - // init array of expected length with NaN - values := make([]schema.Float, steps+1) - for i := range values { - values[i] = schema.NaN - } - // copy recorded values from prom sample pair - for _, v := range row.Values { - idx := (v.Timestamp.Unix() - ts) / step - values[idx] = schema.Float(v.Value) - } - min, max, mean := MinMaxMean(values) - // output struct - return schema.Series{ - Hostname: hostname, - Data: values, - Statistics: schema.MetricStatistics{ - Avg: mean, - Min: min, - Max: max, - }, - } -} - -func (pdb *PrometheusDataRepository) LoadData( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, - resolution int, -) (schema.JobData, error) { - // TODO respect requested scope - if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) { - scopes = append(scopes, schema.MetricScopeNode) - } - - jobData := make(schema.JobData) - // parse job specs - nodes := make([]string, len(job.Resources)) - for i, resource := range job.Resources { - nodes[i] = resource.Hostname - } - from := time.Unix(job.StartTime, 0) - to := time.Unix(job.StartTime+int64(job.Duration), 0) - - for _, scope := range scopes { - if scope != schema.MetricScopeNode { - logOnce.Do(func() { - cclog.Infof("Scope '%s' requested, but not yet 
supported: Will return 'node' scope only.", scope) - }) - continue - } - - for _, metric := range metrics { - metricConfig := archive.GetMetricConfig(job.Cluster, metric) - if metricConfig == nil { - cclog.Warnf("Error in LoadData: Metric %s for cluster %s not configured", metric, job.Cluster) - return nil, errors.New("Prometheus config error") - } - query, err := pdb.FormatQuery(metric, scope, nodes, job.Cluster) - if err != nil { - cclog.Warn("Error while formatting prometheus query") - return nil, err - } - - // ranged query over all job nodes - r := promv1.Range{ - Start: from, - End: to, - Step: time.Duration(metricConfig.Timestep * 1e9), - } - result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r) - if err != nil { - cclog.Errorf("Prometheus query error in LoadData: %v\nQuery: %s", err, query) - return nil, errors.New("Prometheus query error") - } - if len(warnings) > 0 { - cclog.Warnf("Warnings: %v\n", warnings) - } - - // init data structures - if _, ok := jobData[metric]; !ok { - jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric) - } - jobMetric, ok := jobData[metric][scope] - if !ok { - jobMetric = &schema.JobMetric{ - Unit: metricConfig.Unit, - Timestep: metricConfig.Timestep, - Series: make([]schema.Series, 0), - } - } - step := int64(metricConfig.Timestep) - steps := int64(to.Sub(from).Seconds()) / step - // iter rows of host, metric, values - for _, row := range result.(promm.Matrix) { - jobMetric.Series = append(jobMetric.Series, - pdb.RowToSeries(from, step, steps, row)) - } - // only add metric if at least one host returned data - if !ok && len(jobMetric.Series) > 0 { - jobData[metric][scope] = jobMetric - } - // sort by hostname to get uniform coloring - sort.Slice(jobMetric.Series, func(i, j int) bool { - return (jobMetric.Series[i].Hostname < jobMetric.Series[j].Hostname) - }) - } - } - return jobData, nil -} - -// TODO change implementation to precomputed/cached stats -func (pdb *PrometheusDataRepository) LoadStats( - 
job *schema.Job, - metrics []string, - ctx context.Context, -) (map[string]map[string]schema.MetricStatistics, error) { - // map of metrics of nodes of stats - stats := map[string]map[string]schema.MetricStatistics{} - - data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/) - if err != nil { - cclog.Warn("Error while loading job for stats") - return nil, err - } - for metric, metricData := range data { - stats[metric] = make(map[string]schema.MetricStatistics) - for _, series := range metricData[schema.MetricScopeNode].Series { - stats[metric][series.Hostname] = series.Statistics - } - } - - return stats, nil -} - -func (pdb *PrometheusDataRepository) LoadNodeData( - cluster string, - metrics, nodes []string, - scopes []schema.MetricScope, - from, to time.Time, - ctx context.Context, -) (map[string]map[string][]*schema.JobMetric, error) { - t0 := time.Now() - // Map of hosts of metrics of value slices - data := make(map[string]map[string][]*schema.JobMetric) - // query db for each metric - // TODO: scopes seems to be always empty - if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) { - scopes = append(scopes, schema.MetricScopeNode) - } - for _, scope := range scopes { - if scope != schema.MetricScopeNode { - logOnce.Do(func() { - cclog.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope) - }) - continue - } - for _, metric := range metrics { - metricConfig := archive.GetMetricConfig(cluster, metric) - if metricConfig == nil { - cclog.Warnf("Error in LoadNodeData: Metric %s for cluster %s not configured", metric, cluster) - return nil, errors.New("Prometheus config error") - } - query, err := pdb.FormatQuery(metric, scope, nodes, cluster) - if err != nil { - cclog.Warn("Error while formatting prometheus query") - return nil, err - } - - // ranged query over all nodes - r := promv1.Range{ - Start: from, - End: to, - Step: 
time.Duration(metricConfig.Timestep * 1e9), - } - result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r) - if err != nil { - cclog.Errorf("Prometheus query error in LoadNodeData: %v\n", err) - return nil, errors.New("Prometheus query error") - } - if len(warnings) > 0 { - cclog.Warnf("Warnings: %v\n", warnings) - } - - step := int64(metricConfig.Timestep) - steps := int64(to.Sub(from).Seconds()) / step - - // iter rows of host, metric, values - for _, row := range result.(promm.Matrix) { - hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix) - hostdata, ok := data[hostname] - if !ok { - hostdata = make(map[string][]*schema.JobMetric) - data[hostname] = hostdata - } - // output per host and metric - hostdata[metric] = append(hostdata[metric], &schema.JobMetric{ - Unit: metricConfig.Unit, - Timestep: metricConfig.Timestep, - Series: []schema.Series{pdb.RowToSeries(from, step, steps, row)}, - }, - ) - } - } - } - t1 := time.Since(t0) - cclog.Debugf("LoadNodeData of %v nodes took %s", len(data), t1) - return data, nil -} - -// Implemented by NHR@FAU; Used in Job-View StatsTable -func (pdb *PrometheusDataRepository) LoadScopedStats( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, -) (schema.ScopedJobStats, error) { - // Assumption: pdb.loadData() only returns series node-scope - use node scope for statsTable - scopedJobStats := make(schema.ScopedJobStats) - data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/) - if err != nil { - cclog.Warn("Error while loading job for scopedJobStats") - return nil, err - } - - for metric, metricData := range data { - for _, scope := range scopes { - if scope != schema.MetricScopeNode { - logOnce.Do(func() { - cclog.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope) - }) - continue - } - - if _, ok := scopedJobStats[metric]; !ok { - 
scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats) - } - - if _, ok := scopedJobStats[metric][scope]; !ok { - scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0) - } - - for _, series := range metricData[scope].Series { - scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ - Hostname: series.Hostname, - Data: &series.Statistics, - }) - } - } - } - - return scopedJobStats, nil -} - -// Implemented by NHR@FAU; Used in NodeList-View -func (pdb *PrometheusDataRepository) LoadNodeListData( - cluster, subCluster string, - nodes []string, - metrics []string, - scopes []schema.MetricScope, - resolution int, - from, to time.Time, - ctx context.Context, -) (map[string]schema.JobData, error) { - // Assumption: pdb.loadData() only returns series node-scope - use node scope for NodeList - - // Fetch Data, based on pdb.LoadNodeData() - t0 := time.Now() - // Map of hosts of jobData - data := make(map[string]schema.JobData) - - // query db for each metric - // TODO: scopes seems to be always empty - if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) { - scopes = append(scopes, schema.MetricScopeNode) - } - - for _, scope := range scopes { - if scope != schema.MetricScopeNode { - logOnce.Do(func() { - cclog.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope) - }) - continue - } - - for _, metric := range metrics { - metricConfig := archive.GetMetricConfig(cluster, metric) - if metricConfig == nil { - cclog.Warnf("Error in LoadNodeListData: Metric %s for cluster %s not configured", metric, cluster) - return nil, errors.New("Prometheus config error") - } - query, err := pdb.FormatQuery(metric, scope, nodes, cluster) - if err != nil { - cclog.Warn("Error while formatting prometheus query") - return nil, err - } - - // ranged query over all nodes - r := promv1.Range{ - Start: from, - End: to, - Step: time.Duration(metricConfig.Timestep * 1e9), - } - 
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r) - if err != nil { - cclog.Errorf("Prometheus query error in LoadNodeData: %v\n", err) - return nil, errors.New("Prometheus query error") - } - if len(warnings) > 0 { - cclog.Warnf("Warnings: %v\n", warnings) - } - - step := int64(metricConfig.Timestep) - steps := int64(to.Sub(from).Seconds()) / step - - // iter rows of host, metric, values - for _, row := range result.(promm.Matrix) { - hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix) - - hostdata, ok := data[hostname] - if !ok { - hostdata = make(schema.JobData) - data[hostname] = hostdata - } - - metricdata, ok := hostdata[metric] - if !ok { - metricdata = make(map[schema.MetricScope]*schema.JobMetric) - data[hostname][metric] = metricdata - } - - // output per host, metric and scope - scopeData, ok := metricdata[scope] - if !ok { - scopeData = &schema.JobMetric{ - Unit: metricConfig.Unit, - Timestep: metricConfig.Timestep, - Series: []schema.Series{pdb.RowToSeries(from, step, steps, row)}, - } - data[hostname][metric][scope] = scopeData - } - } - } - } - t1 := time.Since(t0) - cclog.Debugf("LoadNodeListData of %v nodes took %s", len(data), t1) - return data, nil -} diff --git a/internal/metricdata/utils.go b/internal/metricdata/utils.go deleted file mode 100644 index 21dfbcac..00000000 --- a/internal/metricdata/utils.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. 
- -package metricdata - -import ( - "context" - "encoding/json" - "time" - - "github.com/ClusterCockpit/cc-lib/v2/schema" -) - -var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { - panic("TODO") -} - -// TestMetricDataRepository is only a mock for unit-testing. -type TestMetricDataRepository struct{} - -func (tmdr *TestMetricDataRepository) Init(_ json.RawMessage) error { - return nil -} - -func (tmdr *TestMetricDataRepository) LoadData( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, - resolution int, -) (schema.JobData, error) { - return TestLoadDataCallback(job, metrics, scopes, ctx, resolution) -} - -func (tmdr *TestMetricDataRepository) LoadStats( - job *schema.Job, - metrics []string, - ctx context.Context, -) (map[string]map[string]schema.MetricStatistics, error) { - panic("TODO") -} - -func (tmdr *TestMetricDataRepository) LoadScopedStats( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, -) (schema.ScopedJobStats, error) { - panic("TODO") -} - -func (tmdr *TestMetricDataRepository) LoadNodeData( - cluster string, - metrics, nodes []string, - scopes []schema.MetricScope, - from, to time.Time, - ctx context.Context, -) (map[string]map[string][]*schema.JobMetric, error) { - panic("TODO") -} - -func (tmdr *TestMetricDataRepository) LoadNodeListData( - cluster, subCluster string, - nodes []string, - metrics []string, - scopes []schema.MetricScope, - resolution int, - from, to time.Time, - ctx context.Context, -) (map[string]schema.JobData, error) { - panic("TODO") -} - -func DeepCopy(jdTemp schema.JobData) schema.JobData { - jd := make(schema.JobData, len(jdTemp)) - for k, v := range jdTemp { - jd[k] = 
make(map[schema.MetricScope]*schema.JobMetric, len(jdTemp[k])) - for k_, v_ := range v { - jd[k][k_] = new(schema.JobMetric) - jd[k][k_].Series = make([]schema.Series, len(v_.Series)) - for i := 0; i < len(v_.Series); i += 1 { - jd[k][k_].Series[i].Data = make([]schema.Float, len(v_.Series[i].Data)) - copy(jd[k][k_].Series[i].Data, v_.Series[i].Data) - jd[k][k_].Series[i].Hostname = v_.Series[i].Hostname - jd[k][k_].Series[i].Id = v_.Series[i].Id - jd[k][k_].Series[i].Statistics.Avg = v_.Series[i].Statistics.Avg - jd[k][k_].Series[i].Statistics.Min = v_.Series[i].Statistics.Min - jd[k][k_].Series[i].Statistics.Max = v_.Series[i].Statistics.Max - } - jd[k][k_].Timestep = v_.Timestep - jd[k][k_].Unit.Base = v_.Unit.Base - jd[k][k_].Unit.Prefix = v_.Unit.Prefix - if v_.StatisticsSeries != nil { - // Init Slices - jd[k][k_].StatisticsSeries = new(schema.StatsSeries) - jd[k][k_].StatisticsSeries.Max = make([]schema.Float, len(v_.StatisticsSeries.Max)) - jd[k][k_].StatisticsSeries.Min = make([]schema.Float, len(v_.StatisticsSeries.Min)) - jd[k][k_].StatisticsSeries.Median = make([]schema.Float, len(v_.StatisticsSeries.Median)) - jd[k][k_].StatisticsSeries.Mean = make([]schema.Float, len(v_.StatisticsSeries.Mean)) - // Copy Data - copy(jd[k][k_].StatisticsSeries.Max, v_.StatisticsSeries.Max) - copy(jd[k][k_].StatisticsSeries.Min, v_.StatisticsSeries.Min) - copy(jd[k][k_].StatisticsSeries.Median, v_.StatisticsSeries.Median) - copy(jd[k][k_].StatisticsSeries.Mean, v_.StatisticsSeries.Mean) - // Handle Percentiles - for k__, v__ := range v_.StatisticsSeries.Percentiles { - jd[k][k_].StatisticsSeries.Percentiles[k__] = make([]schema.Float, len(v__)) - copy(jd[k][k_].StatisticsSeries.Percentiles[k__], v__) - } - } else { - jd[k][k_].StatisticsSeries = v_.StatisticsSeries - } - } - } - return jd -} diff --git a/internal/metricdispatch/dataLoader.go b/internal/metricdispatch/dataLoader.go new file mode 100644 index 00000000..8bfebbd6 --- /dev/null +++ 
b/internal/metricdispatch/dataLoader.go @@ -0,0 +1,490 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// Package metricdispatch provides a unified interface for loading and caching job metric data. +// +// This package serves as a central dispatcher that routes metric data requests to the appropriate +// backend. For running jobs, or when the job archive is disabled, data is fetched from the internal metric store (package metricstore). +// For completed jobs, data is retrieved from the file-based job archive. +// +// # Key Features +// +// - Automatic backend selection based on job state (running vs. archived) and the archive configuration +// - LRU cache for performance optimization (fixed 128 MB capacity) +// - Data resampling using the Largest-Triangle-Three-Buckets (LTTB) algorithm for archived data +// - Automatic statistics series generation for jobs with many nodes +// - Support for scoped metrics (node, socket, accelerator, core) +// +// # Cache Behavior +// +// Cached data has different TTL (time-to-live) values depending on job state: +// - Running jobs: 2 minutes (data changes frequently) +// - All other jobs: 5 hours (data is static) +// +// The cache key is based on job ID, state, requested metrics, scopes, and resolution. +// +// # Usage +// +// The primary entry point is LoadData, which automatically handles both running and archived jobs: +// +// jobData, err := metricdispatch.LoadData(job, metrics, scopes, ctx, resolution) +// if err != nil { +// // Handle error +// } +// +// For statistics only, use LoadJobStats, LoadScopedJobStats, or LoadAverages depending on the required format.
+package metricdispatch + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/metricstore" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/lrucache" + "github.com/ClusterCockpit/cc-lib/v2/resampler" + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +// cache is an LRU cache with 128 MB capacity for storing loaded job metric data. +// The cache reduces load on both the metric store and archive backends. +var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) + +// cacheKey generates a unique cache key for a job's metric data based on job ID, state, +// requested metrics, scopes, and resolution. Duration and StartTime are intentionally excluded +// because job.ID already identifies the job and the cache TTL ensures entries don't persist indefinitely. +func cacheKey( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + resolution int, +) string { + return fmt.Sprintf("%d(%s):[%v],[%v]-%d", + job.ID, job.State, metrics, scopes, resolution) +} + +// LoadData retrieves metric data for a job from the appropriate backend (memory store for running jobs, +// archive for completed jobs) and applies caching, resampling, and statistics generation as needed. +// +// For running jobs or when archive is disabled, data is fetched from the metric store. +// For completed archived jobs, data is loaded from the job archive and resampled if needed. +// +// Parameters: +// - job: The job for which to load metric data +// - metrics: List of metric names to load (nil loads all metrics configured for the cluster, or all archived metrics for archived jobs) +// - scopes: Metric scopes to include (nil defaults to node scope for metric store queries; archived data is then not filtered by scope) +// - ctx: Context for cancellation and timeouts +// - resolution: Target resolution, forwarded to the metric store and used as the resampling target for archived data +// +// Returns the loaded job data and any error encountered.
For partial errors (some metrics failed), +// the function returns the successfully loaded data with a warning logged. +func LoadData(job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, + resolution int, +) (schema.JobData, error) { + data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ any, ttl time.Duration, size int) { + var jd schema.JobData + var err error + + if job.State == schema.JobStateRunning || + job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || + config.Keys.DisableArchive { + + if scopes == nil { + scopes = append(scopes, schema.MetricScopeNode) + } + + if metrics == nil { + cluster := archive.GetCluster(job.Cluster) + for _, mc := range cluster.MetricConfig { + metrics = append(metrics, mc.Name) + } + } + + jd, err = metricstore.LoadData(job, metrics, scopes, ctx, resolution) + if err != nil { + if len(jd) != 0 { + cclog.Warnf("partial error loading metrics from store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + } else { + cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return err, 0, 0 + } + } + size = jd.Size() + } else { + var jdTemp schema.JobData + jdTemp, err = archive.GetHandle().LoadJobData(job) + if err != nil { + cclog.Errorf("failed to load job data from archive for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return err, 0, 0 + } + + jd = deepCopy(jdTemp) + + // Resample archived data using Largest Triangle Three Bucket algorithm to reduce data points + // to the requested resolution, improving transfer performance and client-side rendering. 
+ for _, v := range jd { + for _, v_ := range v { + timestep := int64(0) + for i := 0; i < len(v_.Series); i += 1 { + v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, int64(v_.Timestep), int64(resolution)) + if err != nil { + return err, 0, 0 + } + } + v_.Timestep = int(timestep) + } + } + + // Filter job data to only include requested metrics and scopes, avoiding unnecessary data transfer. + if metrics != nil || scopes != nil { + if metrics == nil { + metrics = make([]string, 0, len(jd)) + for k := range jd { + metrics = append(metrics, k) + } + } + + res := schema.JobData{} + for _, metric := range metrics { + if perscope, ok := jd[metric]; ok { + if len(perscope) > 1 { + subset := make(map[schema.MetricScope]*schema.JobMetric) + for _, scope := range scopes { + if jm, ok := perscope[scope]; ok { + subset[scope] = jm + } + } + + if len(subset) > 0 { + perscope = subset + } + } + + res[metric] = perscope + } + } + jd = res + } + size = jd.Size() + } + + ttl = 5 * time.Hour + if job.State == schema.JobStateRunning { + ttl = 2 * time.Minute + } + + // Generate statistics series for jobs with many nodes to enable min/median/max graphs + // instead of overwhelming the UI with individual node lines. Note that newly calculated + // statistics use min/median/max, while archived statistics may use min/mean/max. 
+ const maxSeriesSize int = 15 + for _, scopes := range jd { + for _, jm := range scopes { + if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize { + continue + } + + jm.AddStatisticsSeries() + } + } + + nodeScopeRequested := false + for _, scope := range scopes { + if scope == schema.MetricScopeNode { + nodeScopeRequested = true + } + } + + if nodeScopeRequested { + jd.AddNodeScope("flops_any") + jd.AddNodeScope("mem_bw") + } + + // Round Resulting Stat Values + jd.RoundMetricStats() + + return jd, ttl, size + }) + + if err, ok := data.(error); ok { + cclog.Errorf("error in cached dataset for job %d: %s", job.JobID, err.Error()) + return nil, err + } + + return data.(schema.JobData), nil +} + +// LoadAverages computes average values for the specified metrics across all nodes of a job. +// For running jobs, it loads statistics from the metric store. For completed jobs, it uses +// the pre-calculated averages from the job archive. The results are appended to the data slice. +func LoadAverages( + job *schema.Job, + metrics []string, + data [][]schema.Float, + ctx context.Context, +) error { + if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { + return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here? + } + + stats, err := metricstore.LoadStats(job, metrics, ctx) + if err != nil { + cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return err + } + + for i, m := range metrics { + nodes, ok := stats[m] + if !ok { + data[i] = append(data[i], schema.NaN) + continue + } + + sum := 0.0 + for _, node := range nodes { + sum += node.Avg + } + data[i] = append(data[i], schema.Float(sum)) + } + + return nil +} + +// LoadScopedJobStats retrieves job statistics organized by metric scope (node, socket, core, accelerator). +// For running jobs, statistics are computed from the metric store. 
For completed jobs, pre-calculated +// statistics are loaded from the job archive. +func LoadScopedJobStats( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, +) (schema.ScopedJobStats, error) { + if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { + return archive.LoadScopedStatsFromArchive(job, metrics, scopes) + } + + scopedStats, err := metricstore.LoadScopedStats(job, metrics, scopes, ctx) + if err != nil { + cclog.Errorf("failed to load scoped statistics from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return nil, err + } + + return scopedStats, nil +} + +// LoadJobStats retrieves aggregated statistics (min/avg/max) for each requested metric across all job nodes. +// For running jobs, statistics are computed from the metric store. For completed jobs, pre-calculated +// statistics are loaded from the job archive. +func LoadJobStats( + job *schema.Job, + metrics []string, + ctx context.Context, +) (map[string]schema.MetricStatistics, error) { + if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { + return archive.LoadStatsFromArchive(job, metrics) + } + + data := make(map[string]schema.MetricStatistics, len(metrics)) + + stats, err := metricstore.LoadStats(job, metrics, ctx) + if err != nil { + cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return data, err + } + + for _, m := range metrics { + sum, avg, min, max := 0.0, 0.0, 0.0, 0.0 + nodes, ok := stats[m] + if !ok { + data[m] = schema.MetricStatistics{Min: min, Avg: avg, Max: max} + continue + } + + for _, node := range nodes { + sum += node.Avg + min = math.Min(min, node.Min) + max = math.Max(max, node.Max) + } + + data[m] = schema.MetricStatistics{ + Avg: (math.Round((sum/float64(job.NumNodes))*100) / 100), + Min: (math.Round(min*100) / 100), + Max: 
(math.Round(max*100) / 100), + } + } + + return data, nil +} + +// LoadNodeData retrieves metric data for specific nodes in a cluster within a time range. +// This is used for node monitoring views and system status pages. Data is always fetched from +// the metric store (not the archive) since it's for current/recent node status monitoring. +// +// Returns a nested map structure: node -> metric -> scoped data. +func LoadNodeData( + cluster string, + metrics, nodes []string, + scopes []schema.MetricScope, + from, to time.Time, + ctx context.Context, +) (map[string]map[string][]*schema.JobMetric, error) { + if metrics == nil { + for _, m := range archive.GetCluster(cluster).MetricConfig { + metrics = append(metrics, m.Name) + } + } + + data, err := metricstore.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) + if err != nil { + if len(data) != 0 { + cclog.Warnf("partial error loading node data from metric store for cluster %s: %s", cluster, err.Error()) + } else { + cclog.Errorf("failed to load node data from metric store for cluster %s: %s", cluster, err.Error()) + return nil, err + } + } + + if data == nil { + return nil, fmt.Errorf("metric store for cluster '%s' does not support node data queries", cluster) + } + + return data, nil +} + +// LoadNodeListData retrieves time-series metric data for multiple nodes within a time range, +// with optional resampling and automatic statistics generation for large datasets. +// This is used for comparing multiple nodes or displaying node status over time. +// +// Returns a map of node names to their job-like metric data structures. 
+func LoadNodeListData( + cluster, subCluster string, + nodes []string, + metrics []string, + scopes []schema.MetricScope, + resolution int, + from, to time.Time, + ctx context.Context, +) (map[string]schema.JobData, error) { + if metrics == nil { + for _, m := range archive.GetCluster(cluster).MetricConfig { + metrics = append(metrics, m.Name) + } + } + + data, err := metricstore.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx) + if err != nil { + if len(data) != 0 { + cclog.Warnf("partial error loading node list data from metric store for cluster %s, subcluster %s: %s", + cluster, subCluster, err.Error()) + } else { + cclog.Errorf("failed to load node list data from metric store for cluster %s, subcluster %s: %s", + cluster, subCluster, err.Error()) + return nil, err + } + } + + // Generate statistics series for datasets with many series to improve visualization performance. + // Statistics are calculated as min/median/max. + const maxSeriesSize int = 8 + for _, jd := range data { + for _, scopes := range jd { + for _, jm := range scopes { + if jm.StatisticsSeries != nil || len(jm.Series) < maxSeriesSize { + continue + } + jm.AddStatisticsSeries() + } + } + } + + if data == nil { + return nil, fmt.Errorf("metric store for cluster '%s' does not support node list queries", cluster) + } + + return data, nil +} + +// deepCopy creates a deep copy of JobData to prevent cache corruption when modifying +// archived data (e.g., during resampling). This ensures the cached archive data remains +// immutable while allowing per-request transformations. 
+func deepCopy(source schema.JobData) schema.JobData { + result := make(schema.JobData, len(source)) + + for metricName, scopeMap := range source { + result[metricName] = make(map[schema.MetricScope]*schema.JobMetric, len(scopeMap)) + + for scope, jobMetric := range scopeMap { + result[metricName][scope] = copyJobMetric(jobMetric) + } + } + + return result +} + +func copyJobMetric(src *schema.JobMetric) *schema.JobMetric { + dst := &schema.JobMetric{ + Timestep: src.Timestep, + Unit: src.Unit, + Series: make([]schema.Series, len(src.Series)), + } + + for i := range src.Series { + dst.Series[i] = copySeries(&src.Series[i]) + } + + if src.StatisticsSeries != nil { + dst.StatisticsSeries = copyStatisticsSeries(src.StatisticsSeries) + } + + return dst +} + +func copySeries(src *schema.Series) schema.Series { + dst := schema.Series{ + Hostname: src.Hostname, + Id: src.Id, + Statistics: src.Statistics, + Data: make([]schema.Float, len(src.Data)), + } + + copy(dst.Data, src.Data) + return dst +} + +func copyStatisticsSeries(src *schema.StatsSeries) *schema.StatsSeries { + dst := &schema.StatsSeries{ + Min: make([]schema.Float, len(src.Min)), + Mean: make([]schema.Float, len(src.Mean)), + Median: make([]schema.Float, len(src.Median)), + Max: make([]schema.Float, len(src.Max)), + } + + copy(dst.Min, src.Min) + copy(dst.Mean, src.Mean) + copy(dst.Median, src.Median) + copy(dst.Max, src.Max) + + if len(src.Percentiles) > 0 { + dst.Percentiles = make(map[int][]schema.Float, len(src.Percentiles)) + for percentile, values := range src.Percentiles { + dst.Percentiles[percentile] = make([]schema.Float, len(values)) + copy(dst.Percentiles[percentile], values) + } + } + + return dst +} diff --git a/internal/metricdispatch/dataLoader_test.go b/internal/metricdispatch/dataLoader_test.go new file mode 100644 index 00000000..c4841f8d --- /dev/null +++ b/internal/metricdispatch/dataLoader_test.go @@ -0,0 +1,125 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. 
+// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package metricdispatch + +import ( + "testing" + + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +func TestDeepCopy(t *testing.T) { + nodeId := "0" + original := schema.JobData{ + "cpu_load": { + schema.MetricScopeNode: &schema.JobMetric{ + Timestep: 60, + Unit: schema.Unit{Base: "load", Prefix: ""}, + Series: []schema.Series{ + { + Hostname: "node001", + Id: &nodeId, + Data: []schema.Float{1.0, 2.0, 3.0}, + Statistics: schema.MetricStatistics{ + Min: 1.0, + Avg: 2.0, + Max: 3.0, + }, + }, + }, + StatisticsSeries: &schema.StatsSeries{ + Min: []schema.Float{1.0, 1.5, 2.0}, + Mean: []schema.Float{2.0, 2.5, 3.0}, + Median: []schema.Float{2.0, 2.5, 3.0}, + Max: []schema.Float{3.0, 3.5, 4.0}, + Percentiles: map[int][]schema.Float{ + 25: {1.5, 2.0, 2.5}, + 75: {2.5, 3.0, 3.5}, + }, + }, + }, + }, + } + + copied := deepCopy(original) + + original["cpu_load"][schema.MetricScopeNode].Series[0].Data[0] = 999.0 + original["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Min[0] = 888.0 + original["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Percentiles[25][0] = 777.0 + + if copied["cpu_load"][schema.MetricScopeNode].Series[0].Data[0] != 1.0 { + t.Errorf("Series data was not deeply copied: got %v, want 1.0", + copied["cpu_load"][schema.MetricScopeNode].Series[0].Data[0]) + } + + if copied["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Min[0] != 1.0 { + t.Errorf("StatisticsSeries was not deeply copied: got %v, want 1.0", + copied["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Min[0]) + } + + if copied["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Percentiles[25][0] != 1.5 { + t.Errorf("Percentiles was not deeply copied: got %v, want 1.5", + copied["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Percentiles[25][0]) + } + + if copied["cpu_load"][schema.MetricScopeNode].Timestep 
!= 60 { + t.Errorf("Timestep not copied correctly: got %v, want 60", + copied["cpu_load"][schema.MetricScopeNode].Timestep) + } + + if copied["cpu_load"][schema.MetricScopeNode].Series[0].Hostname != "node001" { + t.Errorf("Hostname not copied correctly: got %v, want node001", + copied["cpu_load"][schema.MetricScopeNode].Series[0].Hostname) + } +} + +func TestDeepCopyNilStatisticsSeries(t *testing.T) { + original := schema.JobData{ + "mem_used": { + schema.MetricScopeNode: &schema.JobMetric{ + Timestep: 60, + Series: []schema.Series{ + { + Hostname: "node001", + Data: []schema.Float{1.0, 2.0}, + }, + }, + StatisticsSeries: nil, + }, + }, + } + + copied := deepCopy(original) + + if copied["mem_used"][schema.MetricScopeNode].StatisticsSeries != nil { + t.Errorf("StatisticsSeries should be nil, got %v", + copied["mem_used"][schema.MetricScopeNode].StatisticsSeries) + } +} + +func TestDeepCopyEmptyPercentiles(t *testing.T) { + original := schema.JobData{ + "cpu_load": { + schema.MetricScopeNode: &schema.JobMetric{ + Timestep: 60, + Series: []schema.Series{}, + StatisticsSeries: &schema.StatsSeries{ + Min: []schema.Float{1.0}, + Mean: []schema.Float{2.0}, + Median: []schema.Float{2.0}, + Max: []schema.Float{3.0}, + Percentiles: nil, + }, + }, + }, + } + + copied := deepCopy(original) + + if copied["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Percentiles != nil { + t.Errorf("Percentiles should be nil when source is nil/empty") + } +} diff --git a/internal/memorystore/api.go b/internal/metricstore/api.go similarity index 98% rename from internal/memorystore/api.go rename to internal/metricstore/api.go index 41c53a18..d8a2ea82 100644 --- a/internal/memorystore/api.go +++ b/internal/metricstore/api.go @@ -3,10 +3,11 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
-package memorystore +package metricstore import ( "errors" + "fmt" "math" "github.com/ClusterCockpit/cc-lib/v2/schema" @@ -124,6 +125,9 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { req.WithData = true ms := GetMemoryStore() + if ms == nil { + return nil, fmt.Errorf("memorystore not initialized") + } response := APIQueryResponse{ Results: make([][]APIMetricData, 0, len(req.Queries)), diff --git a/internal/memorystore/archive.go b/internal/metricstore/archive.go similarity index 99% rename from internal/memorystore/archive.go rename to internal/metricstore/archive.go index fc46dac6..972769fd 100644 --- a/internal/memorystore/archive.go +++ b/internal/metricstore/archive.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package memorystore +package metricstore import ( "archive/zip" diff --git a/internal/memorystore/avroCheckpoint.go b/internal/metricstore/avroCheckpoint.go similarity index 99% rename from internal/memorystore/avroCheckpoint.go rename to internal/metricstore/avroCheckpoint.go index b0b0cf42..275a64bd 100644 --- a/internal/memorystore/avroCheckpoint.go +++ b/internal/metricstore/avroCheckpoint.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package memorystore +package metricstore import ( "bufio" diff --git a/internal/memorystore/avroHelper.go b/internal/metricstore/avroHelper.go similarity index 99% rename from internal/memorystore/avroHelper.go rename to internal/metricstore/avroHelper.go index 93a293bd..5587a58d 100644 --- a/internal/memorystore/avroHelper.go +++ b/internal/metricstore/avroHelper.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
-package memorystore +package metricstore import ( "context" diff --git a/internal/memorystore/avroStruct.go b/internal/metricstore/avroStruct.go similarity index 99% rename from internal/memorystore/avroStruct.go rename to internal/metricstore/avroStruct.go index 2643a9a7..78a8d137 100644 --- a/internal/memorystore/avroStruct.go +++ b/internal/metricstore/avroStruct.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package memorystore +package metricstore import ( "sync" diff --git a/internal/memorystore/buffer.go b/internal/metricstore/buffer.go similarity index 99% rename from internal/memorystore/buffer.go rename to internal/metricstore/buffer.go index 15e29b3a..94d3ce76 100644 --- a/internal/memorystore/buffer.go +++ b/internal/metricstore/buffer.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package memorystore +package metricstore import ( "errors" diff --git a/internal/memorystore/checkpoint.go b/internal/metricstore/checkpoint.go similarity index 99% rename from internal/memorystore/checkpoint.go rename to internal/metricstore/checkpoint.go index c48c2fd8..27d611c4 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/metricstore/checkpoint.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package memorystore +package metricstore import ( "bufio" diff --git a/internal/memorystore/config.go b/internal/metricstore/config.go similarity index 98% rename from internal/memorystore/config.go rename to internal/metricstore/config.go index fbd62341..97f16c46 100644 --- a/internal/memorystore/config.go +++ b/internal/metricstore/config.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
-package memorystore +package metricstore import ( "fmt" @@ -19,8 +19,6 @@ const ( DefaultAvroCheckpointInterval = time.Minute ) -var InternalCCMSFlag bool = false - type MetricStoreConfig struct { // Number of concurrent workers for checkpoint and archive operations. // If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10) diff --git a/internal/memorystore/configSchema.go b/internal/metricstore/configSchema.go similarity index 99% rename from internal/memorystore/configSchema.go rename to internal/metricstore/configSchema.go index 2616edc6..f1a20a73 100644 --- a/internal/memorystore/configSchema.go +++ b/internal/metricstore/configSchema.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package memorystore +package metricstore const configSchema = `{ "type": "object", diff --git a/internal/memorystore/debug.go b/internal/metricstore/debug.go similarity index 99% rename from internal/memorystore/debug.go rename to internal/metricstore/debug.go index b56cf254..50c91e08 100644 --- a/internal/memorystore/debug.go +++ b/internal/metricstore/debug.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package memorystore +package metricstore import ( "bufio" diff --git a/internal/memorystore/healthcheck.go b/internal/metricstore/healthcheck.go similarity index 99% rename from internal/memorystore/healthcheck.go rename to internal/metricstore/healthcheck.go index b1052f3b..2a49c47a 100644 --- a/internal/memorystore/healthcheck.go +++ b/internal/metricstore/healthcheck.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
-package memorystore +package metricstore import ( "bufio" diff --git a/internal/memorystore/level.go b/internal/metricstore/level.go similarity index 99% rename from internal/memorystore/level.go rename to internal/metricstore/level.go index bce2a7a6..d46f893a 100644 --- a/internal/memorystore/level.go +++ b/internal/metricstore/level.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package memorystore +package metricstore import ( "sync" diff --git a/internal/memorystore/lineprotocol.go b/internal/metricstore/lineprotocol.go similarity index 99% rename from internal/memorystore/lineprotocol.go rename to internal/metricstore/lineprotocol.go index ca8cc811..cc59e213 100644 --- a/internal/memorystore/lineprotocol.go +++ b/internal/metricstore/lineprotocol.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package memorystore +package metricstore import ( "context" diff --git a/internal/memorystore/memorystore.go b/internal/metricstore/memorystore.go similarity index 99% rename from internal/memorystore/memorystore.go rename to internal/metricstore/memorystore.go index 7c5ea0eb..14a02fcd 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/metricstore/memorystore.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -// Package memorystore provides an efficient in-memory time-series metric storage system +// Package metricstore provides an efficient in-memory time-series metric storage system // with support for hierarchical data organization, checkpointing, and archiving. 
// // The package organizes metrics in a tree structure (cluster → host → component) and @@ -17,7 +17,7 @@ // - Concurrent checkpoint/archive workers // - Support for sum and average aggregation // - NATS integration for metric ingestion -package memorystore +package metricstore import ( "bytes" diff --git a/internal/memorystore/memorystore_test.go b/internal/metricstore/memorystore_test.go similarity index 99% rename from internal/memorystore/memorystore_test.go rename to internal/metricstore/memorystore_test.go index 57ea6938..29379d21 100644 --- a/internal/memorystore/memorystore_test.go +++ b/internal/metricstore/memorystore_test.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package memorystore +package metricstore import ( "testing" diff --git a/internal/metricdata/cc-metric-store-internal.go b/internal/metricstore/query.go similarity index 87% rename from internal/metricdata/cc-metric-store-internal.go rename to internal/metricstore/query.go index 741ce358..78c78dd5 100644 --- a/internal/metricdata/cc-metric-store-internal.go +++ b/internal/metricstore/query.go @@ -3,56 +3,41 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
-package metricdata +package metricstore import ( "context" - "encoding/json" "fmt" "strconv" "strings" "time" - "github.com/ClusterCockpit/cc-backend/internal/memorystore" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" ) -// Bloat Code -type CCMetricStoreConfigInternal struct { - Kind string `json:"kind"` - Url string `json:"url"` - Token string `json:"token"` +// TestLoadDataCallback allows tests to override LoadData behavior +var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) - // If metrics are known to this MetricDataRepository under a different - // name than in the `metricConfig` section of the 'cluster.json', - // provide this optional mapping of local to remote name for this metric. - Renamings map[string]string `json:"metricRenamings"` -} - -// Bloat Code -type CCMetricStoreInternal struct{} - -// Bloat Code -func (ccms *CCMetricStoreInternal) Init(rawConfig json.RawMessage) error { - return nil -} - -func (ccms *CCMetricStoreInternal) LoadData( +func LoadData( job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int, ) (schema.JobData, error) { - queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, int64(resolution)) + if TestLoadDataCallback != nil { + return TestLoadDataCallback(job, metrics, scopes, ctx, resolution) + } + + queries, assignedScope, err := buildQueries(job, metrics, scopes, int64(resolution)) if err != nil { cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) return nil, err } - req := memorystore.APIQueryRequest{ + req := APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -61,7 +46,7 @@ func (ccms *CCMetricStoreInternal) LoadData( WithData: true, } 
- resBody, err := memorystore.FetchData(req) + resBody, err := FetchData(req) if err != nil { cclog.Errorf("Error while fetching data : %s", err.Error()) return nil, err @@ -149,13 +134,13 @@ var ( acceleratorString = string(schema.MetricScopeAccelerator) ) -func (ccms *CCMetricStoreInternal) buildQueries( +func buildQueries( job *schema.Job, metrics []string, scopes []schema.MetricScope, resolution int64, -) ([]memorystore.APIQuery, []schema.MetricScope, error) { - queries := make([]memorystore.APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) +) ([]APIQuery, []schema.MetricScope, error) { + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) assignedScope := []schema.MetricScope{} subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster) @@ -217,7 +202,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( continue } - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -235,7 +220,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( continue } - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -249,7 +234,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -265,7 +250,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) for _, core := range cores { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -282,7 +267,7 @@ func (ccms 
*CCMetricStoreInternal) buildQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) for _, socket := range sockets { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -297,7 +282,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -312,7 +297,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Core -> Core if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -328,7 +313,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromCores(hwthreads) for _, socket := range sockets { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -344,7 +329,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Core -> Node if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -359,7 +344,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // MemoryDomain -> MemoryDomain if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := 
topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -374,7 +359,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // MemoryDoman -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -389,7 +374,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Socket -> Socket if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -404,7 +389,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Socket -> Node if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -418,7 +403,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Resolution: resolution, @@ -435,18 +420,18 @@ func (ccms *CCMetricStoreInternal) buildQueries( return queries, assignedScope, nil } -func (ccms *CCMetricStoreInternal) LoadStats( +func LoadStats( job *schema.Job, metrics []string, ctx context.Context, ) (map[string]map[string]schema.MetricStatistics, error) { - queries, _, err := ccms.buildQueries(job, metrics, 
[]schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? + queries, _, err := buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? if err != nil { cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) return nil, err } - req := memorystore.APIQueryRequest{ + req := APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -455,7 +440,7 @@ func (ccms *CCMetricStoreInternal) LoadStats( WithData: false, } - resBody, err := memorystore.FetchData(req) + resBody, err := FetchData(req) if err != nil { cclog.Errorf("Error while fetching data : %s", err.Error()) return nil, err @@ -492,20 +477,19 @@ func (ccms *CCMetricStoreInternal) LoadStats( return stats, nil } -// Used for Job-View Statistics Table -func (ccms *CCMetricStoreInternal) LoadScopedStats( +func LoadScopedStats( job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, ) (schema.ScopedJobStats, error) { - queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, 0) + queries, assignedScope, err := buildQueries(job, metrics, scopes, 0) if err != nil { cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) return nil, err } - req := memorystore.APIQueryRequest{ + req := APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -514,7 +498,7 @@ func (ccms *CCMetricStoreInternal) LoadScopedStats( WithData: false, } - resBody, err := memorystore.FetchData(req) + resBody, err := FetchData(req) if err != nil { cclog.Errorf("Error while fetching data : %s", err.Error()) return nil, err @@ -583,15 +567,14 @@ func (ccms *CCMetricStoreInternal) LoadScopedStats( return scopedJobStats, nil } -// Used for Systems-View 
Node-Overview -func (ccms *CCMetricStoreInternal) LoadNodeData( +func LoadNodeData( cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context, ) (map[string]map[string][]*schema.JobMetric, error) { - req := memorystore.APIQueryRequest{ + req := APIQueryRequest{ Cluster: cluster, From: from.Unix(), To: to.Unix(), @@ -604,7 +587,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData( } else { for _, node := range nodes { for _, metric := range metrics { - req.Queries = append(req.Queries, memorystore.APIQuery{ + req.Queries = append(req.Queries, APIQuery{ Hostname: node, Metric: metric, Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution @@ -613,7 +596,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData( } } - resBody, err := memorystore.FetchData(req) + resBody, err := FetchData(req) if err != nil { cclog.Errorf("Error while fetching data : %s", err.Error()) return nil, err @@ -622,7 +605,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData( var errors []string data := make(map[string]map[string][]*schema.JobMetric) for i, res := range resBody.Results { - var query memorystore.APIQuery + var query APIQuery if resBody.Queries != nil { query = resBody.Queries[i] } else { @@ -673,8 +656,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData( return data, nil } -// Used for Systems-View Node-List -func (ccms *CCMetricStoreInternal) LoadNodeListData( +func LoadNodeListData( cluster, subCluster string, nodes []string, metrics []string, @@ -683,15 +665,14 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData( from, to time.Time, ctx context.Context, ) (map[string]schema.JobData, error) { - // Note: Order of node data is not guaranteed after this point - queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution)) + queries, assignedScope, err := buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution)) if err != 
nil { cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error()) return nil, err } - req := memorystore.APIQueryRequest{ + req := APIQueryRequest{ Cluster: cluster, Queries: queries, From: from.Unix(), @@ -700,7 +681,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData( WithData: true, } - resBody, err := memorystore.FetchData(req) + resBody, err := FetchData(req) if err != nil { cclog.Errorf("Error while fetching data : %s", err.Error()) return nil, err @@ -709,7 +690,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData( var errors []string data := make(map[string]schema.JobData) for i, row := range resBody.Results { - var query memorystore.APIQuery + var query APIQuery if resBody.Queries != nil { query = resBody.Queries[i] } else { @@ -789,15 +770,15 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData( return data, nil } -func (ccms *CCMetricStoreInternal) buildNodeQueries( +func buildNodeQueries( cluster string, subCluster string, nodes []string, metrics []string, scopes []schema.MetricScope, resolution int64, -) ([]memorystore.APIQuery, []schema.MetricScope, error) { - queries := make([]memorystore.APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) +) ([]APIQuery, []schema.MetricScope, error) { + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) assignedScope := []schema.MetricScope{} // Get Topol before loop if subCluster given @@ -812,7 +793,6 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( } for _, metric := range metrics { - metric := metric mc := archive.GetMetricConfig(cluster, metric) if mc == nil { // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster) @@ -880,7 +860,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( continue } - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: 
false, @@ -898,7 +878,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( continue } - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -912,7 +892,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -928,7 +908,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(topology.Node) for _, core := range cores { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -945,7 +925,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) for _, socket := range sockets { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -960,7 +940,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -975,7 +955,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Core -> Core if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, 
Aggregate: false, @@ -991,7 +971,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromCores(topology.Node) for _, socket := range sockets { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1007,7 +987,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Core -> Node if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1022,7 +1002,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // MemoryDomain -> MemoryDomain if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -1037,7 +1017,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // MemoryDoman -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1052,7 +1032,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Socket -> Socket if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -1067,7 +1047,7 @@ func (ccms 
*CCMetricStoreInternal) buildNodeQueries( // Socket -> Node if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1081,7 +1061,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Resolution: resolution, diff --git a/internal/memorystore/stats.go b/internal/metricstore/stats.go similarity index 99% rename from internal/memorystore/stats.go rename to internal/metricstore/stats.go index c931ab35..51ffafc1 100644 --- a/internal/memorystore/stats.go +++ b/internal/metricstore/stats.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package memorystore +package metricstore import ( "errors" diff --git a/internal/metricsync/metricdata.go b/internal/metricsync/metricdata.go new file mode 100644 index 00000000..772f16da --- /dev/null +++ b/internal/metricsync/metricdata.go @@ -0,0 +1,60 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package metricsync + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +type MetricDataSource interface { + // Initialize this MetricDataRepository. One instance of + // this interface will only ever be responsible for one cluster. 
+ Init(rawConfig json.RawMessage) error + + // Return a map of hosts to a map of metrics at the requested scopes (currently only node) for that node. + Pull(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) +} + +var metricDataSourceRepos map[string]MetricDataSource = map[string]MetricDataSource{} + +func Init() error { + for _, cluster := range config.Clusters { + if cluster.MetricDataRepository != nil { + var kind struct { + Kind string `json:"kind"` + } + if err := json.Unmarshal(cluster.MetricDataRepository, &kind); err != nil { + cclog.Warn("Error while unmarshaling raw json MetricDataRepository") + return err + } + + var mdr MetricDataSource + switch kind.Kind { + case "cc-metric-store": + case "prometheus": + // mdr = &PrometheusDataRepository{} + case "test": + // mdr = &TestMetricDataRepository{} + default: + return fmt.Errorf("METRICDATA/METRICDATA > Unknown MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name) + } + + if err := mdr.Init(cluster.MetricDataRepository); err != nil { + cclog.Errorf("Error initializing MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name) + return err + } + metricDataSourceRepos[cluster.Name] = mdr + } + } + return nil +} diff --git a/internal/repository/stats.go b/internal/repository/stats.go index d1e16eb8..989026d1 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -12,7 +12,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" @@ -766,7 +766,7 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram( 
continue } - if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil { + if err := metricdispatch.LoadAverages(job, metrics, avgs, ctx); err != nil { cclog.Errorf("Error while loading averages for histogram: %s", err) return nil } diff --git a/internal/taskmanager/updateFootprintService.go b/internal/taskmanager/updateFootprintService.go index 979a6137..c8f81e37 100644 --- a/internal/taskmanager/updateFootprintService.go +++ b/internal/taskmanager/updateFootprintService.go @@ -10,7 +10,7 @@ import ( "math" "time" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricstore" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" @@ -58,12 +58,6 @@ func RegisterFootprintWorker() { allMetrics = append(allMetrics, mc.Name) } - repo, err := metricdata.GetMetricDataRepo(cluster.Name) - if err != nil { - cclog.Errorf("no metric data repository configured for '%s'", cluster.Name) - continue - } - pendingStatements := []sq.UpdateBuilder{} for _, job := range jobs { @@ -72,7 +66,7 @@ func RegisterFootprintWorker() { sJob := time.Now() - jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) + jobStats, err := metricstore.LoadStats(job, allMetrics, context.Background()) if err != nil { cclog.Errorf("error wile loading job data stats for footprint update: %v", err) ce++