From 56ebb301ca5361e8930370a2f5543d4cddae58f3 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Aug 2024 10:14:33 +0200 Subject: [PATCH 1/3] Start to restructure Does not compile --- internal/metricdata/metricdata.go | 66 ------------------------------ pkg/archive/archive.go | 68 +++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 66 deletions(-) diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index eba9dee..3d93102 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -8,7 +8,6 @@ import ( "context" "encoding/json" "fmt" - "math" "time" "github.com/ClusterCockpit/cc-backend/internal/config" @@ -292,68 +291,3 @@ func prepareJobData( jobData.AddNodeScope("mem_bw") } } - -// Writes a running job to the job-archive -func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { - allMetrics := make([]string, 0) - metricConfigs := archive.GetCluster(job.Cluster).MetricConfig - for _, mc := range metricConfigs { - allMetrics = append(allMetrics, mc.Name) - } - - // TODO: Talk about this! What resolutions to store data at... - scopes := []schema.MetricScope{schema.MetricScopeNode} - if job.NumNodes <= 8 { - scopes = append(scopes, schema.MetricScopeCore) - } - - if job.NumAcc > 0 { - scopes = append(scopes, schema.MetricScopeAccelerator) - } - - jobData, err := LoadData(job, allMetrics, scopes, ctx) - if err != nil { - log.Error("Error wile loading job data for archiving") - return nil, err - } - - jobMeta := &schema.JobMeta{ - BaseJob: job.BaseJob, - StartTime: job.StartTime.Unix(), - Statistics: make(map[string]schema.JobStatistics), - } - - for metric, data := range jobData { - avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 - nodeData, ok := data["node"] - if !ok { - // TODO/FIXME: Calc average for non-node metrics as well! - continue - } - - for _, series := range nodeData.Series { - avg += series.Statistics.Avg - min = math.Min(min, series.Statistics.Min) - max = math.Max(max, series.Statistics.Max) - } - - jobMeta.Statistics[metric] = schema.JobStatistics{ - Unit: schema.Unit{ - Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix, - Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base, - }, - Avg: avg / float64(job.NumNodes), - Min: min, - Max: max, - } - } - - // If the file based archive is disabled, - // only return the JobMeta structure as the - // statistics in there are needed. - if !useArchive { - return jobMeta, nil - } - - return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) -} diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 56c5d47..765a2ce 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -5,10 +5,13 @@ package archive import ( + "context" "encoding/json" "fmt" + "math" "sync" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -102,6 +105,71 @@ func GetHandle() ArchiveBackend { return ar } +// Writes a running job to the job-archive +func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { + allMetrics := make([]string, 0) + metricConfigs := GetCluster(job.Cluster).MetricConfig + for _, mc := range metricConfigs { + allMetrics = append(allMetrics, mc.Name) + } + + // TODO: Talk about this! What resolutions to store data at... + scopes := []schema.MetricScope{schema.MetricScopeNode} + if job.NumNodes <= 8 { + scopes = append(scopes, schema.MetricScopeCore) + } + + if job.NumAcc > 0 { + scopes = append(scopes, schema.MetricScopeAccelerator) + } + + jobData, err := metricdata.LoadData(job, allMetrics, scopes, ctx) + if err != nil { + log.Error("Error wile loading job data for archiving") + return nil, err + } + + jobMeta := &schema.JobMeta{ + BaseJob: job.BaseJob, + StartTime: job.StartTime.Unix(), + Statistics: make(map[string]schema.JobStatistics), + } + + for metric, data := range jobData { + avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 + nodeData, ok := data["node"] + if !ok { + // TODO/FIXME: Calc average for non-node metrics as well! + continue + } + + for _, series := range nodeData.Series { + avg += series.Statistics.Avg + min = math.Min(min, series.Statistics.Min) + max = math.Max(max, series.Statistics.Max) + } + + jobMeta.Statistics[metric] = schema.JobStatistics{ + Unit: schema.Unit{ + Prefix: GetMetricConfig(job.Cluster, metric).Unit.Prefix, + Base: GetMetricConfig(job.Cluster, metric).Unit.Base, + }, + Avg: avg / float64(job.NumNodes), + Min: min, + Max: max, + } + } + + // If the file based archive is disabled, + // only return the JobMeta structure as the + // statistics in there are needed. + if !useArchive { + return jobMeta, nil + } + + return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) +} + // Helper to metricdata.LoadAverages(). func LoadAveragesFromArchive( job *schema.Job, From f914a312f53362108d63e67d208cd0cec61c4601 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 27 Aug 2024 16:44:16 +0200 Subject: [PATCH 2/3] Introduce metricDataDispatcher Does not compile yet --- cmd/cc-backend/main.go | 2 +- internal/metricDataDispatcher/dataLoader.go | 131 ++++++++++++++++++++ internal/metricdata/metricdata.go | 119 +----------------- 3 files changed, 138 insertions(+), 114 deletions(-) create mode 100644 internal/metricDataDispatcher/dataLoader.go diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 3a49923..9f7e673 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -181,7 +181,7 @@ func main() { log.Fatalf("failed to initialize archive: %s", err.Error()) } - if err := metricdata.Init(config.Keys.DisableArchive); err != nil { + if err := metricdata.Init(); err != nil { log.Fatalf("failed to initialize metricdata repository: %s", err.Error()) } diff --git a/internal/metricDataDispatcher/dataLoader.go b/internal/metricDataDispatcher/dataLoader.go new file mode 100644 index 0000000..a463ada --- /dev/null +++ b/internal/metricDataDispatcher/dataLoader.go @@ -0,0 +1,131 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package metricDataDispatcher + +import ( + "context" + "fmt" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/lrucache" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) + +func cacheKey( + job *schema.Job, + metrics []string, + scopes []schema.MetricScope, +) string { + // Duration and StartTime do not need to be in the cache key as StartTime is less unique than + // job.ID and the TTL of the cache entry makes sure it does not stay there forever. + return fmt.Sprintf("%d(%s):[%v],[%v]", + job.ID, job.State, metrics, scopes) +} + +// Fetches the metric data for a job. +func LoadData(job *schema.Job, + metrics []string, + scopes []schema.MetricScope, + ctx context.Context, +) (schema.JobData, error) { + data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) { + var jd schema.JobData + var err error + + if job.State == schema.JobStateRunning || + job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || + !config.Keys.DisableArchive { + + repo, ok := metricdata.GetMetricDataRepo(job.Cluster) + + if !ok { + return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 + } + + if scopes == nil { + scopes = append(scopes, schema.MetricScopeNode) + } + + if metrics == nil { + cluster := archive.GetCluster(job.Cluster) + for _, mc := range cluster.MetricConfig { + metrics = append(metrics, mc.Name) + } + } + + jd, err = repo.LoadData(job, metrics, scopes, ctx) + if err != nil { + if len(jd) != 0 { + log.Warnf("partial error: %s", err.Error()) + // return err, 0, 0 // Reactivating will block archiving on one partial error + } else { + log.Error("Error while loading job data from metric repository") + return err, 0, 0 + } + } + size = jd.Size() + } else { + jd, err = archive.GetHandle().LoadJobData(job) + if err != nil { + log.Error("Error while loading job data from archive") + return err, 0, 0 + } + + // Avoid sending unrequested data to the client: + if metrics != nil || scopes != nil { + if metrics == nil { + metrics = make([]string, 0, len(jd)) + for k := range jd { + metrics = append(metrics, k) + } + } + + res := schema.JobData{} + for _, metric := range metrics { + if perscope, ok := jd[metric]; ok { + if len(perscope) > 1 { + subset := make(map[schema.MetricScope]*schema.JobMetric) + for _, scope := range scopes { + if jm, ok := perscope[scope]; ok { + subset[scope] = jm + } + } + + if len(subset) > 0 { + perscope = subset + } + } + + res[metric] = perscope + } + } + jd = res + } + size = jd.Size() + } + + ttl = 5 * time.Hour + if job.State == schema.JobStateRunning { + ttl = 2 * time.Minute + } + + prepareJobData(jd, scopes) + + return jd, ttl, size + }) + + if err, ok := data.(error); ok { + log.Error("Error in returned dataset") + return nil, err + } + + return data.(schema.JobData), nil +} diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index 3d93102..feefb0a 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -13,7 +13,6 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) @@ -34,10 +33,7 @@ type MetricDataRepository interface { var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} -var useArchive bool - -func Init(disableArchive bool) error { - useArchive = !disableArchive +func Init() error { for _, cluster := range config.Keys.Clusters { if cluster.MetricDataRepository != nil { var kind struct { @@ -72,106 +68,14 @@ func Init(disableArchive bool) error { return nil } -var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) +func GetMetricDataRepo(cluster string) MetricDataRepository { + repo, ok := metricDataRepos[cluster] -// Fetches the metric data for a job. -func LoadData(job *schema.Job, - metrics []string, - scopes []schema.MetricScope, - ctx context.Context, -) (schema.JobData, error) { - data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) { - var jd schema.JobData - var err error - - if job.State == schema.JobStateRunning || - job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || - !useArchive { - - repo, ok := metricDataRepos[job.Cluster] - - if !ok { - return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 - } - - if scopes == nil { - scopes = append(scopes, schema.MetricScopeNode) - } - - if metrics == nil { - cluster := archive.GetCluster(job.Cluster) - for _, mc := range cluster.MetricConfig { - metrics = append(metrics, mc.Name) - } - } - - jd, err = repo.LoadData(job, metrics, scopes, ctx) - if err != nil { - if len(jd) != 0 { - log.Warnf("partial error: %s", err.Error()) - // return err, 0, 0 // Reactivating will block archiving on one partial error - } else { - log.Error("Error while loading job data from metric repository") - return err, 0, 0 - } - } - size = jd.Size() - } else { - jd, err = archive.GetHandle().LoadJobData(job) - if err != nil { - log.Error("Error while loading job data from archive") - return err, 0, 0 - } - - // Avoid sending unrequested data to the client: - if metrics != nil || scopes != nil { - if metrics == nil { - metrics = make([]string, 0, len(jd)) - for k := range jd { - metrics = append(metrics, k) - } - } - - res := schema.JobData{} - for _, metric := range metrics { - if perscope, ok := jd[metric]; ok { - if len(perscope) > 1 { - subset := make(map[schema.MetricScope]*schema.JobMetric) - for _, scope := range scopes { - if jm, ok := perscope[scope]; ok { - subset[scope] = jm - } - } - - if len(subset) > 0 { - perscope = subset - } - } - - res[metric] = perscope - } - } - jd = res - } - size = jd.Size() - } - - ttl = 5 * time.Hour - if job.State == schema.JobStateRunning { - ttl = 2 * time.Minute - } - - prepareJobData(jd, scopes) - - return jd, ttl, size - }) - - if err, ok := data.(error); ok { - log.Error("Error in returned dataset") - return nil, err + if !ok { + return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 } - return data.(schema.JobData), nil + return repo } // Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. @@ -249,17 +153,6 @@ func LoadNodeData( return data, nil } -func cacheKey( - job *schema.Job, - metrics []string, - scopes []schema.MetricScope, -) string { - // Duration and StartTime do not need to be in the cache key as StartTime is less unique than - // job.ID and the TTL of the cache entry makes sure it does not stay there forever. - return fmt.Sprintf("%d(%s):[%v],[%v]", - job.ID, job.State, metrics, scopes) -} - // For /monitoring/job/ and some other places, flops_any and mem_bw need // to be available at the scope 'node'. If a job has a lot of nodes, // statisticsSeries should be available so that a min/median/max Graph can be From e7231b0e13d039636dbd7741aa8edaa5eb855f5c Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Aug 2024 10:03:04 +0200 Subject: [PATCH 3/3] Finish refactoring Add new packages: - metricDataDispatcher - archiver --- internal/api/api_test.go | 5 +- internal/api/rest.go | 6 +- internal/archiver/archiver.go | 81 ++++++++++++++ internal/graph/schema.resolvers.go | 20 ++-- internal/graph/util.go | 10 +- internal/metricDataDispatcher/dataLoader.go | 110 ++++++++++++++++++- internal/metricdata/metricdata.go | 115 +------------------- internal/repository/archiveWorker.go | 4 +- internal/repository/stats.go | 4 +- pkg/archive/archive.go | 68 ------------ 10 files changed, 216 insertions(+), 207 deletions(-) create mode 100644 internal/archiver/archiver.go diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 80a7e64..a6d183e 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -22,6 +22,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" @@ -150,7 +151,7 @@ func setup(t *testing.T) *api.RestApi { t.Fatal(err) } - if err := metricdata.Init(config.Keys.DisableArchive); err != nil { + if err := metricdata.Init(); err != nil { t.Fatal(err) } @@ -341,7 +342,7 @@ func TestRestApi(t *testing.T) { } t.Run("CheckArchive", func(t *testing.T) { - data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background()) + data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background()) if err != nil { t.Fatal(err) } diff --git a/internal/api/rest.go b/internal/api/rest.go index c8f4e7a..da0f4be 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -24,7 +24,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/importer" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/archive" @@ -515,7 +515,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) var data schema.JobData if r.URL.Query().Get("all-metrics") == "true" { - data, err = metricdata.LoadData(job, nil, scopes, r.Context()) + data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context()) if err != nil { log.Warn("Error while loading job data") return @@ -604,7 +604,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { scopes = []schema.MetricScope{"node"} } - data, err := metricdata.LoadData(job, metrics, scopes, r.Context()) + data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context()) if err != nil { log.Warn("Error while loading job data") return diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go new file mode 100644 index 0000000..e10a994 --- /dev/null +++ b/internal/archiver/archiver.go @@ -0,0 +1,81 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package archiver + +import ( + "context" + "math" + + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +// Writes a running job to the job-archive +func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { + allMetrics := make([]string, 0) + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + for _, mc := range metricConfigs { + allMetrics = append(allMetrics, mc.Name) + } + + // TODO: Talk about this! What resolutions to store data at... + scopes := []schema.MetricScope{schema.MetricScopeNode} + if job.NumNodes <= 8 { + scopes = append(scopes, schema.MetricScopeCore) + } + + if job.NumAcc > 0 { + scopes = append(scopes, schema.MetricScopeAccelerator) + } + + jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx) + if err != nil { + log.Error("Error wile loading job data for archiving") + return nil, err + } + + jobMeta := &schema.JobMeta{ + BaseJob: job.BaseJob, + StartTime: job.StartTime.Unix(), + Statistics: make(map[string]schema.JobStatistics), + } + + for metric, data := range jobData { + avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 + nodeData, ok := data["node"] + if !ok { + // TODO/FIXME: Calc average for non-node metrics as well! + continue + } + + for _, series := range nodeData.Series { + avg += series.Statistics.Avg + min = math.Min(min, series.Statistics.Min) + max = math.Max(max, series.Statistics.Max) + } + + jobMeta.Statistics[metric] = schema.JobStatistics{ + Unit: schema.Unit{ + Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix, + Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base, + }, + Avg: avg / float64(job.NumNodes), + Min: min, + Max: max, + } + } + + // If the file based archive is disabled, + // only return the JobMeta structure as the + // statistics in there are needed. + if config.Keys.DisableArchive { + return jobMeta, nil + } + + return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) +} diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index f36e25a..6177bce 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -15,7 +15,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" @@ -231,7 +231,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str return nil, err } - data, err := metricdata.LoadData(job, metrics, scopes, ctx) + data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx) if err != nil { log.Warn("Error while loading job data") return nil, err @@ -383,7 +383,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [ } } - data, err := metricdata.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) + data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) if err != nil { log.Warn("Error while loading node data") return nil, err @@ -440,9 +440,11 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } // SubCluster returns generated.SubClusterResolver implementation. func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} } -type clusterResolver struct{ *Resolver } -type jobResolver struct{ *Resolver } -type metricValueResolver struct{ *Resolver } -type mutationResolver struct{ *Resolver } -type queryResolver struct{ *Resolver } -type subClusterResolver struct{ *Resolver } +type ( + clusterResolver struct{ *Resolver } + jobResolver struct{ *Resolver } + metricValueResolver struct{ *Resolver } + mutationResolver struct{ *Resolver } + queryResolver struct{ *Resolver } + subClusterResolver struct{ *Resolver } +) diff --git a/internal/graph/util.go b/internal/graph/util.go index 3e65b6c..8296a02 100644 --- a/internal/graph/util.go +++ b/internal/graph/util.go @@ -11,7 +11,7 @@ import ( "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" // "github.com/ClusterCockpit/cc-backend/pkg/archive" @@ -24,8 +24,8 @@ func (r *queryResolver) rooflineHeatmap( ctx context.Context, filter []*model.JobFilter, rows int, cols int, - minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) { - + minX float64, minY float64, maxX float64, maxY float64, +) ([][]float64, error) { jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil) if err != nil { log.Error("Error while querying jobs for roofline") @@ -47,7 +47,7 @@ func (r *queryResolver) rooflineHeatmap( continue } - jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx) + jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx) if err != nil { log.Errorf("Error while loading roofline metrics for job %d", job.ID) return nil, err @@ -120,7 +120,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF continue } - if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil { + if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil { log.Error("Error while loading averages for footprint") return nil, err } diff --git a/internal/metricDataDispatcher/dataLoader.go b/internal/metricDataDispatcher/dataLoader.go index a463ada..2c7cfa6 100644 --- a/internal/metricDataDispatcher/dataLoader.go +++ b/internal/metricDataDispatcher/dataLoader.go @@ -42,11 +42,10 @@ func LoadData(job *schema.Job, if job.State == schema.JobStateRunning || job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || - !config.Keys.DisableArchive { + config.Keys.DisableArchive { - repo, ok := metricdata.GetMetricDataRepo(job.Cluster) - - if !ok { + repo, err := metricdata.GetMetricDataRepo(job.Cluster) + if err != nil { return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 } @@ -117,7 +116,33 @@ func LoadData(job *schema.Job, ttl = 2 * time.Minute } - prepareJobData(jd, scopes) + // FIXME: Review: Is this really necessary or correct. + // For /monitoring/job/ and some other places, flops_any and mem_bw need + // to be available at the scope 'node'. If a job has a lot of nodes, + // statisticsSeries should be available so that a min/median/max Graph can be + // used instead of a lot of single lines. + const maxSeriesSize int = 15 + for _, scopes := range jd { + for _, jm := range scopes { + if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize { + continue + } + + jm.AddStatisticsSeries() + } + } + + nodeScopeRequested := false + for _, scope := range scopes { + if scope == schema.MetricScopeNode { + nodeScopeRequested = true + } + } + + if nodeScopeRequested { + jd.AddNodeScope("flops_any") + jd.AddNodeScope("mem_bw") + } return jd, ttl, size }) @@ -129,3 +154,78 @@ func LoadData(job *schema.Job, return data.(schema.JobData), nil } + +// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. +func LoadAverages( + job *schema.Job, + metrics []string, + data [][]schema.Float, + ctx context.Context, +) error { + if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { + return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here? + } + + repo, err := metricdata.GetMetricDataRepo(job.Cluster) + if err != nil { + return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster) + } + + stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalizazion? + if err != nil { + log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project) + return err + } + + for i, m := range metrics { + nodes, ok := stats[m] + if !ok { + data[i] = append(data[i], schema.NaN) + continue + } + + sum := 0.0 + for _, node := range nodes { + sum += node.Avg + } + data[i] = append(data[i], schema.Float(sum)) + } + + return nil +} + +// Used for the node/system view. Returns a map of nodes to a map of metrics. +func LoadNodeData( + cluster string, + metrics, nodes []string, + scopes []schema.MetricScope, + from, to time.Time, + ctx context.Context, +) (map[string]map[string][]*schema.JobMetric, error) { + repo, err := metricdata.GetMetricDataRepo(cluster) + if err != nil { + return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) + } + + if metrics == nil { + for _, m := range archive.GetCluster(cluster).MetricConfig { + metrics = append(metrics, m.Name) + } + } + + data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) + if err != nil { + if len(data) != 0 { + log.Warnf("partial error: %s", err.Error()) + } else { + log.Error("Error while loading node data from metric repository") + return nil, err + } + } + + if data == nil { + return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster) + } + + return data, nil +} diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index feefb0a..68d8d32 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -11,7 +11,6 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) @@ -68,119 +67,13 @@ func Init() error { return nil } -func GetMetricDataRepo(cluster string) MetricDataRepository { +func GetMetricDataRepo(cluster string) (MetricDataRepository, error) { + var err error repo, ok := metricDataRepos[cluster] if !ok { - return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 + err = fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) } - return repo -} - -// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. -func LoadAverages( - job *schema.Job, - metrics []string, - data [][]schema.Float, - ctx context.Context, -) error { - if job.State != schema.JobStateRunning && useArchive { - return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here? - } - - repo, ok := metricDataRepos[job.Cluster] - if !ok { - return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster) - } - - stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalizazion? - if err != nil { - log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project) - return err - } - - for i, m := range metrics { - nodes, ok := stats[m] - if !ok { - data[i] = append(data[i], schema.NaN) - continue - } - - sum := 0.0 - for _, node := range nodes { - sum += node.Avg - } - data[i] = append(data[i], schema.Float(sum)) - } - - return nil -} - -// Used for the node/system view. Returns a map of nodes to a map of metrics. -func LoadNodeData( - cluster string, - metrics, nodes []string, - scopes []schema.MetricScope, - from, to time.Time, - ctx context.Context, -) (map[string]map[string][]*schema.JobMetric, error) { - repo, ok := metricDataRepos[cluster] - if !ok { - return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) - } - - if metrics == nil { - for _, m := range archive.GetCluster(cluster).MetricConfig { - metrics = append(metrics, m.Name) - } - } - - data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) - if err != nil { - if len(data) != 0 { - log.Warnf("partial error: %s", err.Error()) - } else { - log.Error("Error while loading node data from metric repository") - return nil, err - } - } - - if data == nil { - return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster) - } - - return data, nil -} - -// For /monitoring/job/ and some other places, flops_any and mem_bw need -// to be available at the scope 'node'. If a job has a lot of nodes, -// statisticsSeries should be available so that a min/median/max Graph can be -// used instead of a lot of single lines. -func prepareJobData( - jobData schema.JobData, - scopes []schema.MetricScope, -) { - const maxSeriesSize int = 15 - for _, scopes := range jobData { - for _, jm := range scopes { - if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize { - continue - } - - jm.AddStatisticsSeries() - } - } - - nodeScopeRequested := false - for _, scope := range scopes { - if scope == schema.MetricScopeNode { - nodeScopeRequested = true - } - } - - if nodeScopeRequested { - jobData.AddNodeScope("flops_any") - jobData.AddNodeScope("mem_bw") - } + return repo, err } diff --git a/internal/repository/archiveWorker.go b/internal/repository/archiveWorker.go index 42febb5..7094b7c 100644 --- a/internal/repository/archiveWorker.go +++ b/internal/repository/archiveWorker.go @@ -9,7 +9,7 @@ import ( "encoding/json" "time" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/archiver" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -35,7 +35,7 @@ func (r *JobRepository) archivingWorker() { // metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend // TODO: Maybe use context with cancel/timeout here - jobMeta, err := metricdata.ArchiveJob(job, context.Background()) + jobMeta, err := archiver.ArchiveJob(job, context.Background()) if err != nil { log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 81ca8d1..5682144 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -13,7 +13,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -691,7 +691,7 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram( continue } - if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil { + if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil { log.Errorf("Error while loading averages for histogram: %s", err) return nil } diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 765a2ce..56c5d47 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -5,13 +5,10 @@ package archive import ( - "context" "encoding/json" "fmt" - "math" "sync" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -105,71 +102,6 @@ func GetHandle() ArchiveBackend { return ar } -// Writes a running job to the job-archive -func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { - allMetrics := make([]string, 0) - metricConfigs := GetCluster(job.Cluster).MetricConfig - for _, mc := range metricConfigs { - allMetrics = append(allMetrics, mc.Name) - } - - // TODO: Talk about this! What resolutions to store data at... - scopes := []schema.MetricScope{schema.MetricScopeNode} - if job.NumNodes <= 8 { - scopes = append(scopes, schema.MetricScopeCore) - } - - if job.NumAcc > 0 { - scopes = append(scopes, schema.MetricScopeAccelerator) - } - - jobData, err := metricdata.LoadData(job, allMetrics, scopes, ctx) - if err != nil { - log.Error("Error wile loading job data for archiving") - return nil, err - } - - jobMeta := &schema.JobMeta{ - BaseJob: job.BaseJob, - StartTime: job.StartTime.Unix(), - Statistics: make(map[string]schema.JobStatistics), - } - - for metric, data := range jobData { - avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 - nodeData, ok := data["node"] - if !ok { - // TODO/FIXME: Calc average for non-node metrics as well! - continue - } - - for _, series := range nodeData.Series { - avg += series.Statistics.Avg - min = math.Min(min, series.Statistics.Min) - max = math.Max(max, series.Statistics.Max) - } - - jobMeta.Statistics[metric] = schema.JobStatistics{ - Unit: schema.Unit{ - Prefix: GetMetricConfig(job.Cluster, metric).Unit.Prefix, - Base: GetMetricConfig(job.Cluster, metric).Unit.Base, - }, - Avg: avg / float64(job.NumNodes), - Min: min, - Max: max, - } - } - - // If the file based archive is disabled, - // only return the JobMeta structure as the - // statistics in there are needed. - if !useArchive { - return jobMeta, nil - } - - return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) -} - // Helper to metricdata.LoadAverages(). func LoadAveragesFromArchive( job *schema.Job,