feat: Add support for multiple external metric stores

2026-01-27 10:02:07 +01:00
parent 4853814228
commit b307e885ce
9 changed files with 280 additions and 87 deletions

View File

@@ -45,6 +45,13 @@ func setup(t *testing.T) *api.RestAPI {
"api-allowed-ips": [
"*"
]
},
"metric-store": {
"checkpoints": {
"interval": "12h"
},
"retention-in-memory": "48h",
"memory-cap": 100
},
"archive": {
"kind": "file",
@@ -143,6 +150,7 @@ func setup(t *testing.T) *api.RestAPI {
}
ccconf.Init(cfgFilePath)
metricstore.MetricStoreHandle = &metricstore.InternalMetricStore{}
// Load and check main configuration
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {

View File

@@ -0,0 +1,29 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdispatch
const configSchema = `{
"type": "array",
"description": "Array of metric store configurations with scope-based routing.",
"items": {
"type": "object",
"properties": {
"scope": {
"description": "Scope identifier for routing metrics (e.g., cluster name, '*' for default)",
"type": "string"
},
"url": {
"description": "URL of the metric store endpoint",
"type": "string"
},
"token": {
"description": "Authentication token for the metric store",
"type": "string"
}
},
"required": ["scope", "url", "token"]
}
}`
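
For illustration, a value accepted by this schema is a JSON array of scope/url/token objects. The sketch below is not part of this commit; the hostnames, tokens, and cluster names are placeholders, and the scope keys follow the "<cluster>" / "<cluster>-<subcluster>" convention used by the dispatcher added later in this commit.

package main

import (
	"encoding/json"
	"log"

	"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
)

func main() {
	// Placeholder configuration: a wildcard default plus one cluster-wide and
	// one subcluster-specific external store.
	rawConfig := json.RawMessage(`[
		{"scope": "*",         "url": "http://localhost:8081",   "token": "jwt-token"},
		{"scope": "fritz",     "url": "http://fritz-store:8081", "token": "jwt-token"},
		{"scope": "fritz-spr", "url": "http://fritz-spr:8081",   "token": "jwt-token"}
	]`)
	if err := metricdispatch.Init(rawConfig); err != nil {
		log.Fatal(err)
	}
}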

View File

@@ -44,7 +44,6 @@ import (
"time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/lrucache"
"github.com/ClusterCockpit/cc-lib/v2/resampler"
@@ -96,6 +95,13 @@ func LoadData(job *schema.Job,
if job.State == schema.JobStateRunning ||
job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving {
ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster)
if err != nil {
cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
return err, 0, 0
}
if scopes == nil {
scopes = append(scopes, schema.MetricScopeNode)
}
@@ -107,7 +113,7 @@ func LoadData(job *schema.Job,
}
}
jd, err = metricstore.LoadData(job, metrics, scopes, ctx, resolution)
jd, err = ms.LoadData(job, metrics, scopes, ctx, resolution)
if err != nil {
if len(jd) != 0 {
cclog.Warnf("partial error loading metrics from store for job %d (user: %s, project: %s): %s",
@@ -236,7 +242,14 @@ func LoadAverages(
return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
}
stats, err := metricstore.LoadStats(job, metrics, ctx)
ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster)
if err != nil {
cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
return err
}
stats, err := ms.LoadStats(job, metrics, ctx)
if err != nil {
cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
@@ -273,7 +286,14 @@ func LoadScopedJobStats(
return archive.LoadScopedStatsFromArchive(job, metrics, scopes)
}
scopedStats, err := metricstore.LoadScopedStats(job, metrics, scopes, ctx)
ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster)
if err != nil {
cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
return nil, err
}
scopedStats, err := ms.LoadScopedStats(job, metrics, scopes, ctx)
if err != nil {
cclog.Errorf("failed to load scoped statistics from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
@@ -295,9 +315,16 @@ func LoadJobStats(
return archive.LoadStatsFromArchive(job, metrics)
}
ms, err := GetMetricDataRepo(job.Cluster, job.SubCluster)
if err != nil {
cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
return nil, err
}
data := make(map[string]schema.MetricStatistics, len(metrics))
stats, err := metricstore.LoadStats(job, metrics, ctx)
stats, err := ms.LoadStats(job, metrics, ctx)
if err != nil {
cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
@@ -333,6 +360,7 @@ func LoadJobStats(
// the metric store (not the archive) since it's for current/recent node status monitoring.
//
// Returns a nested map structure: node -> metric -> scoped data.
// FIXME: Add support for subcluster-specific cc-metric-stores
func LoadNodeData(
cluster string,
metrics, nodes []string,
@@ -346,7 +374,14 @@ func LoadNodeData(
}
}
data, err := metricstore.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
ms, err := GetMetricDataRepo(cluster, "")
if err != nil {
cclog.Errorf("failed to load node data from metric store: %s",
err.Error())
return nil, err
}
data, err := ms.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
if len(data) != 0 {
cclog.Warnf("partial error loading node data from metric store for cluster %s: %s", cluster, err.Error())
@@ -383,7 +418,14 @@ func LoadNodeListData(
}
}
data, err := metricstore.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
ms, err := GetMetricDataRepo(cluster, subCluster)
if err != nil {
cclog.Errorf("failed to load node data from metric store: %s",
err.Error())
return nil, err
}
data, err := ms.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
if err != nil {
if len(data) != 0 {
cclog.Warnf("partial error loading node list data from metric store for cluster %s, subcluster %s: %s",

View File

@@ -0,0 +1,112 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdispatch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
ccms "github.com/ClusterCockpit/cc-backend/internal/metricstoreclient"
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
type MetricDataRepository interface {
// Return the JobData for the given job, only with the requested metrics.
LoadData(job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int) (schema.JobData, error)
// Return a map of metrics to a map of nodes to the metric statistics of the job. node scope only.
LoadStats(job *schema.Job,
metrics []string,
ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
// Return a map of metrics to a map of scopes to the scoped metric statistics of the job.
LoadScopedStats(job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context) (schema.ScopedJobStats, error)
// Return a map of hosts to a map of metrics at the requested scopes (currently only node) for that node.
LoadNodeData(cluster string,
metrics, nodes []string,
scopes []schema.MetricScope,
from, to time.Time,
ctx context.Context) (map[string]map[string][]*schema.JobMetric, error)
// Return a map of hosts to a map of metrics to a map of scopes for multiple nodes.
LoadNodeListData(cluster, subCluster string,
nodes []string,
metrics []string,
scopes []schema.MetricScope,
resolution int,
from, to time.Time,
ctx context.Context) (map[string]schema.JobData, error)
}
type CCMetricStoreConfig struct {
Scope string `json:"scope"`
URL string `json:"url"`
Token string `json:"token"`
}
var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}
func Init(rawConfig json.RawMessage) error {
if rawConfig != nil {
var configs []CCMetricStoreConfig
config.Validate(configSchema, rawConfig)
dec := json.NewDecoder(bytes.NewReader(rawConfig))
dec.DisallowUnknownFields()
if err := dec.Decode(&configs); err != nil {
return fmt.Errorf("[METRICDISPATCH]> Metric Store Config Init: Could not decode config file '%s' Error: %s", rawConfig, err.Error())
}
if len(configs) == 0 {
return fmt.Errorf("[METRICDISPATCH]> No metric store configurations found in config file")
}
for _, config := range configs {
metricDataRepos[config.Scope] = ccms.NewCCMetricStore(config.URL, config.Token)
}
}
return nil
}
func GetMetricDataRepo(cluster string, subcluster string) (MetricDataRepository, error) {
var repo MetricDataRepository
var ok bool
key := cluster + "-" + subcluster
repo, ok = metricDataRepos[key]
if !ok {
repo, ok = metricDataRepos[cluster]
if !ok {
repo, ok = metricDataRepos["*"]
if !ok {
if metricstore.MetricStoreHandle == nil {
return nil, fmt.Errorf("[METRICDISPATCH]> no metric data repository configured '%s'", key)
}
repo = metricstore.MetricStoreHandle
cclog.Debugf("[METRICDISPATCH]> Using internal metric data repository for '%s'", key)
}
}
}
return repo, nil
}
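
GetMetricDataRepo resolves the most specific entry first: "<cluster>-<subcluster>", then "<cluster>", then "*", and finally falls back to the internal metricstore.MetricStoreHandle. A minimal caller sketch, assuming the placeholder configuration above and hypothetical metric and node names:

package example

import (
	"context"
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
	"github.com/ClusterCockpit/cc-lib/v2/schema"
)

// loadExample resolves a repository for a hypothetical "fritz"/"spr" pair and
// loads node data through the MetricDataRepository interface. Lookup order:
// "fritz-spr", then "fritz", then "*", then the internal store.
func loadExample(ctx context.Context, from, to time.Time) (map[string]map[string][]*schema.JobMetric, error) {
	repo, err := metricdispatch.GetMetricDataRepo("fritz", "spr")
	if err != nil {
		return nil, err
	}
	return repo.LoadNodeData("fritz",
		[]string{"flops_any"}, []string{"node001"},
		[]schema.MetricScope{schema.MetricScopeNode},
		from, to, ctx)
}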

View File

@@ -17,8 +17,7 @@
//
// # Basic Usage
//
// store := &CCMetricStore{}
// store.Init("http://localhost:8080", "jwt-token")
// store := NewCCMetricStore("http://localhost:8080", "jwt-token")
//
// // Load job data
// jobData, err := store.LoadData(job, metrics, scopes, ctx, resolution)
@@ -60,11 +59,9 @@ import (
"encoding/json"
"fmt"
"net/http"
"sort"
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -124,15 +121,17 @@ type APIMetricData struct {
Max schema.Float `json:"max"` // Maximum value in time range
}
// Init initializes the CCMetricStore client with connection details.
// NewCCMetricStore creates and initializes a new CCMetricStore client.
// The url parameter should include the protocol and port (e.g., "http://localhost:8080").
// The token parameter is a JWT used for Bearer authentication; pass an empty string if auth is disabled.
func (ccms *CCMetricStore) Init(url string, token string) {
ccms.url = url
ccms.queryEndpoint = fmt.Sprintf("%s/api/query", url)
ccms.jwt = token
ccms.client = http.Client{
Timeout: 10 * time.Second,
func NewCCMetricStore(url string, token string) *CCMetricStore {
return &CCMetricStore{
url: url,
queryEndpoint: fmt.Sprintf("%s/api/query", url),
jwt: token,
client: http.Client{
Timeout: 10 * time.Second,
},
}
}
@@ -547,64 +546,18 @@ func (ccms *CCMetricStore) LoadNodeData(
// - HasNextPage flag indicating if more pages are available
// - Error (may be partial error with some data returned)
func (ccms *CCMetricStore) LoadNodeListData(
cluster, subCluster, nodeFilter string,
cluster, subCluster string,
nodes []string,
metrics []string,
scopes []schema.MetricScope,
resolution int,
from, to time.Time,
page *model.PageRequest,
ctx context.Context,
) (map[string]schema.JobData, int, bool, error) {
// 0) Init additional vars
totalNodes := 0
hasNextPage := false
// 1) Get list of all nodes
var nodes []string
if subCluster != "" {
scNodes := archive.NodeLists[cluster][subCluster]
nodes = scNodes.PrintList()
} else {
subClusterNodeLists := archive.NodeLists[cluster]
for _, nodeList := range subClusterNodeLists {
nodes = append(nodes, nodeList.PrintList()...)
}
}
// 2) Filter nodes
if nodeFilter != "" {
filteredNodes := []string{}
for _, node := range nodes {
if strings.Contains(node, nodeFilter) {
filteredNodes = append(filteredNodes, node)
}
}
nodes = filteredNodes
}
// 2.1) Count total nodes && Sort nodes -> Sorting invalidated after ccms return ...
totalNodes = len(nodes)
sort.Strings(nodes)
// 3) Apply paging
if len(nodes) > page.ItemsPerPage {
start := (page.Page - 1) * page.ItemsPerPage
end := start + page.ItemsPerPage
if end > len(nodes) {
end = len(nodes)
hasNextPage = false
} else {
hasNextPage = true
}
nodes = nodes[start:end]
}
// Note: Order of node data is not guaranteed after this point, but contents match page and filter criteria
) (map[string]schema.JobData, error) {
queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution)
if err != nil {
cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error())
return nil, totalNodes, hasNextPage, err
return nil, err
}
req := APIQueryRequest{
@@ -619,7 +572,7 @@ func (ccms *CCMetricStore) LoadNodeListData(
resBody, err := ccms.doRequest(ctx, &req)
if err != nil {
cclog.Errorf("Error while performing request for cluster %s: %s", cluster, err.Error())
return nil, totalNodes, hasNextPage, err
return nil, err
}
var errors []string
@@ -694,10 +647,10 @@ func (ccms *CCMetricStore) LoadNodeListData(
if len(errors) != 0 {
/* Returns list of "partial errors" */
return data, totalNodes, hasNextPage, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
}
return data, totalNodes, hasNextPage, nil
return data, nil
}
// sanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling.
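
With this change the client no longer filters, sorts, or pages the node list itself; the caller supplies an explicit nodes slice, and the totalNodes/hasNextPage return values and the page parameter are gone (where that logic moved is not shown in this excerpt). A hedged call sketch, with store URL, node names, metrics, and resolution as placeholders:

package example

import (
	"context"
	"time"

	ccms "github.com/ClusterCockpit/cc-backend/internal/metricstoreclient"
	"github.com/ClusterCockpit/cc-lib/v2/schema"
)

// listExample queries a pre-selected (already filtered and paged) node list;
// the client now returns only the per-node JobData map and an error.
func listExample(ctx context.Context, from, to time.Time) (map[string]schema.JobData, error) {
	store := ccms.NewCCMetricStore("http://localhost:8081", "jwt-token")
	return store.LoadNodeListData("fritz", "spr",
		[]string{"node001", "node002"}, // caller-provided node list
		[]string{"flops_any"},
		[]schema.MetricScope{schema.MetricScopeNode},
		600, from, to, ctx)
}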

View File

@@ -10,8 +10,8 @@ import (
"math"
"time"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
sq "github.com/Masterminds/squirrel"
@@ -66,7 +66,14 @@ func RegisterFootprintWorker() {
sJob := time.Now()
jobStats, err := metricstore.LoadStats(job, allMetrics, context.Background())
ms, err := metricdispatch.GetMetricDataRepo(job.Cluster, job.SubCluster)
if err != nil {
cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
continue
}
jobStats, err := ms.LoadStats(job, allMetrics, context.Background())
if err != nil {
cclog.Errorf("error wile loading job data stats for footprint update: %v", err)
ce++