Major refactor of metric data handling

- make the  internal memory store required and default
- Rename memorystore to metricstore
- Rename metricDataDispatcher to metricdispatch
- Remove metricdata package
- Introduce metricsync package for upstream metric data pull
This commit is contained in:
2025-12-25 08:42:54 +01:00
parent 8576ae458d
commit 11ec2267da
39 changed files with 815 additions and 2578 deletions

View File

@@ -19,7 +19,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
@@ -484,7 +484,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
return nil, err
}
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx, *resolution)
data, err := metricdispatch.LoadData(job, metrics, scopes, ctx, *resolution)
if err != nil {
cclog.Warn("Error while loading job data")
return nil, err
@@ -512,7 +512,7 @@ func (r *queryResolver) JobStats(ctx context.Context, id string, metrics []strin
return nil, err
}
data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx)
data, err := metricdispatch.LoadJobStats(job, metrics, ctx)
if err != nil {
cclog.Warnf("Error while loading jobStats data for job id %s", id)
return nil, err
@@ -537,7 +537,7 @@ func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics [
return nil, err
}
data, err := metricDataDispatcher.LoadScopedJobStats(job, metrics, scopes, ctx)
data, err := metricdispatch.LoadScopedJobStats(job, metrics, scopes, ctx)
if err != nil {
cclog.Warnf("Error while loading scopedJobStats data for job id %s", id)
return nil, err
@@ -702,7 +702,7 @@ func (r *queryResolver) JobsMetricStats(ctx context.Context, filter []*model.Job
res := []*model.JobStats{}
for _, job := range jobs {
data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx)
data, err := metricdispatch.LoadJobStats(job, metrics, ctx)
if err != nil {
cclog.Warnf("Error while loading comparison jobStats data for job id %d", job.JobID)
continue
@@ -759,7 +759,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [
}
}
data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
data, err := metricdispatch.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
cclog.Warn("error while loading node data")
return nil, err
@@ -825,7 +825,7 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
}
}
data, err := metricDataDispatcher.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx)
data, err := metricdispatch.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx)
if err != nil {
cclog.Warn("error while loading node data (Resolver.NodeMetricsList")
return nil, err
@@ -880,7 +880,7 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr
// 'nodes' == nil -> Defaults to all nodes of cluster for existing query workflow
scopes := []schema.MetricScope{"node"}
data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nil, scopes, from, to, ctx)
data, err := metricdispatch.LoadNodeData(cluster, metrics, nil, scopes, from, to, ctx)
if err != nil {
cclog.Warn("error while loading node data")
return nil, err