start working on non-node-scoped metrics in node/system view

This commit is contained in:
Lou Knauer
2022-01-31 15:16:05 +01:00
parent 84431585f9
commit 29a25dbaff
8 changed files with 107 additions and 264 deletions

View File

@@ -449,11 +449,12 @@ func (ccms *CCMetricStore) LoadStats(job *schema.Job, metrics []string, ctx cont
return stats, nil
}
func (ccms *CCMetricStore) LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error) {
// TODO: Support sub-node-scope metrics! For this, the partition of a node needs to be known!
func (ccms *CCMetricStore) LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
req := ApiQueryRequest{
Cluster: clusterId,
From: from,
To: to,
Cluster: cluster,
From: from.Unix(),
To: to.Unix(),
WithStats: false,
WithData: true,
}
@@ -476,21 +477,34 @@ func (ccms *CCMetricStore) LoadNodeData(clusterId string, metrics, nodes []strin
return nil, err
}
data := make(map[string]map[string][]schema.Float)
_ = resBody
data := make(map[string]map[string][]*schema.JobMetric)
for i, res := range resBody {
query := req.Queries[i]
metric := ccms.toLocalName(query.Metric)
qdata := res[0]
if qdata.Error != nil {
return nil, fmt.Errorf("fetching %s for node %s failed: %s", query.Metric, query.Hostname, *qdata.Error)
}
nodedata, ok := data[query.Hostname]
hostdata, ok := data[query.Hostname]
if !ok {
nodedata = make(map[string][]schema.Float)
data[query.Hostname] = nodedata
hostdata = make(map[string][]*schema.JobMetric)
data[query.Hostname] = hostdata
}
nodedata[ccms.toLocalName(query.Metric)] = qdata.Data
mc := config.GetMetricConfig(cluster, metric)
hostdata[query.Metric] = append(hostdata[query.Metric], &schema.JobMetric{
Unit: mc.Unit,
Scope: schema.MetricScopeNode,
Timestep: mc.Timestep,
Series: []schema.Series{
{
Hostname: query.Hostname,
Data: qdata.Data,
},
},
})
}
return data, nil

View File

@@ -21,8 +21,8 @@ type MetricDataRepository interface {
// Return a map of metrics to a map of nodes to the metric statistics of the job. node scope assumed for now.
LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
// Return a map of nodes to a map of metrics to the data for the requested time.
LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error)
// Return a map of hosts to a map of metrics at the requested scopes for that node.
LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error)
}
var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}
@@ -152,26 +152,26 @@ func LoadAverages(job *schema.Job, metrics []string, data [][]schema.Float, ctx
return nil
}
// Used for the node/system view. Returns a map of nodes to a map of metrics (at node scope).
func LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error) {
repo, ok := metricDataRepos[clusterId]
// Used for the node/system view. Returns a map of nodes to a map of metrics.
func LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
repo, ok := metricDataRepos[cluster]
if !ok {
return nil, fmt.Errorf("no metric data repository configured for '%s'", clusterId)
return nil, fmt.Errorf("no metric data repository configured for '%s'", cluster)
}
if metrics == nil {
for _, m := range config.GetClusterConfig(clusterId).MetricConfig {
for _, m := range config.GetClusterConfig(cluster).MetricConfig {
metrics = append(metrics, m.Name)
}
}
data, err := repo.LoadNodeData(clusterId, metrics, nodes, from, to, ctx)
data, err := repo.LoadNodeData(cluster, partition, metrics, nodes, scopes, from, to, ctx)
if err != nil {
return nil, err
}
if data == nil {
return nil, fmt.Errorf("the metric data repository for '%s' does not support this query", clusterId)
return nil, fmt.Errorf("the metric data repository for '%s' does not support this query", cluster)
}
return data, nil

View File

@@ -2,6 +2,7 @@ package metricdata
import (
"context"
"time"
"github.com/ClusterCockpit/cc-backend/schema"
)
@@ -31,6 +32,6 @@ func (tmdr *TestMetricDataRepository) LoadStats(job *schema.Job, metrics []strin
panic("TODO")
}
func (tmdr *TestMetricDataRepository) LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error) {
func (tmdr *TestMetricDataRepository) LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
panic("TODO")
}