diff --git a/graph/model/models.go b/graph/model/models.go index 3eb149e..0151228 100644 --- a/graph/model/models.go +++ b/graph/model/models.go @@ -14,6 +14,11 @@ type MetricDataRepository struct { Kind string `json:"kind"` Url string `json:"url"` Token string `json:"token"` + + // If metrics are known to this MetricDataRepository under a different + // name than in the `metricConfig` section of the 'cluster.json', + // provide this optional mapping of local to remote name for this metric. + Renamings map[string]string `json:"metricRenamings"` } // Return a list of socket IDs given a list of hwthread IDs. diff --git a/metricdata/cc-metric-store.go b/metricdata/cc-metric-store.go index 0a4d6f3..5c5f850 100644 --- a/metricdata/cc-metric-store.go +++ b/metricdata/cc-metric-store.go @@ -18,6 +18,8 @@ type CCMetricStore struct { url string queryEndpoint string client http.Client + here2there map[string]string + there2here map[string]string } type ApiQueryRequest struct { @@ -50,16 +52,44 @@ type ApiMetricData struct { Max schema.Float `json:"max"` } -func (ccms *CCMetricStore) Init(url, token string) error { +func (ccms *CCMetricStore) Init(url, token string, renamings map[string]string) error { ccms.url = url ccms.queryEndpoint = fmt.Sprintf("%s/api/query", url) ccms.jwt = token ccms.client = http.Client{ Timeout: 5 * time.Second, } + + if renamings != nil { + ccms.here2there = renamings + ccms.there2here = make(map[string]string, len(renamings)) + for k, v := range ccms.here2there { + ccms.there2here[v] = k + } + } else { + ccms.here2there = make(map[string]string) + ccms.there2here = make(map[string]string) + } + return nil } +func (ccms *CCMetricStore) toRemoteName(metric string) string { + if renamed, ok := ccms.here2there[metric]; ok { + return renamed + } + + return metric +} + +func (ccms *CCMetricStore) toLocalName(metric string) string { + if renamed, ok := ccms.there2here[metric]; ok { + return renamed + } + + return metric +} + func (ccms *CCMetricStore) doRequest(ctx context.Context, body *ApiQueryRequest) ([][]ApiMetricData, error) { buf := &bytes.Buffer{} if err := json.NewEncoder(buf).Encode(body); err != nil { @@ -114,13 +144,14 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes [] var jobData schema.JobData = make(schema.JobData) for i, row := range resBody { query := req.Queries[i] + metric := ccms.toLocalName(query.Metric) scope := assignedScope[i] - mc := config.GetMetricConfig(job.Cluster, query.Metric) - if _, ok := jobData[query.Metric]; !ok { - jobData[query.Metric] = make(map[schema.MetricScope]*schema.JobMetric) + mc := config.GetMetricConfig(job.Cluster, metric) + if _, ok := jobData[metric]; !ok { + jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric) } - jobMetric, ok := jobData[query.Metric][scope] + jobMetric, ok := jobData[metric][scope] if !ok { jobMetric = &schema.JobMetric{ Unit: mc.Unit, @@ -128,12 +159,12 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes [] Timestep: mc.Timestep, Series: make([]schema.Series, 0), } - jobData[query.Metric][scope] = jobMetric + jobData[metric][scope] = jobMetric } for _, res := range row { if res.Error != nil { - return nil, fmt.Errorf("cc-metric-store error while fetching %s: %s", query.Metric, *res.Error) + return nil, fmt.Errorf("cc-metric-store error while fetching %s: %s", metric, *res.Error) } id := (*int)(nil) @@ -179,6 +210,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope assignedScope := []schema.MetricScope{} for _, metric := range metrics { + remoteName := ccms.toRemoteName(metric) mc := config.GetMetricConfig(job.Cluster, metric) if mc == nil { // return nil, fmt.Errorf("metric '%s' is not specified for cluster '%s'", metric, job.Cluster) @@ -209,7 +241,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope // Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node) if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) { queries = append(queries, ApiQuery{ - Metric: metric, + Metric: remoteName, Hostname: host.Hostname, Aggregate: false, Type: &acceleratorString, @@ -226,7 +258,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope } queries = append(queries, ApiQuery{ - Metric: metric, + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &acceleratorString, @@ -239,7 +271,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { queries = append(queries, ApiQuery{ - Metric: metric, + Metric: remoteName, Hostname: host.Hostname, Aggregate: false, Type: &hwthreadString, @@ -254,7 +286,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope cores, _ := topology.GetCoresFromHWThreads(hwthreads) for _, core := range cores { queries = append(queries, ApiQuery{ - Metric: metric, + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &hwthreadString, @@ -270,7 +302,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) for _, socket := range sockets { queries = append(queries, ApiQuery{ - Metric: metric, + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &hwthreadString, @@ -284,7 +316,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { queries = append(queries, ApiQuery{ - Metric: metric, + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &hwthreadString, @@ -298,7 +330,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: metric, + Metric: remoteName, Hostname: host.Hostname, Aggregate: false, Type: &coreString, @@ -312,7 +344,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: metric, + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &coreString, @@ -326,7 +358,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: metric, + Metric: remoteName, Hostname: host.Hostname, Aggregate: false, Type: &socketString, @@ -340,7 +372,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode { sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: metric, + Metric: remoteName, Hostname: host.Hostname, Aggregate: true, Type: &socketString, @@ -353,7 +385,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope // Node -> Node if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode { queries = append(queries, ApiQuery{ - Metric: metric, + Metric: remoteName, Hostname: host.Hostname, }) assignedScope = append(assignedScope, scope) @@ -391,19 +423,20 @@ func (ccms *CCMetricStore) LoadStats(job *schema.Job, metrics []string, ctx cont stats := make(map[string]map[string]schema.MetricStatistics, len(metrics)) for i, res := range resBody { query := req.Queries[i] + metric := ccms.toLocalName(query.Metric) data := res[0] if data.Error != nil { - return nil, fmt.Errorf("fetching %s for node %s failed: %s", query.Metric, query.Hostname, *data.Error) + return nil, fmt.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error) } - metricdata, ok := stats[query.Metric] + metricdata, ok := stats[metric] if !ok { metricdata = make(map[string]schema.MetricStatistics, job.NumNodes) - stats[query.Metric] = metricdata + stats[metric] = metricdata } if data.Avg.IsNaN() || data.Min.IsNaN() || data.Max.IsNaN() { - return nil, fmt.Errorf("fetching %s for node %s failed: %s", query.Metric, query.Hostname, "avg/min/max is NaN") + return nil, fmt.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN") } metricdata[query.Hostname] = schema.MetricStatistics{ @@ -432,7 +465,7 @@ func (ccms *CCMetricStore) LoadNodeData(clusterId string, metrics, nodes []strin for _, metric := range metrics { req.Queries = append(req.Queries, ApiQuery{ Hostname: node, - Metric: metric, + Metric: ccms.toRemoteName(metric), }) } } @@ -457,7 +490,7 @@ func (ccms *CCMetricStore) LoadNodeData(clusterId string, metrics, nodes []strin data[query.Hostname] = nodedata } - nodedata[query.Metric] = qdata.Data + nodedata[ccms.toLocalName(query.Metric)] = qdata.Data } return data, nil diff --git a/metricdata/metricdata.go b/metricdata/metricdata.go index 2f95680..8c5da49 100644 --- a/metricdata/metricdata.go +++ b/metricdata/metricdata.go @@ -13,7 +13,7 @@ import ( type MetricDataRepository interface { // Initialize this MetricDataRepository. One instance of // this interface will only ever be responsible for one cluster. - Init(url, token string) error + Init(url, token string, renamings map[string]string) error // Return the JobData for the given job, only with the requested metrics. LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) @@ -36,22 +36,23 @@ func Init(jobArchivePath string, disableArchive bool) error { JobArchivePath = jobArchivePath for _, cluster := range config.Clusters { if cluster.MetricDataRepository != nil { + var mdr MetricDataRepository switch cluster.MetricDataRepository.Kind { case "cc-metric-store": - ccms := &CCMetricStore{} - if err := ccms.Init(cluster.MetricDataRepository.Url, cluster.MetricDataRepository.Token); err != nil { - return err - } - metricDataRepos[cluster.Name] = ccms - // case "influxdb-v2": - // idb := &InfluxDBv2DataRepository{} - // if err := idb.Init(cluster.MetricDataRepository.Url); err != nil { - // return err - // } - // metricDataRepos[cluster.Name] = idb + mdr = &CCMetricStore{} + case "test": + mdr = &TestMetricDataRepository{} default: return fmt.Errorf("unkown metric data repository '%s' for cluster '%s'", cluster.MetricDataRepository.Kind, cluster.Name) } + + if err := mdr.Init( + cluster.MetricDataRepository.Url, + cluster.MetricDataRepository.Token, + cluster.MetricDataRepository.Renamings); err != nil { + return err + } + metricDataRepos[cluster.Name] = mdr } } return nil diff --git a/metricdata/utils.go b/metricdata/utils.go new file mode 100644 index 0000000..bac70c7 --- /dev/null +++ b/metricdata/utils.go @@ -0,0 +1,36 @@ +package metricdata + +import ( + "context" + + "github.com/ClusterCockpit/cc-jobarchive/schema" +) + +var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { + panic("TODO") +} + +// Only a mock for unit-testing. +type TestMetricDataRepository struct { + url, token string + renamings map[string]string +} + +func (tmdr *TestMetricDataRepository) Init(url, token string, renamings map[string]string) error { + tmdr.url = url + tmdr.token = token + tmdr.renamings = renamings + return nil +} + +func (tmdr *TestMetricDataRepository) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { + return TestLoadDataCallback(job, metrics, scopes, ctx) +} + +func (tmdr *TestMetricDataRepository) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) { + panic("TODO") +} + +func (tmdr *TestMetricDataRepository) LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error) { + panic("TODO") +}