From 129dd13fc8d1eec957f99f4d61d6a7a0e3b8048f Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 11 Apr 2023 16:26:09 +0200 Subject: [PATCH] Fix merge errors --- internal/graph/schema.resolvers.go | 4 ---- internal/metricdata/influxdb-v2.go | 18 +----------------- internal/metricdata/prometheus.go | 6 ++---- pkg/archive/fsBackend.go | 3 +-- pkg/schema/job.go | 14 -------------- tools/archive-manager/main.go | 2 +- 6 files changed, 5 insertions(+), 42 deletions(-) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index e28a8d0..e3cbd08 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -194,10 +194,6 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str res := []*model.JobMetricWithName{} for name, md := range data { for scope, metric := range md { - if metric.Scope != schema.MetricScope(scope) { - log.Panic("metric.Scope != schema.MetricScope(scope) : Should not happen!") - } - res = append(res, &model.JobMetricWithName{ Name: name, Scope: scope, diff --git a/internal/metricdata/influxdb-v2.go b/internal/metricdata/influxdb-v2.go index d5d866c..8055baf 100644 --- a/internal/metricdata/influxdb-v2.go +++ b/internal/metricdata/influxdb-v2.go @@ -211,15 +211,10 @@ func (idb *InfluxDBv2DataRepository) LoadData( for _, scope := range scopes { if scope == "node" { // No 'socket/core' support yet for metric, nodes := range stats { - // log.Debugf("<< Add Stats for : Field %s >>", metric) for node, stats := range nodes { - // log.Debugf("<< Add Stats for : Host %s : Min %.2f, Max %.2f, Avg %.2f >>", node, stats.Min, stats.Max, stats.Avg ) for index, _ := range jobData[metric][scope].Series { - // log.Debugf("<< Try to add Stats to Series in Position %d >>", index) if jobData[metric][scope].Series[index].Hostname == node { - // log.Debugf("<< Match for Series in Position %d : Host %s >>", index, jobData[metric][scope].Series[index].Hostname) - jobData[metric][scope].Series[index].Statistics = &schema.MetricStatistics{Avg: stats.Avg, Min: stats.Min, Max: stats.Max} - // log.Debugf("<< Result Inner: Min %.2f, Max %.2f, Avg %.2f >>", jobData[metric][scope].Series[index].Statistics.Min, jobData[metric][scope].Series[index].Statistics.Max, jobData[metric][scope].Series[index].Statistics.Avg) + jobData[metric][scope].Series[index].Statistics = schema.MetricStatistics{Avg: stats.Avg, Min: stats.Min, Max: stats.Max} } } } @@ -227,17 +222,6 @@ func (idb *InfluxDBv2DataRepository) LoadData( } } - // DEBUG: - // for _, scope := range scopes { - // for _, met := range metrics { - // for _, series := range jobData[met][scope].Series { - // log.Debugf("<< Result: %d data points for metric %s on %s with scope %s, Stats: Min %.2f, Max %.2f, Avg %.2f >>", - // len(series.Data), met, series.Hostname, scope, - // series.Statistics.Min, series.Statistics.Max, series.Statistics.Avg) - // } - // } - // } - return jobData, nil } diff --git a/internal/metricdata/prometheus.go b/internal/metricdata/prometheus.go index 2c31943..d93a3c5 100644 --- a/internal/metricdata/prometheus.go +++ b/internal/metricdata/prometheus.go @@ -251,7 +251,7 @@ func (pdb *PrometheusDataRepository) RowToSeries( return schema.Series{ Hostname: hostname, Data: values, - Statistics: &schema.MetricStatistics{ + Statistics: schema.MetricStatistics{ Avg: mean, Min: min, Max: max, @@ -323,7 +323,6 @@ func (pdb *PrometheusDataRepository) LoadData( if !ok { jobMetric = &schema.JobMetric{ Unit: metricConfig.Unit, - Scope: scope, Timestep: metricConfig.Timestep, Series: make([]schema.Series, 0), } @@ -362,7 +361,7 @@ func (pdb *PrometheusDataRepository) LoadStats( for metric, metricData := range data { stats[metric] = make(map[string]schema.MetricStatistics) for _, series := range metricData[schema.MetricScopeNode].Series { - stats[metric][series.Hostname] = *series.Statistics + stats[metric][series.Hostname] = series.Statistics } } @@ -432,7 +431,6 @@ func (pdb *PrometheusDataRepository) LoadNodeData( // output per host and metric hostdata[metric] = append(hostdata[metric], &schema.JobMetric{ Unit: metricConfig.Unit, - Scope: scope, Timestep: metricConfig.Timestep, Series: []schema.Series{pdb.RowToSeries(from, step, steps, row)}, }, diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index e67cc6f..76e8d47 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -153,7 +153,6 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { if !checkFileExists(filename) { filename = getPath(job, fsa.path, "data.json") isCompressed = false - return nil, err } return loadJobData(filename, isCompressed) @@ -170,7 +169,6 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) { b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json")) if err != nil { log.Errorf("LoadClusterCfg() > open file error: %v", err) - return &schema.Cluster{}, err // if config.Keys.Validate { if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil { log.Warnf("Validate cluster config: %v\n", err) @@ -219,6 +217,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer { for _, startTimeDir := range startTimeDirs { if startTimeDir.IsDir() { + job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json")) if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) { log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error()) } diff --git a/pkg/schema/job.go b/pkg/schema/job.go index c51b24e..ecb1a2b 100644 --- a/pkg/schema/job.go +++ b/pkg/schema/job.go @@ -58,15 +58,6 @@ type Job struct { NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64 FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64 FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64 - StartTime time.Time `json:"startTime"` // Start time as 'time.Time' data type - MemUsedMax float64 `json:"-" db:"mem_used_max"` // MemUsedMax as Float64 - FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"` // FlopsAnyAvg as Float64 - MemBwAvg float64 `json:"-" db:"mem_bw_avg"` // MemBwAvg as Float64 - LoadAvg float64 `json:"-" db:"load_avg"` // LoadAvg as Float64 - NetBwAvg float64 `json:"-" db:"net_bw_avg"` // NetBwAvg as Float64 - NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64 - FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64 - FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64 } // Non-Swaggered Comment: JobMeta @@ -84,7 +75,6 @@ type JobMeta struct { BaseJob StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0) Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job - Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job } const ( @@ -120,16 +110,12 @@ type Tag struct { // The unique DB identifier of a tag ID int64 `json:"id" db:"id"` Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type - Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type Name string `json:"name" db:"tag_name" example:"Testjob"` // Tag Name } // Resource model // @Description A resource used by a job type Resource struct { - Hostname string `json:"hostname"` // Name of the host (= node) - HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids - Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids Hostname string `json:"hostname"` // Name of the host (= node) HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go index 087204c..03b4e3a 100644 --- a/tools/archive-manager/main.go +++ b/tools/archive-manager/main.go @@ -25,7 +25,7 @@ func main() { config.Init(flagConfigFile) config.Keys.Validate = true - if err := archive.Init(json.RawMessage(archiveCfg)); err != nil { + if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil { log.Fatal(err) } ar := archive.GetHandle()