From 6743d94b0ea30de9cbc1c15ac70dfd1157e2cbbf Mon Sep 17 00:00:00 2001
From: Lou Knauer
Date: Thu, 20 Jan 2022 10:43:46 +0100
Subject: [PATCH] Use new simpler cc-metric-store API

---
 metricdata/cc-metric-store.go | 488 ++++++++++++++++------------------
 1 file changed, 231 insertions(+), 257 deletions(-)

diff --git a/metricdata/cc-metric-store.go b/metricdata/cc-metric-store.go
index c0cef01..0a4d6f3 100644
--- a/metricdata/cc-metric-store.go
+++ b/metricdata/cc-metric-store.go
@@ -5,10 +5,8 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"net/http"
-	"strconv"
 	"time"
 
 	"github.com/ClusterCockpit/cc-jobarchive/config"
@@ -16,23 +14,30 @@ import (
 )
 
 type CCMetricStore struct {
-	jwt    string
-	url    string
-	client http.Client
+	jwt           string
+	url           string
+	queryEndpoint string
+	client        http.Client
 }
 
-type ApiRequestBody struct {
-	Metrics   []string   `json:"metrics"`
-	Selectors [][]string `json:"selectors"`
+type ApiQueryRequest struct {
+	Cluster     string     `json:"cluster"`
+	From        int64      `json:"from"`
+	To          int64      `json:"to"`
+	WithStats   bool       `json:"with-stats"`
+	WithData    bool       `json:"with-data"`
+	Queries     []ApiQuery `json:"queries"`
+	ForAllNodes []string   `json:"for-all-nodes"`
 }
 
 type ApiQuery struct {
-	Metric     string   `json:"metric"`
-	Hostname   string   `json:"hostname"`
-	Type       *string  `json:"type,omitempty"`
-	TypeIds    []string `json:"type-ids,omitempty"`
-	SubType    *string  `json:"subtype,omitempty"`
-	SubTypeIds []string `json:"subtype-ids,omitempty"`
+	Metric     string  `json:"metric"`
+	Hostname   string  `json:"host"`
+	Aggregate  bool    `json:"aggreg"`
+	Type       *string `json:"type,omitempty"`
+	TypeIds    []int   `json:"type-ids,omitempty"`
+	SubType    *string `json:"subtype,omitempty"`
+	SubTypeIds []int   `json:"subtype-ids,omitempty"`
 }
 
 type ApiMetricData struct {
@@ -45,18 +50,9 @@ type ApiMetricData struct {
 	Max   schema.Float `json:"max"`
 }
 
-type ApiStatsData struct {
-	Error   *string      `json:"error"`
-	From    int64        `json:"from"`
-	To      int64        `json:"to"`
-	Samples int          `json:"samples"`
-	Avg     schema.Float `json:"avg"`
-	Min     schema.Float `json:"min"`
-	Max     schema.Float `json:"max"`
-}
-
 func (ccms *CCMetricStore) Init(url, token string) error {
 	ccms.url = url
+	ccms.queryEndpoint = fmt.Sprintf("%s/api/query", url)
 	ccms.jwt = token
 	ccms.client = http.Client{
 		Timeout: 5 * time.Second,
@@ -64,100 +60,67 @@ func (ccms *CCMetricStore) Init(url, token string) error {
 	return nil
 }
 
-func (ccms *CCMetricStore) doRequest(job *schema.Job, suffix string, metrics []string, ctx context.Context) (*http.Response, error) {
-	from, to := job.StartTime.Unix(), job.StartTime.Add(time.Duration(job.Duration)*time.Second).Unix()
-	reqBody := ApiRequestBody{}
-	reqBody.Metrics = metrics
-	for _, node := range job.Resources {
-		if node.Accelerators != nil || node.HWThreads != nil {
-			// TODO/FIXME:
-			return nil, errors.New("todo: cc-metric-store resources: Accelerator/HWThreads")
-		}
-
-		reqBody.Selectors = append(reqBody.Selectors, []string{job.Cluster, node.Hostname})
-	}
-
-	reqBodyBytes, err := json.Marshal(reqBody)
-	if err != nil {
+func (ccms *CCMetricStore) doRequest(ctx context.Context, body *ApiQueryRequest) ([][]ApiMetricData, error) {
+	buf := &bytes.Buffer{}
+	if err := json.NewEncoder(buf).Encode(body); err != nil {
 		return nil, err
 	}
 
-	req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/api/%d/%d/%s", ccms.url, from, to, suffix), bytes.NewReader(reqBodyBytes))
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, ccms.queryEndpoint, buf)
	if err != nil {
 		return nil, err
 	}
 
 	if ccms.jwt != "" {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt)) } - return ccms.client.Do(req) + + res, err := ccms.client.Do(req) + if err != nil { + return nil, err + } + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("'%s': HTTP Status: %s", ccms.queryEndpoint, res.Status) + } + + var resBody [][]ApiMetricData + if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil { + return nil, err + } + + return resBody, nil } func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { - type ApiQueryRequest struct { - Cluster string `json:"cluster"` - From int64 `json:"from"` - To int64 `json:"to"` - Queries []ApiQuery `json:"queries"` - } - - type ApiQueryResponse struct { - ApiMetricData - Query *ApiQuery `json:"query"` - } - queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes) if err != nil { return nil, err } - reqBody := ApiQueryRequest{ - Cluster: job.Cluster, - From: job.StartTime.Unix(), - To: job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(), - Queries: queries, + req := ApiQueryRequest{ + Cluster: job.Cluster, + From: job.StartTime.Unix(), + To: job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(), + Queries: queries, + WithStats: true, + WithData: true, } - buf := &bytes.Buffer{} - if err := json.NewEncoder(buf).Encode(reqBody); err != nil { - return nil, err - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, ccms.url+"/api/query", buf) + resBody, err := ccms.doRequest(ctx, &req) if err != nil { return nil, err } - if ccms.jwt != "" { - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt)) - } - res, err := ccms.client.Do(req) - if err != nil { - return nil, err - } - if res.StatusCode != http.StatusOK { - return nil, fmt.Errorf("cc-metric-store replied with: %s", res.Status) - } - - var resBody []ApiQueryResponse - if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil { - return nil, err - } - - // log.Printf("response: %#v", resBody) var jobData schema.JobData = make(schema.JobData) - for i, res := range resBody { - metric := res.Query.Metric - if _, ok := jobData[metric]; !ok { - jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric) - } - - if res.Error != nil { - return nil, fmt.Errorf("cc-metric-store error while fetching %s: %s", metric, *res.Error) - } - + for i, row := range resBody { + query := req.Queries[i] scope := assignedScope[i] - mc := config.GetMetricConfig(job.Cluster, metric) - jobMetric, ok := jobData[metric][scope] + mc := config.GetMetricConfig(job.Cluster, query.Metric) + if _, ok := jobData[query.Metric]; !ok { + jobData[query.Metric] = make(map[schema.MetricScope]*schema.JobMetric) + } + + jobMetric, ok := jobData[query.Metric][scope] if !ok { jobMetric = &schema.JobMetric{ Unit: mc.Unit, @@ -165,41 +128,47 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes [] Timestep: mc.Timestep, Series: make([]schema.Series, 0), } - jobData[metric][scope] = jobMetric + jobData[query.Metric][scope] = jobMetric } - id := (*int)(nil) - if res.Query.Type != nil { - id = new(int) - *id, _ = strconv.Atoi(res.Query.TypeIds[0]) - } + for _, res := range row { + if res.Error != nil { + return nil, fmt.Errorf("cc-metric-store error while fetching %s: %s", query.Metric, *res.Error) + } - if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() { - // TODO: use schema.Float instead of float64? 
-			// This is done because regular float64 can not be JSONed when NaN.
-			res.Avg = schema.Float(0)
-			res.Min = schema.Float(0)
-			res.Max = schema.Float(0)
-		}
+			id := (*int)(nil)
+			if query.Type != nil {
+				id = new(int)
+				*id = query.TypeIds[0]
+			}
 
-		jobMetric.Series = append(jobMetric.Series, schema.Series{
-			Hostname: res.Query.Hostname,
-			Id:       id,
-			Statistics: &schema.MetricStatistics{
-				Avg: float64(res.Avg),
-				Min: float64(res.Min),
-				Max: float64(res.Max),
-			},
-			Data: res.Data,
-		})
+			if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
+				// TODO: use schema.Float instead of float64?
+				// This is done because regular float64 can not be JSONed when NaN.
+				res.Avg = schema.Float(0)
+				res.Min = schema.Float(0)
+				res.Max = schema.Float(0)
+			}
+
+			jobMetric.Series = append(jobMetric.Series, schema.Series{
+				Hostname: query.Hostname,
+				Id:       id,
+				Statistics: &schema.MetricStatistics{
+					Avg: float64(res.Avg),
+					Min: float64(res.Min),
+					Max: float64(res.Max),
+				},
+				Data: res.Data,
+			})
+		}
 	}
 
 	return jobData, nil
 }
 
 var (
-	hwthreadString = string("cpu") // TODO/FIXME: inconsistency between cc-metric-collector and ClusterCockpit
-	// coreString = string(schema.MetricScopeCore)
+	hwthreadString    = string("cpu") // TODO/FIXME: inconsistency between cc-metric-collector and ClusterCockpit
+	coreString        = string(schema.MetricScopeCore)
 	socketString      = string(schema.MetricScopeSocket)
 	acceleratorString = string(schema.MetricScopeAccelerator)
 )
@@ -239,15 +208,14 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
 
 		// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
 		if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
-			for _, accel := range host.Accelerators {
-				queries = append(queries, ApiQuery{
-					Metric:   metric,
-					Hostname: host.Hostname,
-					Type:     &acceleratorString,
-					TypeIds:  []string{strconv.Itoa(accel)},
-				})
-				assignedScope = append(assignedScope, schema.MetricScopeAccelerator)
-			}
+			queries = append(queries, ApiQuery{
+				Metric:    metric,
+				Hostname:  host.Hostname,
+				Aggregate: false,
+				Type:      &acceleratorString,
+				TypeIds:   host.Accelerators,
+			})
+			assignedScope = append(assignedScope, schema.MetricScopeAccelerator)
 			continue
 		}
 
@@ -258,26 +226,26 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
 			}
 
 			queries = append(queries, ApiQuery{
-				Metric:   metric,
-				Hostname: host.Hostname,
-				Type:     &acceleratorString,
-				TypeIds:  toStringSlice(host.Accelerators),
+				Metric:    metric,
+				Hostname:  host.Hostname,
+				Aggregate: true,
+				Type:      &acceleratorString,
+				TypeIds:   host.Accelerators,
 			})
-			assignedScope = append(assignedScope, schema.MetricScopeNode)
+			assignedScope = append(assignedScope, scope)
 			continue
 		}
 
 		// HWThread -> HWThead
 		if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
-			for _, hwthread := range hwthreads {
-				queries = append(queries, ApiQuery{
-					Metric:   metric,
-					Hostname: host.Hostname,
-					Type:     &hwthreadString,
-					TypeIds:  []string{strconv.Itoa(hwthread)},
-				})
-				assignedScope = append(assignedScope, schema.MetricScopeHWThread)
-			}
+			queries = append(queries, ApiQuery{
+				Metric:    metric,
+				Hostname:  host.Hostname,
+				Aggregate: false,
+				Type:      &hwthreadString,
+				TypeIds:   hwthreads,
+			})
+			assignedScope = append(assignedScope, scope)
 			continue
 		}
 
@@ -286,12 +254,13 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
 			cores, _ := topology.GetCoresFromHWThreads(hwthreads)
 			for _, core := range cores {
 				queries = append(queries, ApiQuery{
-					Metric:   metric,
-					Hostname: host.Hostname,
-					Type:     &hwthreadString,
-					TypeIds:  toStringSlice(topology.Core[core]),
+					Metric:    metric,
+					Hostname:  host.Hostname,
+					Aggregate: true,
+					Type:      &hwthreadString,
+					TypeIds:   topology.Core[core],
 				})
-				assignedScope = append(assignedScope, schema.MetricScopeCore)
+				assignedScope = append(assignedScope, scope)
 			}
 			continue
 		}
 
@@ -301,12 +270,13 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
 			sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
 			for _, socket := range sockets {
 				queries = append(queries, ApiQuery{
-					Metric:   metric,
-					Hostname: host.Hostname,
-					Type:     &hwthreadString,
-					TypeIds:  toStringSlice(topology.Socket[socket]),
+					Metric:    metric,
+					Hostname:  host.Hostname,
+					Aggregate: true,
+					Type:      &hwthreadString,
+					TypeIds:   topology.Socket[socket],
 				})
-				assignedScope = append(assignedScope, schema.MetricScopeSocket)
+				assignedScope = append(assignedScope, scope)
 			}
 			continue
 		}
 
@@ -314,27 +284,55 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
 		// HWThread -> Node
 		if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
 			queries = append(queries, ApiQuery{
-				Metric:   metric,
-				Hostname: host.Hostname,
-				Type:     &hwthreadString,
-				TypeIds:  toStringSlice(hwthreads),
+				Metric:    metric,
+				Hostname:  host.Hostname,
+				Aggregate: true,
+				Type:      &hwthreadString,
+				TypeIds:   hwthreads,
 			})
-			assignedScope = append(assignedScope, schema.MetricScopeNode)
+			assignedScope = append(assignedScope, scope)
+			continue
+		}
+
+		// Core -> Core
+		if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
+			cores, _ := topology.GetCoresFromHWThreads(hwthreads)
+			queries = append(queries, ApiQuery{
+				Metric:    metric,
+				Hostname:  host.Hostname,
+				Aggregate: false,
+				Type:      &coreString,
+				TypeIds:   cores,
+			})
+			assignedScope = append(assignedScope, scope)
+			continue
+		}
+
+		// Core -> Node
+		if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
+			cores, _ := topology.GetCoresFromHWThreads(hwthreads)
+			queries = append(queries, ApiQuery{
+				Metric:    metric,
+				Hostname:  host.Hostname,
+				Aggregate: true,
+				Type:      &coreString,
+				TypeIds:   cores,
+			})
+			assignedScope = append(assignedScope, scope)
 			continue
 		}
 
 		// Socket -> Socket
 		if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
 			sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
-			for _, socket := range sockets {
-				queries = append(queries, ApiQuery{
-					Metric:   metric,
-					Hostname: host.Hostname,
-					Type:     &acceleratorString,
-					TypeIds:  []string{strconv.Itoa(socket)},
-				})
-				assignedScope = append(assignedScope, schema.MetricScopeSocket)
-			}
+			queries = append(queries, ApiQuery{
+				Metric:    metric,
+				Hostname:  host.Hostname,
+				Aggregate: false,
+				Type:      &socketString,
+				TypeIds:   sockets,
+			})
+			assignedScope = append(assignedScope, scope)
 			continue
 		}
 
@@ -342,12 +340,13 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
 		if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
 			sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
 			queries = append(queries, ApiQuery{
-				Metric:   metric,
-				Hostname: host.Hostname,
-				Type:     &socketString,
-				TypeIds:  toStringSlice(sockets),
+				Metric:    metric,
+				Hostname:  host.Hostname,
+				Aggregate: true,
+				Type:      &socketString,
+				TypeIds:   sockets,
 			})
-			assignedScope = append(assignedScope, schema.MetricScopeNode)
+			assignedScope = append(assignedScope, scope)
 			continue
 		}
 
@@ -357,7 +356,7 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
 				Metric:   metric,
 				Hostname: host.Hostname,
 			})
-			assignedScope = append(assignedScope, schema.MetricScopeNode)
+			assignedScope = append(assignedScope, scope)
 			continue
 		}
 
@@ -369,121 +368,96 @@ func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scope
 	return queries, assignedScope, nil
 }
 
-func toStringSlice(s []int) []string {
-	ret := make([]string, len(s))
-	for i, val := range s {
-		ret[i] = strconv.Itoa(val)
-	}
-	return ret
-}
-
 func (ccms *CCMetricStore) LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
-	res, err := ccms.doRequest(job, "stats", metrics, ctx)
+	queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode})
 	if err != nil {
 		return nil, err
 	}
 
-	resdata := make([]map[string]ApiStatsData, 0, len(job.Resources))
-	if err := json.NewDecoder(res.Body).Decode(&resdata); err != nil {
+	req := ApiQueryRequest{
+		Cluster:   job.Cluster,
+		From:      job.StartTime.Unix(),
+		To:        job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(),
+		Queries:   queries,
+		WithStats: true,
+		WithData:  false,
+	}
+
+	resBody, err := ccms.doRequest(ctx, &req)
+	if err != nil {
 		return nil, err
 	}
 
-	stats := map[string]map[string]schema.MetricStatistics{}
-	for _, metric := range metrics {
-		nodestats := map[string]schema.MetricStatistics{}
-		for i, node := range job.Resources {
-			if node.Accelerators != nil || node.HWThreads != nil {
-				// TODO/FIXME:
-				return nil, errors.New("todo: cc-metric-store resources: Accelerator/HWThreads")
-			}
-
-			data := resdata[i][metric]
-			if data.Error != nil {
-				return nil, errors.New(*data.Error)
-			}
-
-			if data.Samples == 0 {
-				return nil, fmt.Errorf("no data for node '%s' and metric '%s'", node.Hostname, metric)
-			}
-
-			nodestats[node.Hostname] = schema.MetricStatistics{
-				Avg: float64(data.Avg),
-				Min: float64(data.Min),
-				Max: float64(data.Max),
-			}
+	stats := make(map[string]map[string]schema.MetricStatistics, len(metrics))
+	for i, res := range resBody {
+		query := req.Queries[i]
+		data := res[0]
+		if data.Error != nil {
+			return nil, fmt.Errorf("fetching %s for node %s failed: %s", query.Metric, query.Hostname, *data.Error)
 		}
 
-		stats[metric] = nodestats
+		metricdata, ok := stats[query.Metric]
+		if !ok {
+			metricdata = make(map[string]schema.MetricStatistics, job.NumNodes)
+			stats[query.Metric] = metricdata
+		}
+
+		if data.Avg.IsNaN() || data.Min.IsNaN() || data.Max.IsNaN() {
+			return nil, fmt.Errorf("fetching %s for node %s failed: %s", query.Metric, query.Hostname, "avg/min/max is NaN")
+		}
+
+		metricdata[query.Hostname] = schema.MetricStatistics{
+			Avg: float64(data.Avg),
+			Min: float64(data.Min),
+			Max: float64(data.Max),
+		}
 	}
 
 	return stats, nil
 }
 
 func (ccms *CCMetricStore) LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error) {
-	reqBody := ApiRequestBody{}
-	reqBody.Metrics = metrics
-	for _, node := range nodes {
-		reqBody.Selectors = append(reqBody.Selectors, []string{clusterId, node})
+	req := ApiQueryRequest{
+		Cluster:   clusterId,
+		From:      from,
+		To:        to,
+		WithStats: false,
+		WithData:  true,
 	}
 
-	reqBodyBytes, err := json.Marshal(reqBody)
-	if err != nil {
-		return nil, err
-	}
-
-	var req *http.Request
 	if nodes == nil {
-		req, err = http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/api/%s/%d/%d/all-nodes", ccms.url, clusterId, from, to), bytes.NewReader(reqBodyBytes))
+		req.ForAllNodes = metrics
 	} else {
-		req, err = http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/api/%d/%d/timeseries", ccms.url, from, to), bytes.NewReader(reqBodyBytes))
+		for _, node := range nodes {
+			for _, metric := range metrics {
+				req.Queries = append(req.Queries, ApiQuery{
+					Hostname: node,
+					Metric:   metric,
+				})
+			}
+		}
 	}
-	if err != nil {
-		return nil, err
-	}
-	if ccms.jwt != "" {
-		req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt))
-	}
-	res, err := ccms.client.Do(req)
+
+	resBody, err := ccms.doRequest(ctx, &req)
 	if err != nil {
 		return nil, err
 	}
 
-	data := map[string]map[string][]schema.Float{}
-	if nodes == nil {
-		resdata := map[string]map[string]ApiMetricData{}
-		if err := json.NewDecoder(res.Body).Decode(&resdata); err != nil {
-			return nil, err
+	data := make(map[string]map[string][]schema.Float)
+	for i, res := range resBody {
+		query := req.Queries[i]
+		qdata := res[0]
+		if qdata.Error != nil {
+			return nil, fmt.Errorf("fetching %s for node %s failed: %s", query.Metric, query.Hostname, *qdata.Error)
 		}
 
-		for node, metrics := range resdata {
-			nodedata := map[string][]schema.Float{}
-			for metric, data := range metrics {
-				if data.Error != nil {
-					return nil, errors.New(*data.Error)
-				}
-
-				nodedata[metric] = data.Data
-			}
-			data[node] = nodedata
-		}
-	} else {
-		resdata := make([]map[string]ApiMetricData, 0, len(nodes))
-		if err := json.NewDecoder(res.Body).Decode(&resdata); err != nil {
-			return nil, err
+		nodedata, ok := data[query.Hostname]
+		if !ok {
+			nodedata = make(map[string][]schema.Float)
+			data[query.Hostname] = nodedata
 		}
 
-		for i, node := range nodes {
-			metricsData := map[string][]schema.Float{}
-			for metric, data := range resdata[i] {
-				if data.Error != nil {
-					return nil, errors.New(*data.Error)
-				}
-
-				metricsData[metric] = data.Data
-			}
-
-			data[node] = metricsData
-		}
+		nodedata[query.Metric] = qdata.Data
 	}
 
 	return data, nil
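
Reviewer note, not part of the patch: the sketch below only illustrates the request format that the new code builds for the /api/query endpoint. It assumes it sits in the metricdata package next to the ApiQueryRequest and ApiQuery types added above; the cluster name, metric, hostname, timestamps and type ids are invented for the example.

package metricdata

import (
	"encoding/json"
	"fmt"
)

// exampleQueryRequestJSON prints the JSON body that doRequest would POST to
// the /api/query endpoint. All concrete values below are hypothetical; the
// field names and json tags come from the structs introduced in this patch.
func exampleQueryRequestJSON() {
	cpu := "cpu"
	req := ApiQueryRequest{
		Cluster:   "testcluster", // hypothetical cluster name
		From:      1642670400,    // hypothetical job start (unix seconds)
		To:        1642674000,    // hypothetical job end (unix seconds)
		WithStats: true,
		WithData:  true,
		Queries: []ApiQuery{
			{
				Metric:    "flops_any", // hypothetical metric
				Hostname:  "node001",   // hypothetical host
				Aggregate: true,
				Type:      &cpu,
				TypeIds:   []int{0, 1, 2, 3},
			},
		},
	}

	out, _ := json.Marshal(&req)
	fmt.Println(string(out))
	// Roughly: {"cluster":"testcluster","from":1642670400,"to":1642674000,
	// "with-stats":true,"with-data":true,"queries":[{"metric":"flops_any",
	// "host":"node001","aggreg":true,"type":"cpu","type-ids":[0,1,2,3]}],
	// "for-all-nodes":null}
}

doRequest POSTs such a body with the Bearer token and decodes the reply as [][]ApiMetricData, one inner slice per query.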