Update cc-metric-store client; Bugfix in LoadNodeData

This commit is contained in:
Lou Knauer 2022-02-02 13:04:38 +01:00
parent 3d5aa9f904
commit f7959cb1bd

View File

@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
@ -42,6 +43,11 @@ type ApiQuery struct {
SubTypeIds []int `json:"subtype-ids,omitempty"`
}
type ApiQueryResponse struct {
Queries []ApiQuery `json:"queries,omitempty"`
Results [][]ApiMetricData `json:"results"`
}
type ApiMetricData struct {
Error *string `json:"error"`
From int64 `json:"from"`
@ -90,7 +96,7 @@ func (ccms *CCMetricStore) toLocalName(metric string) string {
return metric
}
func (ccms *CCMetricStore) doRequest(ctx context.Context, body *ApiQueryRequest) ([][]ApiMetricData, error) {
func (ccms *CCMetricStore) doRequest(ctx context.Context, body *ApiQueryRequest) (*ApiQueryResponse, error) {
buf := &bytes.Buffer{}
if err := json.NewEncoder(buf).Encode(body); err != nil {
return nil, err
@ -113,12 +119,12 @@ func (ccms *CCMetricStore) doRequest(ctx context.Context, body *ApiQueryRequest)
return nil, fmt.Errorf("'%s': HTTP Status: %s", ccms.queryEndpoint, res.Status)
}
var resBody [][]ApiMetricData
var resBody ApiQueryResponse
if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil {
return nil, err
}
return resBody, nil
return &resBody, nil
}
func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
@ -142,7 +148,7 @@ func (ccms *CCMetricStore) LoadData(job *schema.Job, metrics []string, scopes []
}
var jobData schema.JobData = make(schema.JobData)
for i, row := range resBody {
for i, row := range resBody.Results {
query := req.Queries[i]
metric := ccms.toLocalName(query.Metric)
scope := assignedScope[i]
@ -421,7 +427,7 @@ func (ccms *CCMetricStore) LoadStats(job *schema.Job, metrics []string, ctx cont
}
stats := make(map[string]map[string]schema.MetricStatistics, len(metrics))
for i, res := range resBody {
for i, res := range resBody.Results {
query := req.Queries[i]
metric := ccms.toLocalName(query.Metric)
data := res[0]
@ -455,12 +461,15 @@ func (ccms *CCMetricStore) LoadNodeData(cluster, partition string, metrics, node
Cluster: cluster,
From: from.Unix(),
To: to.Unix(),
WithStats: false,
WithStats: true,
WithData: true,
}
if nodes == nil {
req.ForAllNodes = metrics
for _, metric := range metrics {
log.Printf("local-name: %s, remote-name: %s, renamings: %#v", metric, ccms.toRemoteName(metric), ccms.here2there)
req.ForAllNodes = append(req.ForAllNodes, ccms.toRemoteName(metric))
}
} else {
for _, node := range nodes {
for _, metric := range metrics {
@ -479,14 +488,26 @@ func (ccms *CCMetricStore) LoadNodeData(cluster, partition string, metrics, node
_ = resBody
data := make(map[string]map[string][]*schema.JobMetric)
for i, res := range resBody {
query := req.Queries[i]
for i, res := range resBody.Results {
var query ApiQuery
if resBody.Queries != nil {
query = resBody.Queries[i]
} else {
query = req.Queries[i]
}
log.Printf("results for %#v: %#v\n", query, res)
metric := ccms.toLocalName(query.Metric)
qdata := res[0]
if qdata.Error != nil {
return nil, fmt.Errorf("fetching %s for node %s failed: %s", query.Metric, query.Hostname, *qdata.Error)
}
if qdata.Avg.IsNaN() || qdata.Min.IsNaN() || qdata.Max.IsNaN() {
return nil, fmt.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN")
}
hostdata, ok := data[query.Hostname]
if !ok {
hostdata = make(map[string][]*schema.JobMetric)
@ -502,6 +523,11 @@ func (ccms *CCMetricStore) LoadNodeData(cluster, partition string, metrics, node
{
Hostname: query.Hostname,
Data: qdata.Data,
Statistics: &schema.MetricStatistics{
Avg: float64(qdata.Avg),
Min: float64(qdata.Min),
Max: float64(qdata.Max),
},
},
},
})