Start implementing improved MetricDataRepository configuration

This commit is contained in:
Christoph Kluge 2022-03-17 16:15:35 +01:00
parent 3e0de24307
commit 3f65065874
5 changed files with 58 additions and 38 deletions

View File

@ -1,6 +1,9 @@
package model package model
import "strconv" import (
"encoding/json"
"strconv"
)
type Cluster struct { type Cluster struct {
Name string `json:"name"` Name string `json:"name"`
@ -8,19 +11,9 @@ type Cluster struct {
FilterRanges *FilterRanges `json:"filterRanges"` FilterRanges *FilterRanges `json:"filterRanges"`
Partitions []*Partition `json:"partitions"` Partitions []*Partition `json:"partitions"`
// NOT part of the API: // NOT part of the GraphQL API. This has to be a JSON object with a field `"kind"`.
MetricDataRepository *MetricDataRepository `json:"metricDataRepository"` // All other fields depend on that kind (e.g. "cc-metric-store", "influxdb-v2").
} MetricDataRepository json.RawMessage `json:"metricDataRepository"`
type MetricDataRepository struct {
Kind string `json:"kind"`
Url string `json:"url"`
Token string `json:"token"`
// If metrics are known to this MetricDataRepository under a different
// name than in the `metricConfig` section of the 'cluster.json',
// provide this optional mapping of local to remote name for this metric.
Renamings map[string]string `json:"metricRenamings"`
} }
// Return a list of socket IDs given a list of hwthread IDs. // Return a list of socket IDs given a list of hwthread IDs.

View File

@ -14,6 +14,17 @@ import (
"github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/schema"
) )
type CCMetricStoreConfig struct {
Kind string `json:"kind"`
Url string `json:"url"`
Token string `json:"token"`
// If metrics are known to this MetricDataRepository under a different
// name than in the `metricConfig` section of the 'cluster.json',
// provide this optional mapping of local to remote name for this metric.
Renamings map[string]string `json:"metricRenamings"`
}
type CCMetricStore struct { type CCMetricStore struct {
jwt string jwt string
url string url string
@ -58,17 +69,22 @@ type ApiMetricData struct {
Max schema.Float `json:"max"` Max schema.Float `json:"max"`
} }
func (ccms *CCMetricStore) Init(url, token string, renamings map[string]string) error { func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
ccms.url = url var config CCMetricStoreConfig
ccms.queryEndpoint = fmt.Sprintf("%s/api/query", url) if err := json.Unmarshal(rawConfig, &config); err != nil {
ccms.jwt = token return err
}
ccms.url = config.Url
ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url)
ccms.jwt = config.Token
ccms.client = http.Client{ ccms.client = http.Client{
Timeout: 5 * time.Second, Timeout: 5 * time.Second,
} }
if renamings != nil { if config.Renamings != nil {
ccms.here2there = renamings ccms.here2there = config.Renamings
ccms.there2here = make(map[string]string, len(renamings)) ccms.there2here = make(map[string]string, len(config.Renamings))
for k, v := range ccms.here2there { for k, v := range ccms.here2there {
ccms.there2here[v] = k ccms.there2here[v] = k
} }

View File

@ -8,6 +8,7 @@ import (
"strings" "strings"
"time" "time"
"crypto/tls" "crypto/tls"
"encoding/json"
"github.com/ClusterCockpit/cc-backend/config" "github.com/ClusterCockpit/cc-backend/config"
"github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/schema"
@ -15,15 +16,25 @@ import (
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
) )
type InfluxDBv2DataRepositoryConfig struct {
Url string `json:"url"`
Token string `json:"token"`
// TODO: bucket, ...
}
type InfluxDBv2DataRepository struct { type InfluxDBv2DataRepository struct {
client influxdb2.Client client influxdb2.Client
queryClient influxdb2Api.QueryAPI queryClient influxdb2Api.QueryAPI
bucket, measurement string bucket, measurement string
} }
func (idb *InfluxDBv2DataRepository) Init(url string, token string, renamings map[string]string) error { func (idb *InfluxDBv2DataRepository) Init(rawConfig json.RawMessage) error {
var config InfluxDBv2DataRepositoryConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
return err
}
idb.client = influxdb2.NewClientWithOptions(url, token, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config {InsecureSkipVerify: true,} )) idb.client = influxdb2.NewClientWithOptions(config.Url, config.Token, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config {InsecureSkipVerify: true,} ))
idb.queryClient = idb.client.QueryAPI("ClusterCockpit") // TODO: Make configurable idb.queryClient = idb.client.QueryAPI("ClusterCockpit") // TODO: Make configurable
return nil return nil

View File

@ -2,6 +2,7 @@ package metricdata
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"time" "time"
@ -14,7 +15,7 @@ import (
type MetricDataRepository interface { type MetricDataRepository interface {
// Initialize this MetricDataRepository. One instance of // Initialize this MetricDataRepository. One instance of
// this interface will only ever be responsible for one cluster. // this interface will only ever be responsible for one cluster.
Init(url, token string, renamings map[string]string) error Init(rawConfig json.RawMessage) error
// Return the JobData for the given job, only with the requested metrics. // Return the JobData for the given job, only with the requested metrics.
LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error)
@ -37,8 +38,15 @@ func Init(jobArchivePath string, disableArchive bool) error {
JobArchivePath = jobArchivePath JobArchivePath = jobArchivePath
for _, cluster := range config.Clusters { for _, cluster := range config.Clusters {
if cluster.MetricDataRepository != nil { if cluster.MetricDataRepository != nil {
var kind struct {
Kind string `json:"kind"`
}
if err := json.Unmarshal(cluster.MetricDataRepository, &kind); err != nil {
return err
}
var mdr MetricDataRepository var mdr MetricDataRepository
switch cluster.MetricDataRepository.Kind { switch kind.Kind {
case "cc-metric-store": case "cc-metric-store":
mdr = &CCMetricStore{} mdr = &CCMetricStore{}
case "influxdb": case "influxdb":
@ -46,13 +54,10 @@ func Init(jobArchivePath string, disableArchive bool) error {
case "test": case "test":
mdr = &TestMetricDataRepository{} mdr = &TestMetricDataRepository{}
default: default:
return fmt.Errorf("unkown metric data repository '%s' for cluster '%s'", cluster.MetricDataRepository.Kind, cluster.Name) return fmt.Errorf("unkown metric data repository '%s' for cluster '%s'", kind.Kind, cluster.Name)
} }
if err := mdr.Init( if err := mdr.Init(cluster.MetricDataRepository); err != nil {
cluster.MetricDataRepository.Url,
cluster.MetricDataRepository.Token,
cluster.MetricDataRepository.Renamings); err != nil {
return err return err
} }
metricDataRepos[cluster.Name] = mdr metricDataRepos[cluster.Name] = mdr

View File

@ -2,6 +2,7 @@ package metricdata
import ( import (
"context" "context"
"encoding/json"
"time" "time"
"github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/schema"
@ -12,15 +13,9 @@ var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema
} }
// Only a mock for unit-testing. // Only a mock for unit-testing.
type TestMetricDataRepository struct { type TestMetricDataRepository struct{}
url, token string
renamings map[string]string
}
func (tmdr *TestMetricDataRepository) Init(url, token string, renamings map[string]string) error { func (tmdr *TestMetricDataRepository) Init(_ json.RawMessage) error {
tmdr.url = url
tmdr.token = token
tmdr.renamings = renamings
return nil return nil
} }