From b307e885ce480b2c18413617984f1d3222093ee6 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Jan 2026 10:02:07 +0100 Subject: [PATCH] feat: Add support for multiple external metric stores --- cmd/cc-backend/main.go | 21 ++++ configs/config.json | 34 ++++-- internal/api/api_test.go | 8 ++ internal/metricdispatch/configSchema.go | 29 +++++ internal/metricdispatch/dataLoader.go | 56 +++++++-- internal/metricdispatch/metricdata.go | 112 ++++++++++++++++++ internal/metricstoreclient/cc-metric-store.go | 81 +++---------- .../taskmanager/updateFootprintService.go | 11 +- pkg/metricstore/query.go | 15 ++- 9 files changed, 280 insertions(+), 87 deletions(-) create mode 100644 internal/metricdispatch/configSchema.go create mode 100755 internal/metricdispatch/metricdata.go diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 0e93944c..9ded95ba 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -24,6 +24,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/importer" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/tagger" "github.com/ClusterCockpit/cc-backend/internal/taskmanager" @@ -317,6 +318,7 @@ func runServer(ctx context.Context) error { var wg sync.WaitGroup // Initialize metric store if configuration is provided + haveMetricstore := false mscfg := ccconf.GetPackageConfig("metric-store") if mscfg != nil { metricstore.Init(mscfg, &wg) @@ -325,7 +327,26 @@ func runServer(ctx context.Context) error { ms := metricstore.GetMemoryStore() jobRepo := repository.GetJobRepository() ms.SetNodeProvider(jobRepo) + metricstore.MetricStoreHandle = &metricstore.InternalMetricStore{} + haveMetricstore = true } else { + metricstore.MetricStoreHandle = nil + cclog.Debug("missing internal metricstore configuration") + } + + // Initialize external metric stores if configuration is provided + mscfg = ccconf.GetPackageConfig("metric-store-external") + if mscfg != nil { + err := metricdispatch.Init(mscfg) + + if err != nil { + cclog.Debugf("initializing metricdispatch: %v", err) + } else { + haveMetricstore = true + } + } + + if !haveMetricstore { return fmt.Errorf("missing metricstore configuration") } diff --git a/configs/config.json b/configs/config.json index 00cca0af..584baed3 100644 --- a/configs/config.json +++ b/configs/config.json @@ -5,18 +5,13 @@ "https-key-file": "/etc/letsencrypt/live/url/privkey.pem", "user": "clustercockpit", "group": "clustercockpit", - "api-allowed-ips": [ - "*" - ], + "api-allowed-ips": ["*"], "short-running-jobs-duration": 300, "enable-job-taggers": true, "resampling": { "minimum-points": 600, "trigger": 180, - "resolutions": [ - 240, - 60 - ] + "resolutions": [240, 60] }, "api-subjects": { "subject-job-event": "cc.job.event", @@ -50,6 +45,28 @@ "location": "./var/archive" } }, + "metric-store-external": [ + { + "scope": "*", + "url": "http://x.x.x.x:8082", + "token": "MySecret" + }, + { + "scope": "fritz", + "url": "http://x.x.x.x:8084", + "token": "MySecret" + }, + { + "scope": "fritz-spr1tb", + "url": "http://x.x.x.x:8083", + "token": "MySecret" + }, + { + "scope": "alex", + "url": "http://x.x.x.x:8084", + "token": "MySecret" + } + ], "metric-store": { "checkpoints": { "interval": "12h", @@ -74,4 +91,5 @@ ] }, "ui-file": "ui-config.json" -} \ No newline at end of file +} + diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 7153cd1d..4a7fc07c 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -45,6 +45,13 @@ func setup(t *testing.T) *api.RestAPI { "api-allowed-ips": [ "*" ] + }, + "metric-store": { + "checkpoints": { + "interval": "12h" + }, + "retention-in-memory": "48h", + "memory-cap": 100 }, "archive": { "kind": "file", @@ -143,6 +150,7 @@ func setup(t *testing.T) *api.RestAPI { } ccconf.Init(cfgFilePath) + metricstore.MetricStoreHandle = &metricstore.InternalMetricStore{} // Load and check main configuration if cfg := ccconf.GetPackageConfig("main"); cfg != nil { diff --git a/internal/metricdispatch/configSchema.go b/internal/metricdispatch/configSchema.go new file mode 100644 index 00000000..6dec69bf --- /dev/null +++ b/internal/metricdispatch/configSchema.go @@ -0,0 +1,29 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package metricdispatch + +const configSchema = `{ + "type": "array", + "description": "Array of metric store configurations with scope-based routing.", + "items": { + "type": "object", + "properties": { + "scope": { + "description": "Scope identifier for routing metrics (e.g., cluster name, '*' for default)", + "type": "string" + }, + "url": { + "description": "URL of the metric store endpoint", + "type": "string" + }, + "token": { + "description": "Authentication token for the metric store", + "type": "string" + } + }, + "required": ["scope", "url", "token"] + } +}` diff --git a/internal/metricdispatch/dataLoader.go b/internal/metricdispatch/dataLoader.go index 91f5ecde..c52088ab 100644 --- a/internal/metricdispatch/dataLoader.go +++ b/internal/metricdispatch/dataLoader.go @@ -44,7 +44,6 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/pkg/archive" - "github.com/ClusterCockpit/cc-backend/pkg/metricstore" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/lrucache" "github.com/ClusterCockpit/cc-lib/v2/resampler" @@ -96,6 +95,13 @@ func LoadData(job *schema.Job, if job.State == schema.JobStateRunning || job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving { + ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster) + if err != nil { + cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return err, 0, 0 + } + if scopes == nil { scopes = append(scopes, schema.MetricScopeNode) } @@ -107,7 +113,7 @@ func LoadData(job *schema.Job, } } - jd, err = metricstore.LoadData(job, metrics, scopes, ctx, resolution) + jd, err = ms.LoadData(job, metrics, scopes, ctx, resolution) if err != nil { if len(jd) != 0 { cclog.Warnf("partial error loading metrics from store for job %d (user: %s, project: %s): %s", @@ -236,7 +242,14 @@ func LoadAverages( return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here? } - stats, err := metricstore.LoadStats(job, metrics, ctx) + ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster) + if err != nil { + cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return err + } + + stats, err := ms.LoadStats(job, metrics, ctx) if err != nil { cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s", job.JobID, job.User, job.Project, err.Error()) @@ -273,7 +286,14 @@ func LoadScopedJobStats( return archive.LoadScopedStatsFromArchive(job, metrics, scopes) } - scopedStats, err := metricstore.LoadScopedStats(job, metrics, scopes, ctx) + ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster) + if err != nil { + cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return nil, err + } + + scopedStats, err := ms.LoadScopedStats(job, metrics, scopes, ctx) if err != nil { cclog.Errorf("failed to load scoped statistics from metric store for job %d (user: %s, project: %s): %s", job.JobID, job.User, job.Project, err.Error()) @@ -295,9 +315,16 @@ func LoadJobStats( return archive.LoadStatsFromArchive(job, metrics) } + ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster) + if err != nil { + cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + return nil, err + } + data := make(map[string]schema.MetricStatistics, len(metrics)) - stats, err := metricstore.LoadStats(job, metrics, ctx) + stats, err := ms.LoadStats(job, metrics, ctx) if err != nil { cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s", job.JobID, job.User, job.Project, err.Error()) @@ -333,6 +360,7 @@ func LoadJobStats( // the metric store (not the archive) since it's for current/recent node status monitoring. // // Returns a nested map structure: node -> metric -> scoped data. +// FIXME: Add support for subcluster specific cc-metric-stores func LoadNodeData( cluster string, metrics, nodes []string, @@ -346,7 +374,14 @@ func LoadNodeData( } } - data, err := metricstore.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) + ms, err := GetMetricDataRepo(cluster, "") + if err != nil { + cclog.Errorf("failed to load node data from metric store: %s", + err.Error()) + return nil, err + } + + data, err := ms.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) if err != nil { if len(data) != 0 { cclog.Warnf("partial error loading node data from metric store for cluster %s: %s", cluster, err.Error()) @@ -383,7 +418,14 @@ func LoadNodeListData( } } - data, err := metricstore.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx) + ms, err := GetMetricDataRepo(cluster, subCluster) + if err != nil { + cclog.Errorf("failed to load node data from metric store: %s", + err.Error()) + return nil, err + } + + data, err := ms.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx) if err != nil { if len(data) != 0 { cclog.Warnf("partial error loading node list data from metric store for cluster %s, subcluster %s: %s", diff --git a/internal/metricdispatch/metricdata.go b/internal/metricdispatch/metricdata.go new file mode 100755 index 00000000..9626ac86 --- /dev/null +++ b/internal/metricdispatch/metricdata.go @@ -0,0 +1,112 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package metricdispatch + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + ccms "github.com/ClusterCockpit/cc-backend/internal/metricstoreclient" + "github.com/ClusterCockpit/cc-backend/pkg/metricstore" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +type MetricDataRepository interface { + // Return the JobData for the given job, only with the requested metrics. + LoadData(job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, + resolution int) (schema.JobData, error) + + // Return a map of metrics to a map of nodes to the metric statistics of the job. node scope only. + LoadStats(job *schema.Job, + metrics []string, + ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) + + // Return a map of metrics to a map of scopes to the scoped metric statistics of the job. + LoadScopedStats(job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context) (schema.ScopedJobStats, error) + + // Return a map of hosts to a map of metrics at the requested scopes (currently only node) for that node. + LoadNodeData(cluster string, + metrics, nodes []string, + scopes []schema.MetricScope, + from, to time.Time, + ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) + + // Return a map of hosts to a map of metrics to a map of scopes for multiple nodes. + LoadNodeListData(cluster, subCluster string, + nodes []string, + metrics []string, + scopes []schema.MetricScope, + resolution int, + from, to time.Time, + ctx context.Context) (map[string]schema.JobData, error) +} + +type CCMetricStoreConfig struct { + Scope string `json:"scope"` + URL string `json:"url"` + Token string `json:"token"` +} + +var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} + +func Init(rawConfig json.RawMessage) error { + if rawConfig != nil { + var configs []CCMetricStoreConfig + config.Validate(configSchema, rawConfig) + dec := json.NewDecoder(bytes.NewReader(rawConfig)) + dec.DisallowUnknownFields() + if err := dec.Decode(&configs); err != nil { + return fmt.Errorf("[METRICDISPATCH]> Metric Store Config Init: Could not decode config file '%s' Error: %s", rawConfig, err.Error()) + } + + if len(configs) == 0 { + return fmt.Errorf("[METRICDISPATCH]> No metric store configurations found in config file") + } + + for _, config := range configs { + metricDataRepos[config.Scope] = ccms.NewCCMetricStore(config.URL, config.Token) + } + } + + return nil +} + +func GetMetricDataRepo(cluster string, subcluster string) (MetricDataRepository, error) { + var repo MetricDataRepository + var ok bool + + key := cluster + "-" + subcluster + repo, ok = metricDataRepos[key] + + if !ok { + repo, ok = metricDataRepos[cluster] + + if !ok { + repo, ok = metricDataRepos["*"] + + if !ok { + if metricstore.MetricStoreHandle == nil { + return nil, fmt.Errorf("[METRICDISPATCH]> no metric data repository configured '%s'", key) + } + + repo = metricstore.MetricStoreHandle + cclog.Debugf("[METRICDISPATCH]> Using internal metric data repository for '%s'", key) + } + } + } + + return repo, nil +} diff --git a/internal/metricstoreclient/cc-metric-store.go b/internal/metricstoreclient/cc-metric-store.go index c1505fdc..ded644ea 100644 --- a/internal/metricstoreclient/cc-metric-store.go +++ b/internal/metricstoreclient/cc-metric-store.go @@ -17,8 +17,7 @@ // // # Basic Usage // -// store := &CCMetricStore{} -// store.Init("http://localhost:8080", "jwt-token") +// store := NewCCMetricStore("http://localhost:8080", "jwt-token") // // // Load job data // jobData, err := store.LoadData(job, metrics, scopes, ctx, resolution) @@ -60,11 +59,9 @@ import ( "encoding/json" "fmt" "net/http" - "sort" "strings" "time" - "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" @@ -124,15 +121,17 @@ type APIMetricData struct { Max schema.Float `json:"max"` // Maximum value in time range } -// Init initializes the CCMetricStore client with connection details. +// NewCCMetricStore creates and initializes a new CCMetricStore client. // The url parameter should include the protocol and port (e.g., "http://localhost:8080"). // The token parameter is a JWT used for Bearer authentication; pass empty string if auth is disabled. -func (ccms *CCMetricStore) Init(url string, token string) { - ccms.url = url - ccms.queryEndpoint = fmt.Sprintf("%s/api/query", url) - ccms.jwt = token - ccms.client = http.Client{ - Timeout: 10 * time.Second, +func NewCCMetricStore(url string, token string) *CCMetricStore { + return &CCMetricStore{ + url: url, + queryEndpoint: fmt.Sprintf("%s/api/query", url), + jwt: token, + client: http.Client{ + Timeout: 10 * time.Second, + }, } } @@ -547,64 +546,18 @@ func (ccms *CCMetricStore) LoadNodeData( // - HasNextPage flag indicating if more pages are available // - Error (may be partial error with some data returned) func (ccms *CCMetricStore) LoadNodeListData( - cluster, subCluster, nodeFilter string, + cluster, subCluster string, + nodes []string, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, - page *model.PageRequest, ctx context.Context, -) (map[string]schema.JobData, int, bool, error) { - // 0) Init additional vars - totalNodes := 0 - hasNextPage := false - - // 1) Get list of all nodes - var nodes []string - if subCluster != "" { - scNodes := archive.NodeLists[cluster][subCluster] - nodes = scNodes.PrintList() - } else { - subClusterNodeLists := archive.NodeLists[cluster] - for _, nodeList := range subClusterNodeLists { - nodes = append(nodes, nodeList.PrintList()...) - } - } - - // 2) Filter nodes - if nodeFilter != "" { - filteredNodes := []string{} - for _, node := range nodes { - if strings.Contains(node, nodeFilter) { - filteredNodes = append(filteredNodes, node) - } - } - nodes = filteredNodes - } - - // 2.1) Count total nodes && Sort nodes -> Sorting invalidated after ccms return ... - totalNodes = len(nodes) - sort.Strings(nodes) - - // 3) Apply paging - if len(nodes) > page.ItemsPerPage { - start := (page.Page - 1) * page.ItemsPerPage - end := start + page.ItemsPerPage - if end > len(nodes) { - end = len(nodes) - hasNextPage = false - } else { - hasNextPage = true - } - nodes = nodes[start:end] - } - - // Note: Order of node data is not guaranteed after this point, but contents match page and filter criteria - +) (map[string]schema.JobData, error) { queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution) if err != nil { cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error()) - return nil, totalNodes, hasNextPage, err + return nil, err } req := APIQueryRequest{ @@ -619,7 +572,7 @@ func (ccms *CCMetricStore) LoadNodeListData( resBody, err := ccms.doRequest(ctx, &req) if err != nil { cclog.Errorf("Error while performing request for cluster %s: %s", cluster, err.Error()) - return nil, totalNodes, hasNextPage, err + return nil, err } var errors []string @@ -694,10 +647,10 @@ func (ccms *CCMetricStore) LoadNodeListData( if len(errors) != 0 { /* Returns list of "partial errors" */ - return data, totalNodes, hasNextPage, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) + return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) } - return data, totalNodes, hasNextPage, nil + return data, nil } // sanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling. diff --git a/internal/taskmanager/updateFootprintService.go b/internal/taskmanager/updateFootprintService.go index fad278e2..71bf4089 100644 --- a/internal/taskmanager/updateFootprintService.go +++ b/internal/taskmanager/updateFootprintService.go @@ -10,8 +10,8 @@ import ( "math" "time" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" "github.com/ClusterCockpit/cc-backend/pkg/archive" - "github.com/ClusterCockpit/cc-backend/pkg/metricstore" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" sq "github.com/Masterminds/squirrel" @@ -66,7 +66,14 @@ func RegisterFootprintWorker() { sJob := time.Now() - jobStats, err := metricstore.LoadStats(job, allMetrics, context.Background()) + ms, err := metricdispatch.GetMetricDataRepo(job.Cluster, job.SubCluster) + if err != nil { + cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s", + job.JobID, job.User, job.Project, err.Error()) + continue + } + + jobStats, err := ms.LoadStats(job, allMetrics, context.Background()) if err != nil { cclog.Errorf("error wile loading job data stats for footprint update: %v", err) ce++ diff --git a/pkg/metricstore/query.go b/pkg/metricstore/query.go index a031cb1e..62216e59 100644 --- a/pkg/metricstore/query.go +++ b/pkg/metricstore/query.go @@ -38,6 +38,10 @@ import ( "github.com/ClusterCockpit/cc-lib/v2/schema" ) +type InternalMetricStore struct{} + +var MetricStoreHandle *InternalMetricStore + // TestLoadDataCallback allows tests to override LoadData behavior for testing purposes. // When set to a non-nil function, LoadData will call this function instead of the default implementation. var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) @@ -65,7 +69,7 @@ var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema // Example: // // jobData, err := LoadData(job, []string{"cpu_load", "mem_used"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 60) -func LoadData( +func (ccms *InternalMetricStore) LoadData( job *schema.Job, metrics []string, scopes []schema.MetricScope, @@ -502,7 +506,7 @@ func buildQueries( // Returns: // - Map of metric → hostname → statistics // - Error on query building or fetching failure -func LoadStats( +func (ccms *InternalMetricStore) LoadStats( job *schema.Job, metrics []string, ctx context.Context, @@ -574,7 +578,7 @@ func LoadStats( // Returns: // - ScopedJobStats: Map of metric → scope → []ScopedStats (with hostname and ID) // - Error or partial error listing failed queries -func LoadScopedStats( +func (ccms *InternalMetricStore) LoadScopedStats( job *schema.Job, metrics []string, scopes []schema.MetricScope, @@ -675,7 +679,7 @@ func LoadScopedStats( // Returns: // - Map of hostname → metric → []JobMetric // - Error or partial error listing failed queries -func LoadNodeData( +func (ccms *InternalMetricStore) LoadNodeData( cluster string, metrics, nodes []string, scopes []schema.MetricScope, @@ -778,7 +782,7 @@ func LoadNodeData( // Returns: // - Map of hostname → JobData (metric → scope → JobMetric) // - Error or partial error listing failed queries -func LoadNodeListData( +func (ccms *InternalMetricStore) LoadNodeListData( cluster, subCluster string, nodes []string, metrics []string, @@ -912,7 +916,6 @@ func buildNodeQueries( scopes []schema.MetricScope, resolution int64, ) ([]APIQuery, []schema.MetricScope, error) { - queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes)) assignedScope := []schema.MetricScope{}