From 965561956eeb035782773d2d730a6267c8861b33 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Sun, 21 Dec 2025 07:34:17 +0100 Subject: [PATCH] Move ccms api to memorystore and make it default. Rename metricDataDispatcher. Refactor and document. --- internal/api/api_test.go | 4 +- internal/api/job.go | 6 +- internal/archiver/README.md | 4 +- internal/archiver/archiver.go | 4 +- internal/graph/schema.resolvers.go | 16 +- internal/graph/util.go | 6 +- .../query.go} | 141 ++--- internal/metricDataDispatcher/dataLoader.go | 381 -------------- internal/metricdata/cc-metric-store.go | 130 ++--- internal/metricdata/metricdata.go | 10 +- internal/metricdata/utils.go | 44 -- internal/metricdispatcher/dataLoader.go | 490 ++++++++++++++++++ internal/metricdispatcher/dataLoader_test.go | 125 +++++ internal/repository/stats.go | 4 +- 14 files changed, 769 insertions(+), 596 deletions(-) rename internal/{metricdata/cc-metric-store-internal.go => memorystore/query.go} (87%) delete mode 100644 internal/metricDataDispatcher/dataLoader.go create mode 100644 internal/metricdispatcher/dataLoader.go create mode 100644 internal/metricdispatcher/dataLoader_test.go diff --git a/internal/api/api_test.go b/internal/api/api_test.go index d311767..0f45ec3 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -23,8 +23,8 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatcher" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" ccconf "github.com/ClusterCockpit/cc-lib/ccConfig" @@ -366,7 +366,7 @@ func TestRestApi(t *testing.T) { } t.Run("CheckArchive", func(t *testing.T) { - data, err := 
metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60) + data, err := metricdispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60) if err != nil { t.Fatal(err) } diff --git a/internal/api/job.go b/internal/api/job.go index 919772f..e4ed484 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -22,7 +22,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/importer" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatcher" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" @@ -293,7 +293,7 @@ func (api *RestAPI) getCompleteJobByID(rw http.ResponseWriter, r *http.Request) } if r.URL.Query().Get("all-metrics") == "true" { - data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution) + data, err = metricdispatcher.LoadData(job, nil, scopes, r.Context(), resolution) if err != nil { cclog.Warnf("REST: error while loading all-metrics job data for JobID %d on %s", job.JobID, job.Cluster) return @@ -389,7 +389,7 @@ func (api *RestAPI) getJobByID(rw http.ResponseWriter, r *http.Request) { resolution = max(resolution, mc.Timestep) } - data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution) + data, err := metricdispatcher.LoadData(job, metrics, scopes, r.Context(), resolution) if err != nil { cclog.Warnf("REST: error while loading job data for JobID %d on %s", job.JobID, job.Cluster) return diff --git a/internal/archiver/README.md b/internal/archiver/README.md index 0fae04e..3361f14 100644 --- a/internal/archiver/README.md +++ b/internal/archiver/README.md 
@@ -106,7 +106,7 @@ Data is archived at the highest available resolution (typically 60s intervals). ```go // In archiver.go ArchiveJob() function -jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 300) +jobData, err := metricdispatcher.LoadData(job, allMetrics, scopes, ctx, 300) // 0 = highest resolution // 300 = 5-minute resolution ``` @@ -185,6 +185,6 @@ Internal state is protected by: ## Dependencies - `internal/repository`: Database operations for job metadata -- `internal/metricDataDispatcher`: Loading metric data from various backends +- `internal/metricdispatcher`: Loading metric data from various backends - `pkg/archive`: Archive backend abstraction (filesystem, S3, SQLite) - `cc-lib/schema`: Job and metric data structures diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index b88199a..05148fe 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -10,7 +10,7 @@ import ( "math" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatcher" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" @@ -60,7 +60,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.Job, error) { scopes = append(scopes, schema.MetricScopeAccelerator) } - jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resulotion-Value retrieves highest res (60s) + jobData, err := metricdispatcher.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resulotion-Value retrieves highest res (60s) if err != nil { cclog.Error("Error wile loading job data for archiving") return nil, err diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index cd4af05..ed42750 100644 --- a/internal/graph/schema.resolvers.go +++ 
b/internal/graph/schema.resolvers.go @@ -19,7 +19,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatcher" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" @@ -484,7 +484,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str return nil, err } - data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx, *resolution) + data, err := metricdispatcher.LoadData(job, metrics, scopes, ctx, *resolution) if err != nil { cclog.Warn("Error while loading job data") return nil, err @@ -512,7 +512,7 @@ func (r *queryResolver) JobStats(ctx context.Context, id string, metrics []strin return nil, err } - data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx) + data, err := metricdispatcher.LoadJobStats(job, metrics, ctx) if err != nil { cclog.Warnf("Error while loading jobStats data for job id %s", id) return nil, err @@ -537,7 +537,7 @@ func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics [ return nil, err } - data, err := metricDataDispatcher.LoadScopedJobStats(job, metrics, scopes, ctx) + data, err := metricdispatcher.LoadScopedJobStats(job, metrics, scopes, ctx) if err != nil { cclog.Warnf("Error while loading scopedJobStats data for job id %s", id) return nil, err @@ -702,7 +702,7 @@ func (r *queryResolver) JobsMetricStats(ctx context.Context, filter []*model.Job res := []*model.JobStats{} for _, job := range jobs { - data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx) + data, err := metricdispatcher.LoadJobStats(job, metrics, ctx) if err != nil { cclog.Warnf("Error while loading comparison jobStats data for 
job id %d", job.JobID) continue @@ -759,7 +759,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [ } } - data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) + data, err := metricdispatcher.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) if err != nil { cclog.Warn("error while loading node data") return nil, err @@ -825,7 +825,7 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub } } - data, err := metricDataDispatcher.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx) + data, err := metricdispatcher.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx) if err != nil { cclog.Warn("error while loading node data (Resolver.NodeMetricsList") return nil, err @@ -880,7 +880,7 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr // 'nodes' == nil -> Defaults to all nodes of cluster for existing query workflow scopes := []schema.MetricScope{"node"} - data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nil, scopes, from, to, ctx) + data, err := metricdispatcher.LoadNodeData(cluster, metrics, nil, scopes, from, to, ctx) if err != nil { cclog.Warn("error while loading node data") return nil, err diff --git a/internal/graph/util.go b/internal/graph/util.go index 220c3a8..ac8650b 100644 --- a/internal/graph/util.go +++ b/internal/graph/util.go @@ -13,7 +13,7 @@ import ( "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatcher" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" ) @@ -55,7 +55,7 @@ func (r *queryResolver) rooflineHeatmap( // resolution = max(resolution, mc.Timestep) // } - jobdata, err := 
metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0) + jobdata, err := metricdispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0) if err != nil { cclog.Errorf("Error while loading roofline metrics for job %d", job.ID) return nil, err @@ -128,7 +128,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF continue } - if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil { + if err := metricdispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil { cclog.Error("Error while loading averages for footprint") return nil, err } diff --git a/internal/metricdata/cc-metric-store-internal.go b/internal/memorystore/query.go similarity index 87% rename from internal/metricdata/cc-metric-store-internal.go rename to internal/memorystore/query.go index 9f0cd74..19b48c3 100644 --- a/internal/metricdata/cc-metric-store-internal.go +++ b/internal/memorystore/query.go @@ -3,56 +3,34 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -package metricdata +package memorystore import ( "context" - "encoding/json" "fmt" "strconv" "strings" "time" - "github.com/ClusterCockpit/cc-backend/internal/memorystore" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" ) -// Bloat Code -type CCMetricStoreConfigInternal struct { - Kind string `json:"kind"` - Url string `json:"url"` - Token string `json:"token"` - - // If metrics are known to this MetricDataRepository under a different - // name than in the `metricConfig` section of the 'cluster.json', - // provide this optional mapping of local to remote name for this metric. 
- Renamings map[string]string `json:"metricRenamings"` -} - -// Bloat Code -type CCMetricStoreInternal struct{} - -// Bloat Code -func (ccms *CCMetricStoreInternal) Init(rawConfig json.RawMessage) error { - return nil -} - -func (ccms *CCMetricStoreInternal) LoadData( +func LoadData( job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int, ) (schema.JobData, error) { - queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, int64(resolution)) + queries, assignedScope, err := buildQueries(job, metrics, scopes, int64(resolution)) if err != nil { cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) return nil, err } - req := memorystore.APIQueryRequest{ + req := APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -61,7 +39,7 @@ func (ccms *CCMetricStoreInternal) LoadData( WithData: true, } - resBody, err := memorystore.FetchData(req) + resBody, err := FetchData(req) if err != nil { cclog.Errorf("Error while fetching data : %s", err.Error()) return nil, err @@ -149,13 +127,13 @@ var ( acceleratorString = string(schema.MetricScopeAccelerator) ) -func (ccms *CCMetricStoreInternal) buildQueries( +func buildQueries( job *schema.Job, metrics []string, scopes []schema.MetricScope, resolution int64, -) ([]memorystore.APIQuery, []schema.MetricScope, error) { - queries := make([]memorystore.APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) +) ([]APIQuery, []schema.MetricScope, error) { + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) assignedScope := []schema.MetricScope{} subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster) @@ -217,7 +195,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( continue } - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: 
false, @@ -235,7 +213,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( continue } - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -249,7 +227,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -265,7 +243,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) for _, core := range cores { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -282,7 +260,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) for _, socket := range sockets { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -297,7 +275,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -312,7 +290,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Core -> Core if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, 
Aggregate: false, @@ -328,7 +306,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromCores(hwthreads) for _, socket := range sockets { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -344,7 +322,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Core -> Node if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -359,7 +337,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // MemoryDomain -> MemoryDomain if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -374,7 +352,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // MemoryDoman -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -389,7 +367,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Socket -> Socket if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: false, @@ -404,7 +382,7 @@ func (ccms *CCMetricStoreInternal) 
buildQueries( // Socket -> Node if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Aggregate: true, @@ -418,7 +396,7 @@ func (ccms *CCMetricStoreInternal) buildQueries( // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: host.Hostname, Resolution: resolution, @@ -435,18 +413,18 @@ func (ccms *CCMetricStoreInternal) buildQueries( return queries, assignedScope, nil } -func (ccms *CCMetricStoreInternal) LoadStats( +func LoadStats( job *schema.Job, metrics []string, ctx context.Context, ) (map[string]map[string]schema.MetricStatistics, error) { - queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? + queries, _, err := buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? 
if err != nil { cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) return nil, err } - req := memorystore.APIQueryRequest{ + req := APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -455,7 +433,7 @@ func (ccms *CCMetricStoreInternal) LoadStats( WithData: false, } - resBody, err := memorystore.FetchData(req) + resBody, err := FetchData(req) if err != nil { cclog.Errorf("Error while fetching data : %s", err.Error()) return nil, err @@ -492,20 +470,19 @@ func (ccms *CCMetricStoreInternal) LoadStats( return stats, nil } -// Used for Job-View Statistics Table -func (ccms *CCMetricStoreInternal) LoadScopedStats( +func LoadScopedStats( job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, ) (schema.ScopedJobStats, error) { - queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, 0) + queries, assignedScope, err := buildQueries(job, metrics, scopes, 0) if err != nil { cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error()) return nil, err } - req := memorystore.APIQueryRequest{ + req := APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -514,7 +491,7 @@ func (ccms *CCMetricStoreInternal) LoadScopedStats( WithData: false, } - resBody, err := memorystore.FetchData(req) + resBody, err := FetchData(req) if err != nil { cclog.Errorf("Error while fetching data : %s", err.Error()) return nil, err @@ -583,15 +560,14 @@ func (ccms *CCMetricStoreInternal) LoadScopedStats( return scopedJobStats, nil } -// Used for Systems-View Node-Overview -func (ccms *CCMetricStoreInternal) LoadNodeData( +func LoadNodeData( cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context, ) (map[string]map[string][]*schema.JobMetric, error) { - req := memorystore.APIQueryRequest{ 
+ req := APIQueryRequest{ Cluster: cluster, From: from.Unix(), To: to.Unix(), @@ -604,7 +580,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData( } else { for _, node := range nodes { for _, metric := range metrics { - req.Queries = append(req.Queries, memorystore.APIQuery{ + req.Queries = append(req.Queries, APIQuery{ Hostname: node, Metric: metric, Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution @@ -613,7 +589,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData( } } - resBody, err := memorystore.FetchData(req) + resBody, err := FetchData(req) if err != nil { cclog.Errorf("Error while fetching data : %s", err.Error()) return nil, err @@ -622,7 +598,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData( var errors []string data := make(map[string]map[string][]*schema.JobMetric) for i, res := range resBody.Results { - var query memorystore.APIQuery + var query APIQuery if resBody.Queries != nil { query = resBody.Queries[i] } else { @@ -673,8 +649,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData( return data, nil } -// Used for Systems-View Node-List -func (ccms *CCMetricStoreInternal) LoadNodeListData( +func LoadNodeListData( cluster, subCluster string, nodes []string, metrics []string, @@ -683,15 +658,14 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData( from, to time.Time, ctx context.Context, ) (map[string]schema.JobData, error) { - // Note: Order of node data is not guaranteed after this point - queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution)) + queries, assignedScope, err := buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution)) if err != nil { cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error()) return nil, err } - req := memorystore.APIQueryRequest{ + req := APIQueryRequest{ Cluster: cluster, Queries: queries, From: 
from.Unix(), @@ -700,7 +674,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData( WithData: true, } - resBody, err := memorystore.FetchData(req) + resBody, err := FetchData(req) if err != nil { cclog.Errorf("Error while fetching data : %s", err.Error()) return nil, err @@ -709,7 +683,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData( var errors []string data := make(map[string]schema.JobData) for i, row := range resBody.Results { - var query memorystore.APIQuery + var query APIQuery if resBody.Queries != nil { query = resBody.Queries[i] } else { @@ -789,15 +763,15 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData( return data, nil } -func (ccms *CCMetricStoreInternal) buildNodeQueries( +func buildNodeQueries( cluster string, subCluster string, nodes []string, metrics []string, scopes []schema.MetricScope, resolution int64, -) ([]memorystore.APIQuery, []schema.MetricScope, error) { - queries := make([]memorystore.APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) +) ([]APIQuery, []schema.MetricScope, error) { + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) assignedScope := []schema.MetricScope{} // Get Topol before loop if subCluster given @@ -812,7 +786,6 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( } for _, metric := range metrics { - metric := metric mc := archive.GetMetricConfig(cluster, metric) if mc == nil { // return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster) @@ -880,7 +853,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( continue } - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -898,7 +871,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( continue } - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -912,7 +885,7 @@ func (ccms *CCMetricStoreInternal) 
buildNodeQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -928,7 +901,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(topology.Node) for _, core := range cores { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -945,7 +918,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) for _, socket := range sockets { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -960,7 +933,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -975,7 +948,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Core -> Core if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -991,7 +964,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromCores(topology.Node) for _, socket := range sockets { - queries = append(queries, 
memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1007,7 +980,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Core -> Node if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1022,7 +995,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // MemoryDomain -> MemoryDomain if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -1037,7 +1010,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // MemoryDoman -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: true, @@ -1052,7 +1025,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Socket -> Socket if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Aggregate: false, @@ -1067,7 +1040,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Socket -> Node if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: 
metric, Hostname: hostname, Aggregate: true, @@ -1081,7 +1054,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries( // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, memorystore.APIQuery{ + queries = append(queries, APIQuery{ Metric: metric, Hostname: hostname, Resolution: resolution, diff --git a/internal/metricDataDispatcher/dataLoader.go b/internal/metricDataDispatcher/dataLoader.go deleted file mode 100644 index 780eb73..0000000 --- a/internal/metricDataDispatcher/dataLoader.go +++ /dev/null @@ -1,381 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. This file is part of cc-backend. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package metricDataDispatcher - -import ( - "context" - "fmt" - "math" - "time" - - "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" - "github.com/ClusterCockpit/cc-backend/pkg/archive" - cclog "github.com/ClusterCockpit/cc-lib/ccLogger" - "github.com/ClusterCockpit/cc-lib/lrucache" - "github.com/ClusterCockpit/cc-lib/resampler" - "github.com/ClusterCockpit/cc-lib/schema" -) - -var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) - -func cacheKey( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - resolution int, -) string { - // Duration and StartTime do not need to be in the cache key as StartTime is less unique than - // job.ID and the TTL of the cache entry makes sure it does not stay there forever. - return fmt.Sprintf("%d(%s):[%v],[%v]-%d", - job.ID, job.State, metrics, scopes, resolution) -} - -// Fetches the metric data for a job. 
-func LoadData(job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, - resolution int, -) (schema.JobData, error) { - data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ any, ttl time.Duration, size int) { - var jd schema.JobData - var err error - - if job.State == schema.JobStateRunning || - job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || - config.Keys.DisableArchive { - - repo, err := metricdata.GetMetricDataRepo(job.Cluster) - if err != nil { - return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 - } - - if scopes == nil { - scopes = append(scopes, schema.MetricScopeNode) - } - - if metrics == nil { - cluster := archive.GetCluster(job.Cluster) - for _, mc := range cluster.MetricConfig { - metrics = append(metrics, mc.Name) - } - } - - jd, err = repo.LoadData(job, metrics, scopes, ctx, resolution) - if err != nil { - if len(jd) != 0 { - cclog.Warnf("partial error: %s", err.Error()) - // return err, 0, 0 // Reactivating will block archiving on one partial error - } else { - cclog.Error("Error while loading job data from metric repository") - return err, 0, 0 - } - } - size = jd.Size() - } else { - var jd_temp schema.JobData - jd_temp, err = archive.GetHandle().LoadJobData(job) - if err != nil { - cclog.Error("Error while loading job data from archive") - return err, 0, 0 - } - - // Deep copy the cached archive hashmap - jd = metricdata.DeepCopy(jd_temp) - - // Resampling for archived data. - // Pass the resolution from frontend here. 
- for _, v := range jd { - for _, v_ := range v { - timestep := int64(0) - for i := 0; i < len(v_.Series); i += 1 { - v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, int64(v_.Timestep), int64(resolution)) - if err != nil { - return err, 0, 0 - } - } - v_.Timestep = int(timestep) - } - } - - // Avoid sending unrequested data to the client: - if metrics != nil || scopes != nil { - if metrics == nil { - metrics = make([]string, 0, len(jd)) - for k := range jd { - metrics = append(metrics, k) - } - } - - res := schema.JobData{} - for _, metric := range metrics { - if perscope, ok := jd[metric]; ok { - if len(perscope) > 1 { - subset := make(map[schema.MetricScope]*schema.JobMetric) - for _, scope := range scopes { - if jm, ok := perscope[scope]; ok { - subset[scope] = jm - } - } - - if len(subset) > 0 { - perscope = subset - } - } - - res[metric] = perscope - } - } - jd = res - } - size = jd.Size() - } - - ttl = 5 * time.Hour - if job.State == schema.JobStateRunning { - ttl = 2 * time.Minute - } - - // FIXME: Review: Is this really necessary or correct. - // Note: Lines 147-170 formerly known as prepareJobData(jobData, scopes) - // For /monitoring/job/ and some other places, flops_any and mem_bw need - // to be available at the scope 'node'. If a job has a lot of nodes, - // statisticsSeries should be available so that a min/median/max Graph can be - // used instead of a lot of single lines. - // NOTE: New StatsSeries will always be calculated as 'min/median/max' - // Existing (archived) StatsSeries can be 'min/mean/max'! 
- const maxSeriesSize int = 15 - for _, scopes := range jd { - for _, jm := range scopes { - if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize { - continue - } - - jm.AddStatisticsSeries() - } - } - - nodeScopeRequested := false - for _, scope := range scopes { - if scope == schema.MetricScopeNode { - nodeScopeRequested = true - } - } - - if nodeScopeRequested { - jd.AddNodeScope("flops_any") - jd.AddNodeScope("mem_bw") - } - - // Round Resulting Stat Values - jd.RoundMetricStats() - - return jd, ttl, size - }) - - if err, ok := data.(error); ok { - cclog.Error("Error in returned dataset") - return nil, err - } - - return data.(schema.JobData), nil -} - -// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. -func LoadAverages( - job *schema.Job, - metrics []string, - data [][]schema.Float, - ctx context.Context, -) error { - if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { - return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here? - } - - repo, err := metricdata.GetMetricDataRepo(job.Cluster) - if err != nil { - return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster) - } - - stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalizazion? - if err != nil { - cclog.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project) - return err - } - - for i, m := range metrics { - nodes, ok := stats[m] - if !ok { - data[i] = append(data[i], schema.NaN) - continue - } - - sum := 0.0 - for _, node := range nodes { - sum += node.Avg - } - data[i] = append(data[i], schema.Float(sum)) - } - - return nil -} - -// Used for statsTable in frontend: Return scoped statistics by metric. 
-func LoadScopedJobStats( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, -) (schema.ScopedJobStats, error) { - if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { - return archive.LoadScopedStatsFromArchive(job, metrics, scopes) - } - - repo, err := metricdata.GetMetricDataRepo(job.Cluster) - if err != nil { - return nil, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster) - } - - scopedStats, err := repo.LoadScopedStats(job, metrics, scopes, ctx) - if err != nil { - cclog.Errorf("error while loading scoped statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project) - return nil, err - } - - return scopedStats, nil -} - -// Used for polar plots in frontend: Aggregates statistics for all nodes to single values for job per metric. -func LoadJobStats( - job *schema.Job, - metrics []string, - ctx context.Context, -) (map[string]schema.MetricStatistics, error) { - if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { - return archive.LoadStatsFromArchive(job, metrics) - } - - data := make(map[string]schema.MetricStatistics, len(metrics)) - repo, err := metricdata.GetMetricDataRepo(job.Cluster) - if err != nil { - return data, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster) - } - - stats, err := repo.LoadStats(job, metrics, ctx) - if err != nil { - cclog.Errorf("error while loading statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project) - return data, err - } - - for _, m := range metrics { - sum, avg, min, max := 0.0, 0.0, 0.0, 0.0 - nodes, ok := stats[m] - if !ok { - data[m] = schema.MetricStatistics{Min: min, Avg: avg, Max: max} - continue - } - - for _, node := range nodes { - sum += node.Avg - min = math.Min(min, node.Min) - max = math.Max(max, node.Max) - } - - data[m] = schema.MetricStatistics{ - Avg: (math.Round((sum/float64(job.NumNodes))*100) / 100), - 
Min: (math.Round(min*100) / 100), - Max: (math.Round(max*100) / 100), - } - } - - return data, nil -} - -// Used for the classic node/system view. Returns a map of nodes to a map of metrics. -func LoadNodeData( - cluster string, - metrics, nodes []string, - scopes []schema.MetricScope, - from, to time.Time, - ctx context.Context, -) (map[string]map[string][]*schema.JobMetric, error) { - repo, err := metricdata.GetMetricDataRepo(cluster) - if err != nil { - return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) - } - - if metrics == nil { - for _, m := range archive.GetCluster(cluster).MetricConfig { - metrics = append(metrics, m.Name) - } - } - - data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) - if err != nil { - if len(data) != 0 { - cclog.Warnf("partial error: %s", err.Error()) - } else { - cclog.Error("Error while loading node data from metric repository") - return nil, err - } - } - - if data == nil { - return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster) - } - - return data, nil -} - -func LoadNodeListData( - cluster, subCluster string, - nodes []string, - metrics []string, - scopes []schema.MetricScope, - resolution int, - from, to time.Time, - ctx context.Context, -) (map[string]schema.JobData, error) { - repo, err := metricdata.GetMetricDataRepo(cluster) - if err != nil { - return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) - } - - if metrics == nil { - for _, m := range archive.GetCluster(cluster).MetricConfig { - metrics = append(metrics, m.Name) - } - } - - data, err := repo.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx) - if err != nil { - if len(data) != 0 { - cclog.Warnf("partial error: %s", err.Error()) - } else { - cclog.Error("Error while loading node data from metric repository") - return nil, err - } - } 
- - // NOTE: New StatsSeries will always be calculated as 'min/median/max' - const maxSeriesSize int = 8 - for _, jd := range data { - for _, scopes := range jd { - for _, jm := range scopes { - if jm.StatisticsSeries != nil || len(jm.Series) < maxSeriesSize { - continue - } - jm.AddStatisticsSeries() - } - } - } - - if data == nil { - return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster) - } - - return data, nil -} diff --git a/internal/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go index 6d446d1..53d5f17 100644 --- a/internal/metricdata/cc-metric-store.go +++ b/internal/metricdata/cc-metric-store.go @@ -2,6 +2,7 @@ // All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package metricdata import ( @@ -11,6 +12,7 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "strings" "time" @@ -21,7 +23,7 @@ import ( type CCMetricStoreConfig struct { Kind string `json:"kind"` - Url string `json:"url"` + URL string `json:"url"` Token string `json:"token"` // If metrics are known to this MetricDataRepository under a different @@ -39,9 +41,9 @@ type CCMetricStore struct { queryEndpoint string } -type ApiQueryRequest struct { +type APIQueryRequest struct { Cluster string `json:"cluster"` - Queries []ApiQuery `json:"queries"` + Queries []APIQuery `json:"queries"` ForAllNodes []string `json:"for-all-nodes"` From int64 `json:"from"` To int64 `json:"to"` @@ -49,7 +51,7 @@ type ApiQueryRequest struct { WithData bool `json:"with-data"` } -type ApiQuery struct { +type APIQuery struct { Type *string `json:"type,omitempty"` SubType *string `json:"subtype,omitempty"` Metric string `json:"metric"` @@ -60,12 +62,12 @@ type ApiQuery struct { Aggregate bool `json:"aggreg"` } -type ApiQueryResponse struct { - Queries []ApiQuery `json:"queries,omitempty"` - Results [][]ApiMetricData `json:"results"` +type APIQueryResponse 
struct { + Queries []APIQuery `json:"queries,omitempty"` + Results [][]APIMetricData `json:"results"` } -type ApiMetricData struct { +type APIMetricData struct { Error *string `json:"error"` Data []schema.Float `json:"data"` From int64 `json:"from"` @@ -76,6 +78,14 @@ type ApiMetricData struct { Max schema.Float `json:"max"` } +var ( + hwthreadString = string(schema.MetricScopeHWThread) + coreString = string(schema.MetricScopeCore) + memoryDomainString = string(schema.MetricScopeMemoryDomain) + socketString = string(schema.MetricScopeSocket) + acceleratorString = string(schema.MetricScopeAccelerator) +) + func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error { var config CCMetricStoreConfig if err := json.Unmarshal(rawConfig, &config); err != nil { @@ -83,8 +93,8 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error { return err } - ccms.url = config.Url - ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url) + ccms.url = config.URL + ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.URL) ccms.jwt = config.Token ccms.client = http.Client{ Timeout: 10 * time.Second, @@ -122,8 +132,8 @@ func (ccms *CCMetricStore) toLocalName(metric string) string { func (ccms *CCMetricStore) doRequest( ctx context.Context, - body *ApiQueryRequest, -) (*ApiQueryResponse, error) { + body *APIQueryRequest, +) (*APIQueryResponse, error) { buf := &bytes.Buffer{} if err := json.NewEncoder(buf).Encode(body); err != nil { cclog.Errorf("Error while encoding request body: %s", err.Error()) @@ -156,7 +166,7 @@ func (ccms *CCMetricStore) doRequest( return nil, fmt.Errorf("'%s': HTTP Status: %s", ccms.queryEndpoint, res.Status) } - var resBody ApiQueryResponse + var resBody APIQueryResponse if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil { cclog.Errorf("Error while decoding result body: %s", err.Error()) return nil, err @@ -178,7 +188,7 @@ func (ccms *CCMetricStore) LoadData( return nil, err } - req := ApiQueryRequest{ + req 
:= APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -272,8 +282,8 @@ func (ccms *CCMetricStore) buildQueries( metrics []string, scopes []schema.MetricScope, resolution int, -) ([]ApiQuery, []schema.MetricScope, error) { - queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) +) ([]APIQuery, []schema.MetricScope, error) { + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) assignedScope := []schema.MetricScope{} subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster) @@ -336,7 +346,7 @@ func (ccms *CCMetricStore) buildQueries( continue } - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: false, @@ -354,7 +364,7 @@ func (ccms *CCMetricStore) buildQueries( continue } - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: true, @@ -368,7 +378,7 @@ func (ccms *CCMetricStore) buildQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: false, @@ -384,7 +394,7 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) for _, core := range cores { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: true, @@ -401,7 +411,7 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) for _, socket := range sockets { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ 
Metric: remoteName, Hostname: host.Hostname, Aggregate: true, @@ -416,7 +426,7 @@ func (ccms *CCMetricStore) buildQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: true, @@ -431,7 +441,7 @@ func (ccms *CCMetricStore) buildQueries( // Core -> Core if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: false, @@ -447,7 +457,7 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromCores(hwthreads) for _, socket := range sockets { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: true, @@ -463,7 +473,7 @@ func (ccms *CCMetricStore) buildQueries( // Core -> Node if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: true, @@ -478,7 +488,7 @@ func (ccms *CCMetricStore) buildQueries( // MemoryDomain -> MemoryDomain if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: false, @@ -493,7 +503,7 @@ func (ccms *CCMetricStore) buildQueries( // MemoryDoman -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := 
topology.GetMemoryDomainsFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: true, @@ -508,7 +518,7 @@ func (ccms *CCMetricStore) buildQueries( // Socket -> Socket if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: false, @@ -523,7 +533,7 @@ func (ccms *CCMetricStore) buildQueries( // Socket -> Node if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Aggregate: true, @@ -537,7 +547,7 @@ func (ccms *CCMetricStore) buildQueries( // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: host.Hostname, Resolution: resolution, @@ -559,14 +569,13 @@ func (ccms *CCMetricStore) LoadStats( metrics []string, ctx context.Context, ) (map[string]map[string]schema.MetricStatistics, error) { - queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? 
if err != nil { cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) return nil, err } - req := ApiQueryRequest{ + req := APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -612,7 +621,6 @@ func (ccms *CCMetricStore) LoadStats( return stats, nil } -// Used for Job-View Statistics Table func (ccms *CCMetricStore) LoadScopedStats( job *schema.Job, metrics []string, @@ -625,7 +633,7 @@ func (ccms *CCMetricStore) LoadScopedStats( return nil, err } - req := ApiQueryRequest{ + req := APIQueryRequest{ Cluster: job.Cluster, From: job.StartTime, To: job.StartTime + int64(job.Duration), @@ -703,7 +711,6 @@ func (ccms *CCMetricStore) LoadScopedStats( return scopedJobStats, nil } -// Used for Systems-View Node-Overview func (ccms *CCMetricStore) LoadNodeData( cluster string, metrics, nodes []string, @@ -711,7 +718,7 @@ func (ccms *CCMetricStore) LoadNodeData( from, to time.Time, ctx context.Context, ) (map[string]map[string][]*schema.JobMetric, error) { - req := ApiQueryRequest{ + req := APIQueryRequest{ Cluster: cluster, From: from.Unix(), To: to.Unix(), @@ -726,7 +733,7 @@ func (ccms *CCMetricStore) LoadNodeData( } else { for _, node := range nodes { for _, metric := range metrics { - req.Queries = append(req.Queries, ApiQuery{ + req.Queries = append(req.Queries, APIQuery{ Hostname: node, Metric: ccms.toRemoteName(metric), Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution @@ -744,7 +751,7 @@ func (ccms *CCMetricStore) LoadNodeData( var errors []string data := make(map[string]map[string][]*schema.JobMetric) for i, res := range resBody.Results { - var query ApiQuery + var query APIQuery if resBody.Queries != nil { query = resBody.Queries[i] } else { @@ -795,7 +802,6 @@ func (ccms *CCMetricStore) LoadNodeData( return data, nil } -// Used for Systems-View Node-List func (ccms *CCMetricStore) LoadNodeListData( cluster, subCluster string, 
nodes []string, @@ -805,7 +811,6 @@ func (ccms *CCMetricStore) LoadNodeListData( from, to time.Time, ctx context.Context, ) (map[string]schema.JobData, error) { - // Note: Order of node data is not guaranteed after this point queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution) if err != nil { @@ -813,7 +818,7 @@ func (ccms *CCMetricStore) LoadNodeListData( return nil, err } - req := ApiQueryRequest{ + req := APIQueryRequest{ Cluster: cluster, Queries: queries, From: from.Unix(), @@ -831,7 +836,7 @@ func (ccms *CCMetricStore) LoadNodeListData( var errors []string data := make(map[string]schema.JobData) for i, row := range resBody.Results { - var query ApiQuery + var query APIQuery if resBody.Queries != nil { query = resBody.Queries[i] } else { @@ -918,9 +923,8 @@ func (ccms *CCMetricStore) buildNodeQueries( metrics []string, scopes []schema.MetricScope, resolution int, -) ([]ApiQuery, []schema.MetricScope, error) { - - queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(nodes)) +) ([]APIQuery, []schema.MetricScope, error) { + queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) assignedScope := []schema.MetricScope{} // Get Topol before loop if subCluster given @@ -1003,7 +1007,7 @@ func (ccms *CCMetricStore) buildNodeQueries( continue } - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: false, @@ -1021,7 +1025,7 @@ func (ccms *CCMetricStore) buildNodeQueries( continue } - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: true, @@ -1035,7 +1039,7 @@ func (ccms *CCMetricStore) buildNodeQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: false, @@ 
-1051,7 +1055,7 @@ func (ccms *CCMetricStore) buildNodeQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(topology.Node) for _, core := range cores { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: true, @@ -1068,7 +1072,7 @@ func (ccms *CCMetricStore) buildNodeQueries( if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) for _, socket := range sockets { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: true, @@ -1083,7 +1087,7 @@ func (ccms *CCMetricStore) buildNodeQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: true, @@ -1098,7 +1102,7 @@ func (ccms *CCMetricStore) buildNodeQueries( // Core -> Core if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: false, @@ -1114,7 +1118,7 @@ func (ccms *CCMetricStore) buildNodeQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromCores(topology.Node) for _, socket := range sockets { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: true, @@ -1130,7 +1134,7 @@ func (ccms *CCMetricStore) buildNodeQueries( // Core -> Node if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(topology.Node) - queries 
= append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: true, @@ -1145,7 +1149,7 @@ func (ccms *CCMetricStore) buildNodeQueries( // MemoryDomain -> MemoryDomain if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: false, @@ -1160,7 +1164,7 @@ func (ccms *CCMetricStore) buildNodeQueries( // MemoryDoman -> Node if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node) - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: true, @@ -1175,7 +1179,7 @@ func (ccms *CCMetricStore) buildNodeQueries( // Socket -> Socket if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: false, @@ -1190,7 +1194,7 @@ func (ccms *CCMetricStore) buildNodeQueries( // Socket -> Node if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(topology.Node) - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Aggregate: true, @@ -1204,7 +1208,7 @@ func (ccms *CCMetricStore) buildNodeQueries( // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { - queries = append(queries, ApiQuery{ + queries = append(queries, APIQuery{ Metric: remoteName, Hostname: hostname, Resolution: resolution, @@ -1220,3 +1224,11 @@ func (ccms *CCMetricStore) buildNodeQueries( return 
queries, assignedScope, nil } + +func intToStringSlice(is []int) []string { + ss := make([]string, len(is)) + for i, x := range is { + ss[i] = strconv.Itoa(x) + } + return ss +} diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index 9a20001..df33ab4 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -12,7 +12,6 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/memorystore" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" ) @@ -38,8 +37,10 @@ type MetricDataRepository interface { LoadNodeListData(cluster, subCluster string, nodes, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, ctx context.Context) (map[string]schema.JobData, error) } -var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} -var upstreamMetricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} +var ( + metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} + upstreamMetricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} +) func Init() error { for _, cluster := range config.Clusters { @@ -56,9 +57,6 @@ func Init() error { switch kind.Kind { case "cc-metric-store": mdr = &CCMetricStore{} - case "cc-metric-store-internal": - mdr = &CCMetricStoreInternal{} - memorystore.InternalCCMSFlag = true case "prometheus": mdr = &PrometheusDataRepository{} case "test": diff --git a/internal/metricdata/utils.go b/internal/metricdata/utils.go index 0b2bb7e..c5b99e8 100644 --- a/internal/metricdata/utils.go +++ b/internal/metricdata/utils.go @@ -72,47 +72,3 @@ func (tmdr *TestMetricDataRepository) LoadNodeListData( ) (map[string]schema.JobData, error) { panic("TODO") } - -func DeepCopy(jdTemp schema.JobData) schema.JobData { - jd := make(schema.JobData, len(jdTemp)) - for k, v := 
range jdTemp { - jd[k] = make(map[schema.MetricScope]*schema.JobMetric, len(jdTemp[k])) - for k_, v_ := range v { - jd[k][k_] = new(schema.JobMetric) - jd[k][k_].Series = make([]schema.Series, len(v_.Series)) - for i := 0; i < len(v_.Series); i += 1 { - jd[k][k_].Series[i].Data = make([]schema.Float, len(v_.Series[i].Data)) - copy(jd[k][k_].Series[i].Data, v_.Series[i].Data) - jd[k][k_].Series[i].Hostname = v_.Series[i].Hostname - jd[k][k_].Series[i].Id = v_.Series[i].Id - jd[k][k_].Series[i].Statistics.Avg = v_.Series[i].Statistics.Avg - jd[k][k_].Series[i].Statistics.Min = v_.Series[i].Statistics.Min - jd[k][k_].Series[i].Statistics.Max = v_.Series[i].Statistics.Max - } - jd[k][k_].Timestep = v_.Timestep - jd[k][k_].Unit.Base = v_.Unit.Base - jd[k][k_].Unit.Prefix = v_.Unit.Prefix - if v_.StatisticsSeries != nil { - // Init Slices - jd[k][k_].StatisticsSeries = new(schema.StatsSeries) - jd[k][k_].StatisticsSeries.Max = make([]schema.Float, len(v_.StatisticsSeries.Max)) - jd[k][k_].StatisticsSeries.Min = make([]schema.Float, len(v_.StatisticsSeries.Min)) - jd[k][k_].StatisticsSeries.Median = make([]schema.Float, len(v_.StatisticsSeries.Median)) - jd[k][k_].StatisticsSeries.Mean = make([]schema.Float, len(v_.StatisticsSeries.Mean)) - // Copy Data - copy(jd[k][k_].StatisticsSeries.Max, v_.StatisticsSeries.Max) - copy(jd[k][k_].StatisticsSeries.Min, v_.StatisticsSeries.Min) - copy(jd[k][k_].StatisticsSeries.Median, v_.StatisticsSeries.Median) - copy(jd[k][k_].StatisticsSeries.Mean, v_.StatisticsSeries.Mean) - // Handle Percentiles - for k__, v__ := range v_.StatisticsSeries.Percentiles { - jd[k][k_].StatisticsSeries.Percentiles[k__] = make([]schema.Float, len(v__)) - copy(jd[k][k_].StatisticsSeries.Percentiles[k__], v__) - } - } else { - jd[k][k_].StatisticsSeries = v_.StatisticsSeries - } - } - } - return jd -} diff --git a/internal/metricdispatcher/dataLoader.go b/internal/metricdispatcher/dataLoader.go new file mode 100644 index 0000000..18286ca --- /dev/null +++ 
b/internal/metricdispatcher/dataLoader.go @@ -0,0 +1,490 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// Package metricdispatcher provides a unified interface for loading and caching job metric data. +// +// This package serves as a central dispatcher that routes metric data requests to the appropriate +// backend based on job state. For running jobs, data is fetched from the metric store (e.g., cc-metric-store). +// For completed jobs, data is retrieved from the file-based job archive. +// +// # Key Features +// +// - Automatic backend selection based on job state (running vs. archived) +// - LRU cache for performance optimization (128 MB default cache size) +// - Data resampling using Largest Triangle Three Bucket algorithm for archived data +// - Automatic statistics series generation for jobs with many nodes +// - Support for scoped metrics (node, socket, accelerator, core) +// +// # Cache Behavior +// +// Cached data has different TTL (time-to-live) values depending on job state: +// - Running jobs: 2 minutes (data changes frequently) +// - Completed jobs: 5 hours (data is static) +// +// The cache key is based on job ID, state, requested metrics, scopes, and resolution. +// +// # Usage +// +// The primary entry point is LoadData, which automatically handles both running and archived jobs: +// +// jobData, err := metricdispatcher.LoadData(job, metrics, scopes, ctx, resolution) +// if err != nil { +// // Handle error +// } +// +// For statistics only, use LoadJobStats, LoadScopedJobStats, or LoadAverages depending on the required format. 
+package metricdispatcher + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/memorystore" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + cclog "github.com/ClusterCockpit/cc-lib/ccLogger" + "github.com/ClusterCockpit/cc-lib/lrucache" + "github.com/ClusterCockpit/cc-lib/resampler" + "github.com/ClusterCockpit/cc-lib/schema" +) + +// cache is an LRU cache with 128 MB capacity for storing loaded job metric data. +// The cache reduces load on both the metric store and archive backends. +var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) + +// cacheKey generates a unique cache key for a job's metric data based on job ID, state, +// requested metrics, scopes, and resolution. Duration and StartTime are intentionally excluded +// because job.ID is more unique and the cache TTL ensures entries don't persist indefinitely. +func cacheKey( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + resolution int, +) string { + return fmt.Sprintf("%d(%s):[%v],[%v]-%d", + job.ID, job.State, metrics, scopes, resolution) +} + +// LoadData retrieves metric data for a job from the appropriate backend (memory store for running jobs, +// archive for completed jobs) and applies caching, resampling, and statistics generation as needed. +// +// For running jobs or when archive is disabled, data is fetched from the metric store. +// For completed archived jobs, data is loaded from the job archive and resampled if needed. +// +// Parameters: +// - job: The job for which to load metric data +// - metrics: List of metric names to load (nil loads all metrics for the cluster) +// - scopes: Metric scopes to include (nil defaults to node scope) +// - ctx: Context for cancellation and timeouts +// - resolution: Target timestep of the returned series (forwarded to the metric store for running jobs, used to resample archived data) +// +// Returns the loaded job data and any error encountered.
For partial errors (some metrics failed), +// the function returns the successfully loaded data with a warning logged. +func LoadData(job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, + resolution int, +) (schema.JobData, error) { + data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ any, ttl time.Duration, size int) { + var jd schema.JobData + var err error + + if job.State == schema.JobStateRunning || + job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || + config.Keys.DisableArchive { + + if scopes == nil { + scopes = append(scopes, schema.MetricScopeNode) + } + + if metrics == nil { + cluster := archive.GetCluster(job.Cluster) + for _, mc := range cluster.MetricConfig { + metrics = append(metrics, mc.Name) + } + } + + jd, err = memorystore.LoadData(job, metrics, scopes, ctx, resolution) + if err != nil { + if len(jd) != 0 { + cclog.Warnf("partial error loading metrics from store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + } else { + cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return err, 0, 0 + } + } + size = jd.Size() + } else { + var jdTemp schema.JobData + jdTemp, err = archive.GetHandle().LoadJobData(job) + if err != nil { + cclog.Errorf("failed to load job data from archive for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return err, 0, 0 + } + + jd = deepCopy(jdTemp) + + // Resample archived data using Largest Triangle Three Bucket algorithm to reduce data points + // to the requested resolution, improving transfer performance and client-side rendering. 
+ for _, v := range jd { + for _, v_ := range v { + timestep := int64(0) + for i := 0; i < len(v_.Series); i += 1 { + v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, int64(v_.Timestep), int64(resolution)) + if err != nil { + return err, 0, 0 + } + } + v_.Timestep = int(timestep) + } + } + + // Filter job data to only include requested metrics and scopes, avoiding unnecessary data transfer. + if metrics != nil || scopes != nil { + if metrics == nil { + metrics = make([]string, 0, len(jd)) + for k := range jd { + metrics = append(metrics, k) + } + } + + res := schema.JobData{} + for _, metric := range metrics { + if perscope, ok := jd[metric]; ok { + if len(perscope) > 1 { + subset := make(map[schema.MetricScope]*schema.JobMetric) + for _, scope := range scopes { + if jm, ok := perscope[scope]; ok { + subset[scope] = jm + } + } + + if len(subset) > 0 { + perscope = subset + } + } + + res[metric] = perscope + } + } + jd = res + } + size = jd.Size() + } + + ttl = 5 * time.Hour + if job.State == schema.JobStateRunning { + ttl = 2 * time.Minute + } + + // Generate statistics series for jobs with many nodes to enable min/median/max graphs + // instead of overwhelming the UI with individual node lines. Note that newly calculated + // statistics use min/median/max, while archived statistics may use min/mean/max. 
+ const maxSeriesSize int = 15 + for _, scopes := range jd { + for _, jm := range scopes { + if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize { + continue + } + + jm.AddStatisticsSeries() + } + } + + nodeScopeRequested := false + for _, scope := range scopes { + if scope == schema.MetricScopeNode { + nodeScopeRequested = true + } + } + + if nodeScopeRequested { + jd.AddNodeScope("flops_any") + jd.AddNodeScope("mem_bw") + } + + // Round Resulting Stat Values + jd.RoundMetricStats() + + return jd, ttl, size + }) + + if err, ok := data.(error); ok { + cclog.Errorf("error in cached dataset for job %d: %s", job.JobID, err.Error()) + return nil, err + } + + return data.(schema.JobData), nil +} + +// LoadAverages computes average values for the specified metrics across all nodes of a job. +// For running jobs, it loads statistics from the metric store. For completed jobs, it uses +// the pre-calculated averages from the job archive. The results are appended to the data slice. +func LoadAverages( + job *schema.Job, + metrics []string, + data [][]schema.Float, + ctx context.Context, +) error { + if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { + return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here? + } + + stats, err := memorystore.LoadStats(job, metrics, ctx) + if err != nil { + cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return err + } + + for i, m := range metrics { + nodes, ok := stats[m] + if !ok { + data[i] = append(data[i], schema.NaN) + continue + } + + sum := 0.0 + for _, node := range nodes { + sum += node.Avg + } + data[i] = append(data[i], schema.Float(sum)) + } + + return nil +} + +// LoadScopedJobStats retrieves job statistics organized by metric scope (node, socket, core, accelerator). +// For running jobs, statistics are computed from the metric store. 
For completed jobs, pre-calculated +// statistics are loaded from the job archive. +func LoadScopedJobStats( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, +) (schema.ScopedJobStats, error) { + if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { + return archive.LoadScopedStatsFromArchive(job, metrics, scopes) + } + + scopedStats, err := memorystore.LoadScopedStats(job, metrics, scopes, ctx) + if err != nil { + cclog.Errorf("failed to load scoped statistics from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return nil, err + } + + return scopedStats, nil +} + +// LoadJobStats retrieves aggregated statistics (min/avg/max) for each requested metric across all job nodes. +// For running jobs, statistics are computed from the metric store. For completed jobs, pre-calculated +// statistics are loaded from the job archive. +func LoadJobStats( + job *schema.Job, + metrics []string, + ctx context.Context, +) (map[string]schema.MetricStatistics, error) { + if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { + return archive.LoadStatsFromArchive(job, metrics) + } + + data := make(map[string]schema.MetricStatistics, len(metrics)) + + stats, err := memorystore.LoadStats(job, metrics, ctx) + if err != nil { + cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return data, err + } + + for _, m := range metrics { + sum, avg, min, max := 0.0, 0.0, 0.0, 0.0 + nodes, ok := stats[m] + if !ok { + data[m] = schema.MetricStatistics{Min: min, Avg: avg, Max: max} + continue + } + + for _, node := range nodes { + sum += node.Avg + min = math.Min(min, node.Min) + max = math.Max(max, node.Max) + } + + data[m] = schema.MetricStatistics{ + Avg: (math.Round((sum/float64(job.NumNodes))*100) / 100), + Min: (math.Round(min*100) / 100), + Max: 
(math.Round(max*100) / 100), + } + } + + return data, nil +} + +// LoadNodeData retrieves metric data for specific nodes in a cluster within a time range. +// This is used for node monitoring views and system status pages. Data is always fetched from +// the metric store (not the archive) since it's for current/recent node status monitoring. +// +// Returns a nested map structure: node -> metric -> scoped data. +func LoadNodeData( + cluster string, + metrics, nodes []string, + scopes []schema.MetricScope, + from, to time.Time, + ctx context.Context, +) (map[string]map[string][]*schema.JobMetric, error) { + if metrics == nil { + for _, m := range archive.GetCluster(cluster).MetricConfig { + metrics = append(metrics, m.Name) + } + } + + data, err := memorystore.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) + if err != nil { + if len(data) != 0 { + cclog.Warnf("partial error loading node data from metric store for cluster %s: %s", cluster, err.Error()) + } else { + cclog.Errorf("failed to load node data from metric store for cluster %s: %s", cluster, err.Error()) + return nil, err + } + } + + if data == nil { + return nil, fmt.Errorf("metric store for cluster '%s' does not support node data queries", cluster) + } + + return data, nil +} + +// LoadNodeListData retrieves time-series metric data for multiple nodes within a time range, +// with optional resampling and automatic statistics generation for large datasets. +// This is used for comparing multiple nodes or displaying node status over time. +// +// Returns a map of node names to their job-like metric data structures. 
+func LoadNodeListData(
+	cluster, subCluster string,
+	nodes []string,
+	metrics []string,
+	scopes []schema.MetricScope,
+	resolution int,
+	from, to time.Time,
+	ctx context.Context,
+) (map[string]schema.JobData, error) {
+	// Without an explicit metric list, query every metric configured for the cluster.
+	if metrics == nil {
+		for _, mc := range archive.GetCluster(cluster).MetricConfig {
+			metrics = append(metrics, mc.Name)
+		}
+	}
+
+	nodeData, err := memorystore.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
+	if err != nil {
+		// An error without any data is fatal; an error alongside data is only a warning.
+		if len(nodeData) == 0 {
+			cclog.Errorf("failed to load node list data from metric store for cluster %s, subcluster %s: %s",
+				cluster, subCluster, err.Error())
+			return nil, err
+		}
+		cclog.Warnf("partial error loading node list data from metric store for cluster %s, subcluster %s: %s",
+			cluster, subCluster, err.Error())
+	}
+
+	if nodeData == nil {
+		return nil, fmt.Errorf("metric store for cluster '%s' does not support node list queries", cluster)
+	}
+
+	// Metrics that carry at least maxSeriesSize series and no statistics series yet get one
+	// generated, so that large datasets can be drawn as min/median/max bands.
+	const maxSeriesSize int = 8
+	for _, jobData := range nodeData {
+		for _, perScope := range jobData {
+			for _, jm := range perScope {
+				if jm.StatisticsSeries == nil && len(jm.Series) >= maxSeriesSize {
+					jm.AddStatisticsSeries()
+				}
+			}
+		}
+	}
+
+	return nodeData, nil
+}
+
+// deepCopy returns a copy of source that shares no metric maps, JobMetric values or data
+// slices with it. LoadData uses it before resampling archived data, so that the in-place
+// transformations applied per request never reach the data handed out by the archive.
+func deepCopy(source schema.JobData) schema.JobData {
+	clone := make(schema.JobData, len(source))
+
+	for name, byScope := range source {
+		scoped := make(map[schema.MetricScope]*schema.JobMetric, len(byScope))
+		for scope, jm := range byScope {
+			scoped[scope] = copyJobMetric(jm)
+		}
+		clone[name] = scoped
+	}
+
+	return clone
+}
+
+// copyJobMetric duplicates a JobMetric including all of its series and, if present,
+// its statistics series. Timestep and Unit are copied by value.
+func copyJobMetric(src *schema.JobMetric) *schema.JobMetric {
+	series := make([]schema.Series, 0, len(src.Series))
+	for i := range src.Series {
+		series = append(series, copySeries(&src.Series[i]))
+	}
+
+	dst := &schema.JobMetric{
+		Timestep: src.Timestep,
+		Unit:     src.Unit,
+		Series:   series,
+	}
+	if src.StatisticsSeries != nil {
+		dst.StatisticsSeries = copyStatisticsSeries(src.StatisticsSeries)
+	}
+
+	return dst
+}
+
+// copySeries duplicates a single series. The data points get their own backing array;
+// Hostname, Id and Statistics are taken over as they are.
+func copySeries(src *schema.Series) schema.Series {
+	return schema.Series{
+		Hostname:   src.Hostname,
+		Id:         src.Id,
+		Statistics: src.Statistics,
+		Data:       cloneFloats(src.Data),
+	}
+}
+
+// copyStatisticsSeries duplicates the min/mean/median/max series and, when there are any,
+// the percentile series. Without percentiles the Percentiles map of the copy stays nil.
+func copyStatisticsSeries(src *schema.StatsSeries) *schema.StatsSeries {
+	dst := &schema.StatsSeries{
+		Min:    cloneFloats(src.Min),
+		Mean:   cloneFloats(src.Mean),
+		Median: cloneFloats(src.Median),
+		Max:    cloneFloats(src.Max),
+	}
+
+	if len(src.Percentiles) > 0 {
+		dst.Percentiles = make(map[int][]schema.Float, len(src.Percentiles))
+		for percentile, values := range src.Percentiles {
+			dst.Percentiles[percentile] = cloneFloats(values)
+		}
+	}
+
+	return dst
+}
+
+// cloneFloats returns a freshly allocated slice holding the same values as in.
+// The result is never nil, even for a nil or empty input.
+func cloneFloats(in []schema.Float) []schema.Float {
+	out := make([]schema.Float, len(in))
+	copy(out, in)
+	return out
+}
diff --git a/internal/metricdispatcher/dataLoader_test.go b/internal/metricdispatcher/dataLoader_test.go
new file mode 100644
index 0000000..8df39d3
--- /dev/null
+++ b/internal/metricdispatcher/dataLoader_test.go
@@ -0,0 +1,125 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved. This file is part of cc-backend.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package metricdispatcher
+
+import (
+	"testing"
+
+	"github.com/ClusterCockpit/cc-lib/schema"
+)
+
+// TestDeepCopy checks that mutating the source after deepCopy leaves the copy untouched
+// and that scalar fields are carried over.
+func TestDeepCopy(t *testing.T) {
+	nodeID := "0"
+	original := schema.JobData{
+		"cpu_load": {
+			schema.MetricScopeNode: &schema.JobMetric{
+				Timestep: 60,
+				Unit:     schema.Unit{Base: "load", Prefix: ""},
+				Series: []schema.Series{
+					{
+						Hostname:   "node001",
+						Id:         &nodeID,
+						Data:       []schema.Float{1.0, 2.0, 3.0},
+						Statistics: schema.MetricStatistics{Min: 1.0, Avg: 2.0, Max: 3.0},
+					},
+				},
+				StatisticsSeries: &schema.StatsSeries{
+					Min:    []schema.Float{1.0, 1.5, 2.0},
+					Mean:   []schema.Float{2.0, 2.5, 3.0},
+					Median: []schema.Float{2.0, 2.5, 3.0},
+					Max:    []schema.Float{3.0, 3.5, 4.0},
+					Percentiles: map[int][]schema.Float{
+						25: {1.5, 2.0, 2.5},
+						75: {2.5, 3.0, 3.5},
+					},
+				},
+			},
+		},
+	}
+
+	copied := deepCopy(original)
+
+	// Overwrite values in the source only; the copy must not observe any of them.
+	src := original["cpu_load"][schema.MetricScopeNode]
+	src.Series[0].Data[0] = 999.0
+	src.StatisticsSeries.Min[0] = 888.0
+	src.StatisticsSeries.Percentiles[25][0] = 777.0
+
+	got := copied["cpu_load"][schema.MetricScopeNode]
+
+	if v := got.Series[0].Data[0]; v != 1.0 {
+		t.Errorf("Series data was not deeply copied: got %v, want 1.0", v)
+	}
+
+	if v := got.StatisticsSeries.Min[0]; v != 1.0 {
+		t.Errorf("StatisticsSeries was not deeply copied: got %v, want 1.0", v)
+	}
+
+	if v := got.StatisticsSeries.Percentiles[25][0]; v != 1.5 {
+		t.Errorf("Percentiles was not deeply copied: got %v, want 1.5", v)
+	}
+
+	if got.Timestep != 60 {
+		t.Errorf("Timestep not copied correctly: got %v, want 60", got.Timestep)
+	}
+
+	if host := got.Series[0].Hostname; host != "node001" {
+		t.Errorf("Hostname not copied correctly: got %v, want node001", host)
+	}
+}
+
+// TestDeepCopyNilStatisticsSeries checks that a missing statistics series stays nil in the copy.
+func TestDeepCopyNilStatisticsSeries(t *testing.T) {
+	metric := &schema.JobMetric{
+		Timestep: 60,
+		Series: []schema.Series{
+			{
+				Hostname: "node001",
+				Data:     []schema.Float{1.0, 2.0},
+			},
+		},
+		StatisticsSeries: nil,
+	}
+	original := schema.JobData{
+		"mem_used": {schema.MetricScopeNode: metric},
+	}
+
+	copied := deepCopy(original)
+
+	if stats := copied["mem_used"][schema.MetricScopeNode].StatisticsSeries; stats != nil {
+		t.Errorf("StatisticsSeries should be nil, got %v", stats)
+	}
+}
+
+// TestDeepCopyEmptyPercentiles checks that a statistics series without percentiles
+// is copied without allocating a percentile map.
+func TestDeepCopyEmptyPercentiles(t *testing.T) {
+	metric := &schema.JobMetric{
+		Timestep: 60,
+		Series:   []schema.Series{},
+		StatisticsSeries: &schema.StatsSeries{
+			Min:         []schema.Float{1.0},
+			Mean:        []schema.Float{2.0},
+			Median:      []schema.Float{2.0},
+			Max:         []schema.Float{3.0},
+			Percentiles: nil,
+		},
+	}
+	original := schema.JobData{
+		"cpu_load": {schema.MetricScopeNode: metric},
+	}
+
+	copied := deepCopy(original)
+
+	if copied["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Percentiles != nil {
+		t.Errorf("Percentiles should be nil when source is nil/empty")
+	}
+}
diff --git a/internal/repository/stats.go b/internal/repository/stats.go
index c92f519..81b212c 100644
--- a/internal/repository/stats.go
+++ b/internal/repository/stats.go
@@ -12,7 +12,7 @@ import (
 
 	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
-	"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
+	"github.com/ClusterCockpit/cc-backend/internal/metricdispatcher"
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	cclog
"github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" @@ -766,7 +766,7 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram( continue } - if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil { + if err := metricdispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil { cclog.Errorf("Error while loading averages for histogram: %s", err) return nil }