Add metric renaming map and mock-repo

This commit is contained in:
Lou Knauer 2022-01-24 10:06:25 +01:00
parent 6743d94b0e
commit ae3e03f9b9
4 changed files with 111 additions and 36 deletions

View File

@ -14,6 +14,11 @@ type MetricDataRepository struct {
Kind string `json:"kind"`
Url string `json:"url"`
Token string `json:"token"`
// If metrics are known to this MetricDataRepository under a different
// name than in the `metricConfig` section of the 'cluster.json',
// provide this optional mapping of local to remote name for this metric.
Renamings map[string]string `json:"metricRenamings"`
}
// Return a list of socket IDs given a list of hwthread IDs.

View File

@ -18,6 +18,8 @@ type CCMetricStore struct {
url string
queryEndpoint string
client http.Client
here2there map[string]string
there2here map[string]string
}
type ApiQueryRequest struct {
@ -50,16 +52,44 @@ type ApiMetricData struct {
Max schema.Float `json:"max"`
}
func (ccms *CCMetricStore) Init(url, token string) error {
func (ccms *CCMetricStore) Init(url, token string, renamings map[string]string) error {
ccms.url = url
ccms.queryEndpoint = fmt.Sprintf("%s/api/query", url)
ccms.jwt = token
ccms.client = http.Client{
Timeout: 5 * time.Second,
}
if renamings != nil {
ccms.here2there = renamings
ccms.there2here = make(map[string]string, len(renamings))
for k, v := range ccms.here2there {
ccms.there2here[v] = k
}
} else {
ccms.here2there = make(map[string]string)
ccms.there2here = make(map[string]string)
}
return nil
}
func (ccms *CCMetricStore) toRemoteName(metric string) string {
if renamed, ok := ccms.here2there[metric]; ok {
return renamed
}
return metric
}
func (ccms *CCMetricStore) toLocalName(metric string) string {
if renamed, ok := ccms.there2here[metric]; ok {
return renamed
}
return metric
}
func (ccms *CCMetricStore) doRequest(ctx context.Context, body *ApiQueryRequest) ([][]ApiMetricData, error) {
buf := &bytes.Buffer{}
if err := json.NewEncoder(buf).Encode(body); err != nil {
@ -114,13 +144,14 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
var jobData schema.JobData = make(schema.JobData)
for i, row := range resBody {
query := req.Queries[i]
metric := ccms.toLocalName(query.Metric)
scope := assignedScope[i]
mc := config.GetMetricConfig(job.Cluster, query.Metric)
if _, ok := jobData[query.Metric]; !ok {
jobData[query.Metric] = make(map[schema.MetricScope]*schema.JobMetric)
mc := config.GetMetricConfig(job.Cluster, metric)
if _, ok := jobData[metric]; !ok {
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
}
jobMetric, ok := jobData[query.Metric][scope]
jobMetric, ok := jobData[metric][scope]
if !ok {
jobMetric = &schema.JobMetric{
Unit: mc.Unit,
@ -128,12 +159,12 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
Timestep: mc.Timestep,
Series: make([]schema.Series, 0),
}
jobData[query.Metric][scope] = jobMetric
jobData[metric][scope] = jobMetric
}
for _, res := range row {
if res.Error != nil {
return nil, fmt.Errorf("cc-metric-store error while fetching %s: %s", query.Metric, *res.Error)
return nil, fmt.Errorf("cc-metric-store error while fetching %s: %s", metric, *res.Error)
}
id := (*int)(nil)
@ -179,6 +210,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
assignedScope := []schema.MetricScope{}
for _, metric := range metrics {
remoteName := ccms.toRemoteName(metric)
mc := config.GetMetricConfig(job.Cluster, metric)
if mc == nil {
// return nil, fmt.Errorf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster)
@ -209,7 +241,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
queries = append(queries, ApiQuery{
Metric: metric,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &acceleratorString,
@ -226,7 +258,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
}
queries = append(queries, ApiQuery{
Metric: metric,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &acceleratorString,
@ -239,7 +271,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
// HWThread -> HWThead
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
queries = append(queries, ApiQuery{
Metric: metric,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &hwthreadString,
@ -254,7 +286,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
for _, core := range cores {
queries = append(queries, ApiQuery{
Metric: metric,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &hwthreadString,
@ -270,7 +302,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
for _, socket := range sockets {
queries = append(queries, ApiQuery{
Metric: metric,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &hwthreadString,
@ -284,7 +316,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
// HWThread -> Node
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
queries = append(queries, ApiQuery{
Metric: metric,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &hwthreadString,
@ -298,7 +330,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
Metric: metric,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &coreString,
@ -312,7 +344,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
Metric: metric,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &coreString,
@ -326,7 +358,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
Metric: metric,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
Type: &socketString,
@ -340,7 +372,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
Metric: metric,
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
Type: &socketString,
@ -353,7 +385,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
// Node -> Node
if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
queries = append(queries, ApiQuery{
Metric: metric,
Metric: remoteName,
Hostname: host.Hostname,
})
assignedScope = append(assignedScope, scope)
@ -391,19 +423,20 @@ func (ccms *CCMetricStore) LoadStats(job *schema.Job, metrics []string, ctx cont
stats := make(map[string]map[string]schema.MetricStatistics, len(metrics))
for i, res := range resBody {
query := req.Queries[i]
metric := ccms.toLocalName(query.Metric)
data := res[0]
if data.Error != nil {
return nil, fmt.Errorf("fetching %s for node %s failed: %s", query.Metric, query.Hostname, *data.Error)
return nil, fmt.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error)
}
metricdata, ok := stats[query.Metric]
metricdata, ok := stats[metric]
if !ok {
metricdata = make(map[string]schema.MetricStatistics, job.NumNodes)
stats[query.Metric] = metricdata
stats[metric] = metricdata
}
if data.Avg.IsNaN() || data.Min.IsNaN() || data.Max.IsNaN() {
return nil, fmt.Errorf("fetching %s for node %s failed: %s", query.Metric, query.Hostname, "avg/min/max is NaN")
return nil, fmt.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN")
}
metricdata[query.Hostname] = schema.MetricStatistics{
@ -432,7 +465,7 @@ func (ccms *CCMetricStore) LoadNodeData(clusterId string, metrics, nodes []strin
for _, metric := range metrics {
req.Queries = append(req.Queries, ApiQuery{
Hostname: node,
Metric: metric,
Metric: ccms.toRemoteName(metric),
})
}
}
@ -457,7 +490,7 @@ func (ccms *CCMetricStore) LoadNodeData(clusterId string, metrics, nodes []strin
data[query.Hostname] = nodedata
}
nodedata[query.Metric] = qdata.Data
nodedata[ccms.toLocalName(query.Metric)] = qdata.Data
}
return data, nil

View File

@ -13,7 +13,7 @@ import (
type MetricDataRepository interface {
// Initialize this MetricDataRepository. One instance of
// this interface will only ever be responsible for one cluster.
Init(url, token string) error
Init(url, token string, renamings map[string]string) error
// Return the JobData for the given job, only with the requested metrics.
LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error)
@ -36,22 +36,23 @@ func Init(jobArchivePath string, disableArchive bool) error {
JobArchivePath = jobArchivePath
for _, cluster := range config.Clusters {
if cluster.MetricDataRepository != nil {
var mdr MetricDataRepository
switch cluster.MetricDataRepository.Kind {
case "cc-metric-store":
ccms := &CCMetricStore{}
if err := ccms.Init(cluster.MetricDataRepository.Url, cluster.MetricDataRepository.Token); err != nil {
return err
}
metricDataRepos[cluster.Name] = ccms
// case "influxdb-v2":
// idb := &InfluxDBv2DataRepository{}
// if err := idb.Init(cluster.MetricDataRepository.Url); err != nil {
// return err
// }
// metricDataRepos[cluster.Name] = idb
mdr = &CCMetricStore{}
case "test":
mdr = &TestMetricDataRepository{}
default:
return fmt.Errorf("unkown metric data repository '%s' for cluster '%s'", cluster.MetricDataRepository.Kind, cluster.Name)
}
if err := mdr.Init(
cluster.MetricDataRepository.Url,
cluster.MetricDataRepository.Token,
cluster.MetricDataRepository.Renamings); err != nil {
return err
}
metricDataRepos[cluster.Name] = mdr
}
}
return nil

36
metricdata/utils.go Normal file
View File

@ -0,0 +1,36 @@
package metricdata
import (
"context"
"github.com/ClusterCockpit/cc-jobarchive/schema"
)
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
panic("TODO")
}
// Only a mock for unit-testing.
type TestMetricDataRepository struct {
url, token string
renamings map[string]string
}
func (tmdr *TestMetricDataRepository) Init(url, token string, renamings map[string]string) error {
tmdr.url = url
tmdr.token = token
tmdr.renamings = renamings
return nil
}
func (tmdr *TestMetricDataRepository) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
return TestLoadDataCallback(job, metrics, scopes, ctx)
}
func (tmdr *TestMetricDataRepository) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
panic("TODO")
}
func (tmdr *TestMetricDataRepository) LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error) {
panic("TODO")
}