diff --git a/metricdata/cc-metric-store.go b/metricdata/cc-metric-store.go index f9d5d62..b0cfedd 100644 --- a/metricdata/cc-metric-store.go +++ b/metricdata/cc-metric-store.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "fmt" + "log" "net/http" "time" @@ -42,6 +43,11 @@ type ApiQuery struct { SubTypeIds []int `json:"subtype-ids,omitempty"` } +type ApiQueryResponse struct { + Queries []ApiQuery `json:"queries,omitempty"` + Results [][]ApiMetricData `json:"results"` +} + type ApiMetricData struct { Error *string `json:"error"` From int64 `json:"from"` @@ -90,7 +96,7 @@ func (ccms *CCMetricStore) toLocalName(metric string) string { return metric } -func (ccms *CCMetricStore) doRequest(ctx context.Context, body *ApiQueryRequest) ([][]ApiMetricData, error) { +func (ccms *CCMetricStore) doRequest(ctx context.Context, body *ApiQueryRequest) (*ApiQueryResponse, error) { buf := &bytes.Buffer{} if err := json.NewEncoder(buf).Encode(body); err != nil { return nil, err @@ -113,12 +119,12 @@ func (ccms *CCMetricStore) doRequest(ctx context.Context, body *ApiQueryRequest) return nil, fmt.Errorf("'%s': HTTP Status: %s", ccms.queryEndpoint, res.Status) } - var resBody [][]ApiMetricData + var resBody ApiQueryResponse if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil { return nil, err } - return resBody, nil + return &resBody, nil } func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { @@ -142,7 +148,7 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes [] } var jobData schema.JobData = make(schema.JobData) - for i, row := range resBody { + for i, row := range resBody.Results { query := req.Queries[i] metric := ccms.toLocalName(query.Metric) scope := assignedScope[i] @@ -421,7 +427,7 @@ func (ccms *CCMetricStore) LoadStats(job *schema.Job, metrics []string, ctx cont } stats := make(map[string]map[string]schema.MetricStatistics, len(metrics)) - for i, res := range resBody { + for i, res := range resBody.Results { query := req.Queries[i] metric := ccms.toLocalName(query.Metric) data := res[0] @@ -455,12 +461,15 @@ func (ccms *CCMetricStore) LoadNodeData(cluster, partition string, metrics, node Cluster: cluster, From: from.Unix(), To: to.Unix(), - WithStats: false, + WithStats: true, WithData: true, } if nodes == nil { - req.ForAllNodes = metrics + for _, metric := range metrics { + log.Printf("local-name: %s, remote-name: %s, renamings: %#v", metric, ccms.toRemoteName(metric), ccms.here2there) + req.ForAllNodes = append(req.ForAllNodes, ccms.toRemoteName(metric)) + } } else { for _, node := range nodes { for _, metric := range metrics { @@ -479,14 +488,26 @@ func (ccms *CCMetricStore) LoadNodeData(cluster, partition string, metrics, node _ = resBody data := make(map[string]map[string][]*schema.JobMetric) - for i, res := range resBody { - query := req.Queries[i] + for i, res := range resBody.Results { + var query ApiQuery + if resBody.Queries != nil { + query = resBody.Queries[i] + } else { + query = req.Queries[i] + } + + log.Printf("results for %#v: %#v\n", query, res) + metric := ccms.toLocalName(query.Metric) qdata := res[0] if qdata.Error != nil { return nil, fmt.Errorf("fetching %s for node %s failed: %s", query.Metric, query.Hostname, *qdata.Error) } + if qdata.Avg.IsNaN() || qdata.Min.IsNaN() || qdata.Max.IsNaN() { + return nil, fmt.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN") + } + hostdata, ok := data[query.Hostname] if !ok { hostdata = make(map[string][]*schema.JobMetric) @@ -502,6 +523,11 @@ func (ccms *CCMetricStore) LoadNodeData(cluster, partition string, metrics, node { Hostname: query.Hostname, Data: qdata.Data, + Statistics: &schema.MetricStatistics{ + Avg: float64(qdata.Avg), + Min: float64(qdata.Min), + Max: float64(qdata.Max), + }, }, }, })