Add queries to metric data repositories

This commit is contained in:
Lou Knauer 2021-12-08 11:50:16 +01:00
parent bc8ad593fd
commit eb2df5aa1c
3 changed files with 199 additions and 29 deletions

View File

@ -56,7 +56,7 @@ func (ccms *CCMetricStore) Init(url string) error {
return nil
}
func (ccms *CCMetricStore) LoadData(job *model.Job, metrics []string, ctx context.Context) (schema.JobData, error) {
func (ccms *CCMetricStore) doRequest(job *model.Job, suffix string, metrics []string, ctx context.Context) (*http.Response, error) {
from, to := job.StartTime.Unix(), job.StartTime.Add(time.Duration(job.Duration)*time.Second).Unix()
reqBody := ApiRequestBody{}
reqBody.Metrics = metrics
@ -69,18 +69,21 @@ func (ccms *CCMetricStore) LoadData(job *model.Job, metrics []string, ctx contex
return nil, err
}
authHeader := fmt.Sprintf("Bearer %s", ccms.jwt)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/api/%d/%d/timeseries?with-stats=true", ccms.url, from, to), bytes.NewReader(reqBodyBytes))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/api/%d/%d/%s", ccms.url, from, to, suffix), bytes.NewReader(reqBodyBytes))
if err != nil {
return nil, err
}
req.Header.Add("Authorization", authHeader)
res, err := ccms.client.Do(req)
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt))
return ccms.client.Do(req)
}
func (ccms *CCMetricStore) LoadData(job *model.Job, metrics []string, ctx context.Context) (schema.JobData, error) {
res, err := ccms.doRequest(job, "timeseries?with-stats=true", metrics, ctx)
if err != nil {
return nil, err
}
resdata := make([]map[string]ApiMetricData, 0, len(reqBody.Selectors))
resdata := make([]map[string]ApiMetricData, 0, len(job.Nodes))
if err := json.NewDecoder(res.Body).Decode(&resdata); err != nil {
return nil, err
}
@ -101,7 +104,7 @@ func (ccms *CCMetricStore) LoadData(job *model.Job, metrics []string, ctx contex
}
if data.Avg == nil || data.Min == nil || data.Max == nil {
return nil, errors.New("no data")
return nil, fmt.Errorf("no data for node '%s' and metric '%s'", node, metric)
}
metricData.Series = append(metricData.Series, &schema.MetricSeries{
@ -119,3 +122,108 @@ func (ccms *CCMetricStore) LoadData(job *model.Job, metrics []string, ctx contex
return jobData, nil
}
func (ccms *CCMetricStore) LoadStats(job *model.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
res, err := ccms.doRequest(job, "stats", metrics, ctx)
if err != nil {
return nil, err
}
resdata := make([]map[string]ApiStatsData, 0, len(job.Nodes))
if err := json.NewDecoder(res.Body).Decode(&resdata); err != nil {
return nil, err
}
stats := map[string]map[string]schema.MetricStatistics{}
for _, metric := range metrics {
nodestats := map[string]schema.MetricStatistics{}
for i, node := range job.Nodes {
data := resdata[i][metric]
if data.Error != nil {
return nil, errors.New(*data.Error)
}
if data.Samples == 0 {
return nil, fmt.Errorf("no data for node '%s' and metric '%s'", node, metric)
}
nodestats[node] = schema.MetricStatistics{
Avg: float64(data.Avg),
Min: float64(data.Min),
Max: float64(data.Max),
}
}
stats[metric] = nodestats
}
return stats, nil
}
func (ccms *CCMetricStore) LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error) {
reqBody := ApiRequestBody{}
reqBody.Metrics = metrics
for _, node := range nodes {
reqBody.Selectors = append(reqBody.Selectors, []string{clusterId, node})
}
reqBodyBytes, err := json.Marshal(reqBody)
if err != nil {
return nil, err
}
var req *http.Request
if nodes == nil {
req, err = http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/api/%d/%d/all-nodes", ccms.url, from, to), bytes.NewReader(reqBodyBytes))
} else {
req, err = http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/api/%d/%d/timeseries", ccms.url, from, to), bytes.NewReader(reqBodyBytes))
}
if err != nil {
return nil, err
}
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt))
res, err := ccms.client.Do(req)
if err != nil {
return nil, err
}
data := map[string]map[string][]schema.Float{}
if nodes == nil {
resdata := map[string]map[string]ApiMetricData{}
if err := json.NewDecoder(res.Body).Decode(&resdata); err != nil {
return nil, err
}
for node, metrics := range resdata {
nodedata := map[string][]schema.Float{}
for metric, data := range metrics {
if data.Error != nil {
return nil, errors.New(*data.Error)
}
nodedata[metric] = data.Data
}
data[node] = nodedata
}
} else {
resdata := make([]map[string]ApiMetricData, 0, len(nodes))
if err := json.NewDecoder(res.Body).Decode(&resdata); err != nil {
return nil, err
}
for i, node := range nodes {
metricsData := map[string][]schema.Float{}
for metric, data := range resdata[i] {
if data.Error != nil {
return nil, errors.New(*data.Error)
}
metricsData[metric] = data.Data
}
data[node] = metricsData
}
}
return data, nil
}

View File

@ -94,10 +94,33 @@ func (idb *InfluxDBv2DataRepository) LoadData(job *model.Job, metrics []string,
currentSeries.Data = append(currentSeries.Data, schema.Float(val))
}
return jobData, idb.addStats(job, jobData, metrics, hostsCond, ctx)
stats, err := idb.LoadStats(job, metrics, ctx)
if err != nil {
return nil, err
}
for metric, nodes := range stats {
jobMetric := jobData[metric]
for node, stats := range nodes {
for _, series := range jobMetric.Series {
if series.NodeID == node {
series.Statistics = &stats
}
}
}
}
func (idb *InfluxDBv2DataRepository) addStats(job *model.Job, jobData schema.JobData, metrics []string, hostsCond string, ctx context.Context) error {
return jobData, nil
}
func (idb *InfluxDBv2DataRepository) LoadStats(job *model.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
stats := map[string]map[string]schema.MetricStatistics{}
hostsConds := make([]string, 0, len(job.Nodes))
for _, h := range job.Nodes {
hostsConds = append(hostsConds, fmt.Sprintf(`r.host == "%s"`, h))
}
hostsCond := strings.Join(hostsConds, " or ")
for _, metric := range metrics {
query := fmt.Sprintf(`
data = from(bucket: "%s")
@ -115,10 +138,10 @@ func (idb *InfluxDBv2DataRepository) addStats(job *model.Job, jobData schema.Job
idb.measurement, metric, hostsCond)
rows, err := idb.queryClient.Query(ctx, query)
if err != nil {
return err
return nil, err
}
jobMetric := jobData[metric]
nodes := map[string]schema.MetricStatistics{}
for rows.Next() {
row := rows.Record()
host := row.ValueByKey("host").(string)
@ -126,18 +149,18 @@ func (idb *InfluxDBv2DataRepository) addStats(job *model.Job, jobData schema.Job
row.ValueByKey("min").(float64),
row.ValueByKey("max").(float64)
for _, s := range jobMetric.Series {
if s.NodeID == host {
s.Statistics = &schema.MetricStatistics{
nodes[host] = schema.MetricStatistics{
Avg: avg,
Min: min,
Max: max,
}
break
}
}
}
stats[metric] = nodes
}
return nil
return stats, nil
}
func (idb *InfluxDBv2DataRepository) LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error) {
return nil, nil
}

View File

@ -2,7 +2,6 @@ package metricdata
import (
"context"
"errors"
"fmt"
"github.com/ClusterCockpit/cc-jobarchive/config"
@ -13,6 +12,8 @@ import (
type MetricDataRepository interface {
Init(url string) error
LoadData(job *model.Job, metrics []string, ctx context.Context) (schema.JobData, error)
LoadStats(job *model.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error)
}
var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}
@ -55,10 +56,6 @@ func LoadData(job *model.Job, metrics []string, ctx context.Context) (schema.Job
return repo.LoadData(job, metrics, ctx)
}
if job.State != model.JobStateCompleted {
return nil, fmt.Errorf("job of state '%s' is not supported", job.State)
}
data, err := loadFromArchive(job)
if err != nil {
return nil, err
@ -78,9 +75,51 @@ func LoadData(job *model.Job, metrics []string, ctx context.Context) (schema.Job
// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize.
func LoadAverages(job *model.Job, metrics []string, data [][]schema.Float, ctx context.Context) error {
if job.State != model.JobStateCompleted {
return errors.New("only completed jobs are supported")
}
if job.State != model.JobStateRunning {
return loadAveragesFromArchive(job, metrics, data)
}
repo, ok := metricDataRepos[job.ClusterID]
if !ok {
return fmt.Errorf("no metric data repository configured for '%s'", job.ClusterID)
}
stats, err := repo.LoadStats(job, metrics, ctx)
if err != nil {
return err
}
for i, m := range metrics {
nodes, ok := stats[m]
if !ok {
data[i] = append(data[i], schema.NaN)
continue
}
sum := 0.0
for _, node := range nodes {
sum += node.Avg
}
data[i] = append(data[i], schema.Float(sum))
}
return nil
}
func LoadNodeData(clusterId string, metrics, nodes []string, from, to int64, ctx context.Context) (map[string]map[string][]schema.Float, error) {
repo, ok := metricDataRepos[clusterId]
if !ok {
return nil, fmt.Errorf("no metric data repository configured for '%s'", clusterId)
}
data, err := repo.LoadNodeData(clusterId, metrics, nodes, from, to, ctx)
if err != nil {
return nil, err
}
if data == nil {
return nil, fmt.Errorf("the metric data repository for '%s' does not support this query", clusterId)
}
return data, nil
}