mirror of
				https://github.com/ClusterCockpit/cc-backend
				synced 2025-10-24 22:35:06 +02:00 
			
		
		
		
	Merge pull request #287 from ClusterCockpit/refactor-archiving
Refactor archiving
This commit is contained in:
		| @@ -181,7 +181,7 @@ func main() { | |||||||
| 		log.Fatalf("failed to initialize archive: %s", err.Error()) | 		log.Fatalf("failed to initialize archive: %s", err.Error()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := metricdata.Init(config.Keys.DisableArchive); err != nil { | 	if err := metricdata.Init(); err != nil { | ||||||
| 		log.Fatalf("failed to initialize metricdata repository: %s", err.Error()) | 		log.Fatalf("failed to initialize metricdata repository: %s", err.Error()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -22,6 +22,7 @@ import ( | |||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/auth" | 	"github.com/ClusterCockpit/cc-backend/internal/auth" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/config" | 	"github.com/ClusterCockpit/cc-backend/internal/config" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/graph" | 	"github.com/ClusterCockpit/cc-backend/internal/graph" | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/metricdata" | 	"github.com/ClusterCockpit/cc-backend/internal/metricdata" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/repository" | 	"github.com/ClusterCockpit/cc-backend/internal/repository" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | ||||||
| @@ -150,7 +151,7 @@ func setup(t *testing.T) *api.RestApi { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := metricdata.Init(config.Keys.DisableArchive); err != nil { | 	if err := metricdata.Init(); err != nil { | ||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -341,7 +342,7 @@ func TestRestApi(t *testing.T) { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	t.Run("CheckArchive", func(t *testing.T) { | 	t.Run("CheckArchive", func(t *testing.T) { | ||||||
| 		data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background()) | 		data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background()) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Fatal(err) | 			t.Fatal(err) | ||||||
| 		} | 		} | ||||||
|   | |||||||
| @@ -24,7 +24,7 @@ import ( | |||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/graph" | 	"github.com/ClusterCockpit/cc-backend/internal/graph" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/graph/model" | 	"github.com/ClusterCockpit/cc-backend/internal/graph/model" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/importer" | 	"github.com/ClusterCockpit/cc-backend/internal/importer" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/metricdata" | 	"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/repository" | 	"github.com/ClusterCockpit/cc-backend/internal/repository" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/util" | 	"github.com/ClusterCockpit/cc-backend/internal/util" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | ||||||
| @@ -515,7 +515,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) | |||||||
| 	var data schema.JobData | 	var data schema.JobData | ||||||
|  |  | ||||||
| 	if r.URL.Query().Get("all-metrics") == "true" { | 	if r.URL.Query().Get("all-metrics") == "true" { | ||||||
| 		data, err = metricdata.LoadData(job, nil, scopes, r.Context()) | 		data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context()) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			log.Warn("Error while loading job data") | 			log.Warn("Error while loading job data") | ||||||
| 			return | 			return | ||||||
| @@ -604,7 +604,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { | |||||||
| 		scopes = []schema.MetricScope{"node"} | 		scopes = []schema.MetricScope{"node"} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	data, err := metricdata.LoadData(job, metrics, scopes, r.Context()) | 	data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context()) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Warn("Error while loading job data") | 		log.Warn("Error while loading job data") | ||||||
| 		return | 		return | ||||||
|   | |||||||
							
								
								
									
										81
									
								
								internal/archiver/archiver.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										81
									
								
								internal/archiver/archiver.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,81 @@ | |||||||
|  | // Copyright (C) NHR@FAU, University Erlangen-Nuremberg. | ||||||
|  | // All rights reserved. | ||||||
|  | // Use of this source code is governed by a MIT-style | ||||||
|  | // license that can be found in the LICENSE file. | ||||||
|  | package archiver | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"math" | ||||||
|  |  | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/internal/config" | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/pkg/log" | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/pkg/schema" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Writes a running job to the job-archive | ||||||
|  | func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { | ||||||
|  | 	allMetrics := make([]string, 0) | ||||||
|  | 	metricConfigs := archive.GetCluster(job.Cluster).MetricConfig | ||||||
|  | 	for _, mc := range metricConfigs { | ||||||
|  | 		allMetrics = append(allMetrics, mc.Name) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// TODO: Talk about this! What resolutions to store data at... | ||||||
|  | 	scopes := []schema.MetricScope{schema.MetricScopeNode} | ||||||
|  | 	if job.NumNodes <= 8 { | ||||||
|  | 		scopes = append(scopes, schema.MetricScopeCore) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if job.NumAcc > 0 { | ||||||
|  | 		scopes = append(scopes, schema.MetricScopeAccelerator) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Error("Error wile loading job data for archiving") | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	jobMeta := &schema.JobMeta{ | ||||||
|  | 		BaseJob:    job.BaseJob, | ||||||
|  | 		StartTime:  job.StartTime.Unix(), | ||||||
|  | 		Statistics: make(map[string]schema.JobStatistics), | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for metric, data := range jobData { | ||||||
|  | 		avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 | ||||||
|  | 		nodeData, ok := data["node"] | ||||||
|  | 		if !ok { | ||||||
|  | 			// TODO/FIXME: Calc average for non-node metrics as well! | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		for _, series := range nodeData.Series { | ||||||
|  | 			avg += series.Statistics.Avg | ||||||
|  | 			min = math.Min(min, series.Statistics.Min) | ||||||
|  | 			max = math.Max(max, series.Statistics.Max) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		jobMeta.Statistics[metric] = schema.JobStatistics{ | ||||||
|  | 			Unit: schema.Unit{ | ||||||
|  | 				Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix, | ||||||
|  | 				Base:   archive.GetMetricConfig(job.Cluster, metric).Unit.Base, | ||||||
|  | 			}, | ||||||
|  | 			Avg: avg / float64(job.NumNodes), | ||||||
|  | 			Min: min, | ||||||
|  | 			Max: max, | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// If the file based archive is disabled, | ||||||
|  | 	// only return the JobMeta structure as the | ||||||
|  | 	// statistics in there are needed. | ||||||
|  | 	if config.Keys.DisableArchive { | ||||||
|  | 		return jobMeta, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) | ||||||
|  | } | ||||||
| @@ -15,7 +15,7 @@ import ( | |||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/config" | 	"github.com/ClusterCockpit/cc-backend/internal/config" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/graph/generated" | 	"github.com/ClusterCockpit/cc-backend/internal/graph/generated" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/graph/model" | 	"github.com/ClusterCockpit/cc-backend/internal/graph/model" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/metricdata" | 	"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/repository" | 	"github.com/ClusterCockpit/cc-backend/internal/repository" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/log" | 	"github.com/ClusterCockpit/cc-backend/pkg/log" | ||||||
| @@ -231,7 +231,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str | |||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	data, err := metricdata.LoadData(job, metrics, scopes, ctx) | 	data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Warn("Error while loading job data") | 		log.Warn("Error while loading job data") | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @@ -383,7 +383,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [ | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	data, err := metricdata.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) | 	data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Warn("Error while loading node data") | 		log.Warn("Error while loading node data") | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @@ -440,9 +440,11 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } | |||||||
| // SubCluster returns generated.SubClusterResolver implementation. | // SubCluster returns generated.SubClusterResolver implementation. | ||||||
| func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} } | func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} } | ||||||
|  |  | ||||||
| type clusterResolver struct{ *Resolver } | type ( | ||||||
| type jobResolver struct{ *Resolver } | 	clusterResolver     struct{ *Resolver } | ||||||
| type metricValueResolver struct{ *Resolver } | 	jobResolver         struct{ *Resolver } | ||||||
| type mutationResolver struct{ *Resolver } | 	metricValueResolver struct{ *Resolver } | ||||||
| type queryResolver struct{ *Resolver } | 	mutationResolver    struct{ *Resolver } | ||||||
| type subClusterResolver struct{ *Resolver } | 	queryResolver       struct{ *Resolver } | ||||||
|  | 	subClusterResolver  struct{ *Resolver } | ||||||
|  | ) | ||||||
|   | |||||||
| @@ -11,7 +11,7 @@ import ( | |||||||
|  |  | ||||||
| 	"github.com/99designs/gqlgen/graphql" | 	"github.com/99designs/gqlgen/graphql" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/graph/model" | 	"github.com/ClusterCockpit/cc-backend/internal/graph/model" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/metricdata" | 	"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/log" | 	"github.com/ClusterCockpit/cc-backend/pkg/log" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/schema" | 	"github.com/ClusterCockpit/cc-backend/pkg/schema" | ||||||
| 	// "github.com/ClusterCockpit/cc-backend/pkg/archive" | 	// "github.com/ClusterCockpit/cc-backend/pkg/archive" | ||||||
| @@ -24,8 +24,8 @@ func (r *queryResolver) rooflineHeatmap( | |||||||
| 	ctx context.Context, | 	ctx context.Context, | ||||||
| 	filter []*model.JobFilter, | 	filter []*model.JobFilter, | ||||||
| 	rows int, cols int, | 	rows int, cols int, | ||||||
| 	minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) { | 	minX float64, minY float64, maxX float64, maxY float64, | ||||||
|  | ) ([][]float64, error) { | ||||||
| 	jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil) | 	jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Error("Error while querying jobs for roofline") | 		log.Error("Error while querying jobs for roofline") | ||||||
| @@ -47,7 +47,7 @@ func (r *queryResolver) rooflineHeatmap( | |||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx) | 		jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			log.Errorf("Error while loading roofline metrics for job %d", job.ID) | 			log.Errorf("Error while loading roofline metrics for job %d", job.ID) | ||||||
| 			return nil, err | 			return nil, err | ||||||
| @@ -120,7 +120,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF | |||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil { | 		if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil { | ||||||
| 			log.Error("Error while loading averages for footprint") | 			log.Error("Error while loading averages for footprint") | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
|   | |||||||
							
								
								
									
										231
									
								
								internal/metricDataDispatcher/dataLoader.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										231
									
								
								internal/metricDataDispatcher/dataLoader.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,231 @@ | |||||||
|  | // Copyright (C) NHR@FAU, University Erlangen-Nuremberg. | ||||||
|  | // All rights reserved. | ||||||
|  | // Use of this source code is governed by a MIT-style | ||||||
|  | // license that can be found in the LICENSE file. | ||||||
|  | package metricDataDispatcher | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"fmt" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/internal/config" | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/internal/metricdata" | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/pkg/log" | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/pkg/lrucache" | ||||||
|  | 	"github.com/ClusterCockpit/cc-backend/pkg/schema" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) | ||||||
|  |  | ||||||
|  | func cacheKey( | ||||||
|  | 	job *schema.Job, | ||||||
|  | 	metrics []string, | ||||||
|  | 	scopes []schema.MetricScope, | ||||||
|  | ) string { | ||||||
|  | 	// Duration and StartTime do not need to be in the cache key as StartTime is less unique than | ||||||
|  | 	// job.ID and the TTL of the cache entry makes sure it does not stay there forever. | ||||||
|  | 	return fmt.Sprintf("%d(%s):[%v],[%v]", | ||||||
|  | 		job.ID, job.State, metrics, scopes) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Fetches the metric data for a job. | ||||||
|  | func LoadData(job *schema.Job, | ||||||
|  | 	metrics []string, | ||||||
|  | 	scopes []schema.MetricScope, | ||||||
|  | 	ctx context.Context, | ||||||
|  | ) (schema.JobData, error) { | ||||||
|  | 	data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) { | ||||||
|  | 		var jd schema.JobData | ||||||
|  | 		var err error | ||||||
|  |  | ||||||
|  | 		if job.State == schema.JobStateRunning || | ||||||
|  | 			job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || | ||||||
|  | 			config.Keys.DisableArchive { | ||||||
|  |  | ||||||
|  | 			repo, err := metricdata.GetMetricDataRepo(job.Cluster) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			if scopes == nil { | ||||||
|  | 				scopes = append(scopes, schema.MetricScopeNode) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			if metrics == nil { | ||||||
|  | 				cluster := archive.GetCluster(job.Cluster) | ||||||
|  | 				for _, mc := range cluster.MetricConfig { | ||||||
|  | 					metrics = append(metrics, mc.Name) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			jd, err = repo.LoadData(job, metrics, scopes, ctx) | ||||||
|  | 			if err != nil { | ||||||
|  | 				if len(jd) != 0 { | ||||||
|  | 					log.Warnf("partial error: %s", err.Error()) | ||||||
|  | 					// return err, 0, 0 // Reactivating will block archiving on one partial error | ||||||
|  | 				} else { | ||||||
|  | 					log.Error("Error while loading job data from metric repository") | ||||||
|  | 					return err, 0, 0 | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 			size = jd.Size() | ||||||
|  | 		} else { | ||||||
|  | 			jd, err = archive.GetHandle().LoadJobData(job) | ||||||
|  | 			if err != nil { | ||||||
|  | 				log.Error("Error while loading job data from archive") | ||||||
|  | 				return err, 0, 0 | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			// Avoid sending unrequested data to the client: | ||||||
|  | 			if metrics != nil || scopes != nil { | ||||||
|  | 				if metrics == nil { | ||||||
|  | 					metrics = make([]string, 0, len(jd)) | ||||||
|  | 					for k := range jd { | ||||||
|  | 						metrics = append(metrics, k) | ||||||
|  | 					} | ||||||
|  | 				} | ||||||
|  |  | ||||||
|  | 				res := schema.JobData{} | ||||||
|  | 				for _, metric := range metrics { | ||||||
|  | 					if perscope, ok := jd[metric]; ok { | ||||||
|  | 						if len(perscope) > 1 { | ||||||
|  | 							subset := make(map[schema.MetricScope]*schema.JobMetric) | ||||||
|  | 							for _, scope := range scopes { | ||||||
|  | 								if jm, ok := perscope[scope]; ok { | ||||||
|  | 									subset[scope] = jm | ||||||
|  | 								} | ||||||
|  | 							} | ||||||
|  |  | ||||||
|  | 							if len(subset) > 0 { | ||||||
|  | 								perscope = subset | ||||||
|  | 							} | ||||||
|  | 						} | ||||||
|  |  | ||||||
|  | 						res[metric] = perscope | ||||||
|  | 					} | ||||||
|  | 				} | ||||||
|  | 				jd = res | ||||||
|  | 			} | ||||||
|  | 			size = jd.Size() | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		ttl = 5 * time.Hour | ||||||
|  | 		if job.State == schema.JobStateRunning { | ||||||
|  | 			ttl = 2 * time.Minute | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// FIXME: Review: Is this really necessary or correct. | ||||||
|  | 		// For /monitoring/job/<job> and some other places, flops_any and mem_bw need | ||||||
|  | 		// to be available at the scope 'node'. If a job has a lot of nodes, | ||||||
|  | 		// statisticsSeries should be available so that a min/median/max Graph can be | ||||||
|  | 		// used instead of a lot of single lines. | ||||||
|  | 		const maxSeriesSize int = 15 | ||||||
|  | 		for _, scopes := range jd { | ||||||
|  | 			for _, jm := range scopes { | ||||||
|  | 				if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize { | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  |  | ||||||
|  | 				jm.AddStatisticsSeries() | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		nodeScopeRequested := false | ||||||
|  | 		for _, scope := range scopes { | ||||||
|  | 			if scope == schema.MetricScopeNode { | ||||||
|  | 				nodeScopeRequested = true | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if nodeScopeRequested { | ||||||
|  | 			jd.AddNodeScope("flops_any") | ||||||
|  | 			jd.AddNodeScope("mem_bw") | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		return jd, ttl, size | ||||||
|  | 	}) | ||||||
|  |  | ||||||
|  | 	if err, ok := data.(error); ok { | ||||||
|  | 		log.Error("Error in returned dataset") | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return data.(schema.JobData), nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. | ||||||
|  | func LoadAverages( | ||||||
|  | 	job *schema.Job, | ||||||
|  | 	metrics []string, | ||||||
|  | 	data [][]schema.Float, | ||||||
|  | 	ctx context.Context, | ||||||
|  | ) error { | ||||||
|  | 	if job.State != schema.JobStateRunning && !config.Keys.DisableArchive { | ||||||
|  | 		return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here? | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	repo, err := metricdata.GetMetricDataRepo(job.Cluster) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalizazion? | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project) | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for i, m := range metrics { | ||||||
|  | 		nodes, ok := stats[m] | ||||||
|  | 		if !ok { | ||||||
|  | 			data[i] = append(data[i], schema.NaN) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		sum := 0.0 | ||||||
|  | 		for _, node := range nodes { | ||||||
|  | 			sum += node.Avg | ||||||
|  | 		} | ||||||
|  | 		data[i] = append(data[i], schema.Float(sum)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Used for the node/system view. Returns a map of nodes to a map of metrics. | ||||||
|  | func LoadNodeData( | ||||||
|  | 	cluster string, | ||||||
|  | 	metrics, nodes []string, | ||||||
|  | 	scopes []schema.MetricScope, | ||||||
|  | 	from, to time.Time, | ||||||
|  | 	ctx context.Context, | ||||||
|  | ) (map[string]map[string][]*schema.JobMetric, error) { | ||||||
|  | 	repo, err := metricdata.GetMetricDataRepo(cluster) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if metrics == nil { | ||||||
|  | 		for _, m := range archive.GetCluster(cluster).MetricConfig { | ||||||
|  | 			metrics = append(metrics, m.Name) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) | ||||||
|  | 	if err != nil { | ||||||
|  | 		if len(data) != 0 { | ||||||
|  | 			log.Warnf("partial error: %s", err.Error()) | ||||||
|  | 		} else { | ||||||
|  | 			log.Error("Error while loading node data from metric repository") | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if data == nil { | ||||||
|  | 		return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return data, nil | ||||||
|  | } | ||||||
| @@ -8,13 +8,10 @@ import ( | |||||||
| 	"context" | 	"context" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"math" |  | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/config" | 	"github.com/ClusterCockpit/cc-backend/internal/config" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/archive" |  | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/log" | 	"github.com/ClusterCockpit/cc-backend/pkg/log" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/lrucache" |  | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/schema" | 	"github.com/ClusterCockpit/cc-backend/pkg/schema" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -35,10 +32,7 @@ type MetricDataRepository interface { | |||||||
|  |  | ||||||
| var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} | var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} | ||||||
|  |  | ||||||
| var useArchive bool | func Init() error { | ||||||
|  |  | ||||||
| func Init(disableArchive bool) error { |  | ||||||
| 	useArchive = !disableArchive |  | ||||||
| 	for _, cluster := range config.Keys.Clusters { | 	for _, cluster := range config.Keys.Clusters { | ||||||
| 		if cluster.MetricDataRepository != nil { | 		if cluster.MetricDataRepository != nil { | ||||||
| 			var kind struct { | 			var kind struct { | ||||||
| @@ -73,287 +67,13 @@ func Init(disableArchive bool) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) | func GetMetricDataRepo(cluster string) (MetricDataRepository, error) { | ||||||
|  | 	var err error | ||||||
| // Fetches the metric data for a job. |  | ||||||
| func LoadData(job *schema.Job, |  | ||||||
| 	metrics []string, |  | ||||||
| 	scopes []schema.MetricScope, |  | ||||||
| 	ctx context.Context, |  | ||||||
| ) (schema.JobData, error) { |  | ||||||
| 	data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) { |  | ||||||
| 		var jd schema.JobData |  | ||||||
| 		var err error |  | ||||||
|  |  | ||||||
| 		if job.State == schema.JobStateRunning || |  | ||||||
| 			job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving || |  | ||||||
| 			!useArchive { |  | ||||||
|  |  | ||||||
| 			repo, ok := metricDataRepos[job.Cluster] |  | ||||||
|  |  | ||||||
| 			if !ok { |  | ||||||
| 				return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0 |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			if scopes == nil { |  | ||||||
| 				scopes = append(scopes, schema.MetricScopeNode) |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			if metrics == nil { |  | ||||||
| 				cluster := archive.GetCluster(job.Cluster) |  | ||||||
| 				for _, mc := range cluster.MetricConfig { |  | ||||||
| 					metrics = append(metrics, mc.Name) |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			jd, err = repo.LoadData(job, metrics, scopes, ctx) |  | ||||||
| 			if err != nil { |  | ||||||
| 				if len(jd) != 0 { |  | ||||||
| 					log.Warnf("partial error: %s", err.Error()) |  | ||||||
| 					// return err, 0, 0 // Reactivating will block archiving on one partial error |  | ||||||
| 				} else { |  | ||||||
| 					log.Error("Error while loading job data from metric repository") |  | ||||||
| 					return err, 0, 0 |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 			size = jd.Size() |  | ||||||
| 		} else { |  | ||||||
| 			jd, err = archive.GetHandle().LoadJobData(job) |  | ||||||
| 			if err != nil { |  | ||||||
| 				log.Error("Error while loading job data from archive") |  | ||||||
| 				return err, 0, 0 |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			// Avoid sending unrequested data to the client: |  | ||||||
| 			if metrics != nil || scopes != nil { |  | ||||||
| 				if metrics == nil { |  | ||||||
| 					metrics = make([]string, 0, len(jd)) |  | ||||||
| 					for k := range jd { |  | ||||||
| 						metrics = append(metrics, k) |  | ||||||
| 					} |  | ||||||
| 				} |  | ||||||
|  |  | ||||||
| 				res := schema.JobData{} |  | ||||||
| 				for _, metric := range metrics { |  | ||||||
| 					if perscope, ok := jd[metric]; ok { |  | ||||||
| 						if len(perscope) > 1 { |  | ||||||
| 							subset := make(map[schema.MetricScope]*schema.JobMetric) |  | ||||||
| 							for _, scope := range scopes { |  | ||||||
| 								if jm, ok := perscope[scope]; ok { |  | ||||||
| 									subset[scope] = jm |  | ||||||
| 								} |  | ||||||
| 							} |  | ||||||
|  |  | ||||||
| 							if len(subset) > 0 { |  | ||||||
| 								perscope = subset |  | ||||||
| 							} |  | ||||||
| 						} |  | ||||||
|  |  | ||||||
| 						res[metric] = perscope |  | ||||||
| 					} |  | ||||||
| 				} |  | ||||||
| 				jd = res |  | ||||||
| 			} |  | ||||||
| 			size = jd.Size() |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		ttl = 5 * time.Hour |  | ||||||
| 		if job.State == schema.JobStateRunning { |  | ||||||
| 			ttl = 2 * time.Minute |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		prepareJobData(jd, scopes) |  | ||||||
|  |  | ||||||
| 		return jd, ttl, size |  | ||||||
| 	}) |  | ||||||
|  |  | ||||||
| 	if err, ok := data.(error); ok { |  | ||||||
| 		log.Error("Error in returned dataset") |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return data.(schema.JobData), nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize. |  | ||||||
| func LoadAverages( |  | ||||||
| 	job *schema.Job, |  | ||||||
| 	metrics []string, |  | ||||||
| 	data [][]schema.Float, |  | ||||||
| 	ctx context.Context, |  | ||||||
| ) error { |  | ||||||
| 	if job.State != schema.JobStateRunning && useArchive { |  | ||||||
| 		return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here? |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	repo, ok := metricDataRepos[job.Cluster] |  | ||||||
| 	if !ok { |  | ||||||
| 		return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalizazion? |  | ||||||
| 	if err != nil { |  | ||||||
| 		log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project) |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for i, m := range metrics { |  | ||||||
| 		nodes, ok := stats[m] |  | ||||||
| 		if !ok { |  | ||||||
| 			data[i] = append(data[i], schema.NaN) |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		sum := 0.0 |  | ||||||
| 		for _, node := range nodes { |  | ||||||
| 			sum += node.Avg |  | ||||||
| 		} |  | ||||||
| 		data[i] = append(data[i], schema.Float(sum)) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Used for the node/system view. Returns a map of nodes to a map of metrics. |  | ||||||
| func LoadNodeData( |  | ||||||
| 	cluster string, |  | ||||||
| 	metrics, nodes []string, |  | ||||||
| 	scopes []schema.MetricScope, |  | ||||||
| 	from, to time.Time, |  | ||||||
| 	ctx context.Context, |  | ||||||
| ) (map[string]map[string][]*schema.JobMetric, error) { |  | ||||||
| 	repo, ok := metricDataRepos[cluster] | 	repo, ok := metricDataRepos[cluster] | ||||||
|  |  | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) | 		err = fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if metrics == nil { | 	return repo, err | ||||||
| 		for _, m := range archive.GetCluster(cluster).MetricConfig { |  | ||||||
| 			metrics = append(metrics, m.Name) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx) |  | ||||||
| 	if err != nil { |  | ||||||
| 		if len(data) != 0 { |  | ||||||
| 			log.Warnf("partial error: %s", err.Error()) |  | ||||||
| 		} else { |  | ||||||
| 			log.Error("Error while loading node data from metric repository") |  | ||||||
| 			return nil, err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if data == nil { |  | ||||||
| 		return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return data, nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func cacheKey( |  | ||||||
| 	job *schema.Job, |  | ||||||
| 	metrics []string, |  | ||||||
| 	scopes []schema.MetricScope, |  | ||||||
| ) string { |  | ||||||
| 	// Duration and StartTime do not need to be in the cache key as StartTime is less unique than |  | ||||||
| 	// job.ID and the TTL of the cache entry makes sure it does not stay there forever. |  | ||||||
| 	return fmt.Sprintf("%d(%s):[%v],[%v]", |  | ||||||
| 		job.ID, job.State, metrics, scopes) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // For /monitoring/job/<job> and some other places, flops_any and mem_bw need |  | ||||||
| // to be available at the scope 'node'. If a job has a lot of nodes, |  | ||||||
| // statisticsSeries should be available so that a min/median/max Graph can be |  | ||||||
| // used instead of a lot of single lines. |  | ||||||
| func prepareJobData( |  | ||||||
| 	jobData schema.JobData, |  | ||||||
| 	scopes []schema.MetricScope, |  | ||||||
| ) { |  | ||||||
| 	const maxSeriesSize int = 15 |  | ||||||
| 	for _, scopes := range jobData { |  | ||||||
| 		for _, jm := range scopes { |  | ||||||
| 			if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize { |  | ||||||
| 				continue |  | ||||||
| 			} |  | ||||||
|  |  | ||||||
| 			jm.AddStatisticsSeries() |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	nodeScopeRequested := false |  | ||||||
| 	for _, scope := range scopes { |  | ||||||
| 		if scope == schema.MetricScopeNode { |  | ||||||
| 			nodeScopeRequested = true |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if nodeScopeRequested { |  | ||||||
| 		jobData.AddNodeScope("flops_any") |  | ||||||
| 		jobData.AddNodeScope("mem_bw") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Writes a running job to the job-archive |  | ||||||
| func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { |  | ||||||
| 	allMetrics := make([]string, 0) |  | ||||||
| 	metricConfigs := archive.GetCluster(job.Cluster).MetricConfig |  | ||||||
| 	for _, mc := range metricConfigs { |  | ||||||
| 		allMetrics = append(allMetrics, mc.Name) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// TODO: Talk about this! What resolutions to store data at... |  | ||||||
| 	scopes := []schema.MetricScope{schema.MetricScopeNode} |  | ||||||
| 	if job.NumNodes <= 8 { |  | ||||||
| 		scopes = append(scopes, schema.MetricScopeCore) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if job.NumAcc > 0 { |  | ||||||
| 		scopes = append(scopes, schema.MetricScopeAccelerator) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	jobData, err := LoadData(job, allMetrics, scopes, ctx) |  | ||||||
| 	if err != nil { |  | ||||||
| 		log.Error("Error wile loading job data for archiving") |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	jobMeta := &schema.JobMeta{ |  | ||||||
| 		BaseJob:    job.BaseJob, |  | ||||||
| 		StartTime:  job.StartTime.Unix(), |  | ||||||
| 		Statistics: make(map[string]schema.JobStatistics), |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for metric, data := range jobData { |  | ||||||
| 		avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 |  | ||||||
| 		nodeData, ok := data["node"] |  | ||||||
| 		if !ok { |  | ||||||
| 			// TODO/FIXME: Calc average for non-node metrics as well! |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		for _, series := range nodeData.Series { |  | ||||||
| 			avg += series.Statistics.Avg |  | ||||||
| 			min = math.Min(min, series.Statistics.Min) |  | ||||||
| 			max = math.Max(max, series.Statistics.Max) |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		jobMeta.Statistics[metric] = schema.JobStatistics{ |  | ||||||
| 			Unit: schema.Unit{ |  | ||||||
| 				Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix, |  | ||||||
| 				Base:   archive.GetMetricConfig(job.Cluster, metric).Unit.Base, |  | ||||||
| 			}, |  | ||||||
| 			Avg: avg / float64(job.NumNodes), |  | ||||||
| 			Min: min, |  | ||||||
| 			Max: max, |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// If the file based archive is disabled, |  | ||||||
| 	// only return the JobMeta structure as the |  | ||||||
| 	// statistics in there are needed. |  | ||||||
| 	if !useArchive { |  | ||||||
| 		return jobMeta, nil |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) |  | ||||||
| } | } | ||||||
|   | |||||||
| @@ -9,7 +9,7 @@ import ( | |||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/metricdata" | 	"github.com/ClusterCockpit/cc-backend/internal/archiver" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/log" | 	"github.com/ClusterCockpit/cc-backend/pkg/log" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/schema" | 	"github.com/ClusterCockpit/cc-backend/pkg/schema" | ||||||
| @@ -35,7 +35,7 @@ func (r *JobRepository) archivingWorker() { | |||||||
|  |  | ||||||
| 			// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend | 			// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend | ||||||
| 			// TODO: Maybe use context with cancel/timeout here | 			// TODO: Maybe use context with cancel/timeout here | ||||||
| 			jobMeta, err := metricdata.ArchiveJob(job, context.Background()) | 			jobMeta, err := archiver.ArchiveJob(job, context.Background()) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) | 				log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) | ||||||
| 				r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) | 				r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) | ||||||
|   | |||||||
| @@ -13,7 +13,7 @@ import ( | |||||||
|  |  | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/config" | 	"github.com/ClusterCockpit/cc-backend/internal/config" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/graph/model" | 	"github.com/ClusterCockpit/cc-backend/internal/graph/model" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/internal/metricdata" | 	"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | 	"github.com/ClusterCockpit/cc-backend/pkg/archive" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/log" | 	"github.com/ClusterCockpit/cc-backend/pkg/log" | ||||||
| 	"github.com/ClusterCockpit/cc-backend/pkg/schema" | 	"github.com/ClusterCockpit/cc-backend/pkg/schema" | ||||||
| @@ -691,7 +691,7 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram( | |||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil { | 		if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil { | ||||||
| 			log.Errorf("Error while loading averages for histogram: %s", err) | 			log.Errorf("Error while loading averages for histogram: %s", err) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user