Fix merge errors

This commit is contained in:
Jan Eitzinger 2023-04-11 16:26:09 +02:00
parent f8ba79e9e7
commit 129dd13fc8
6 changed files with 5 additions and 42 deletions

View File

@ -194,10 +194,6 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
res := []*model.JobMetricWithName{} res := []*model.JobMetricWithName{}
for name, md := range data { for name, md := range data {
for scope, metric := range md { for scope, metric := range md {
if metric.Scope != schema.MetricScope(scope) {
log.Panic("metric.Scope != schema.MetricScope(scope) : Should not happen!")
}
res = append(res, &model.JobMetricWithName{ res = append(res, &model.JobMetricWithName{
Name: name, Name: name,
Scope: scope, Scope: scope,

View File

@ -211,15 +211,10 @@ func (idb *InfluxDBv2DataRepository) LoadData(
for _, scope := range scopes { for _, scope := range scopes {
if scope == "node" { // No 'socket/core' support yet if scope == "node" { // No 'socket/core' support yet
for metric, nodes := range stats { for metric, nodes := range stats {
// log.Debugf("<< Add Stats for : Field %s >>", metric)
for node, stats := range nodes { for node, stats := range nodes {
// log.Debugf("<< Add Stats for : Host %s : Min %.2f, Max %.2f, Avg %.2f >>", node, stats.Min, stats.Max, stats.Avg )
for index, _ := range jobData[metric][scope].Series { for index, _ := range jobData[metric][scope].Series {
// log.Debugf("<< Try to add Stats to Series in Position %d >>", index)
if jobData[metric][scope].Series[index].Hostname == node { if jobData[metric][scope].Series[index].Hostname == node {
// log.Debugf("<< Match for Series in Position %d : Host %s >>", index, jobData[metric][scope].Series[index].Hostname) jobData[metric][scope].Series[index].Statistics = schema.MetricStatistics{Avg: stats.Avg, Min: stats.Min, Max: stats.Max}
jobData[metric][scope].Series[index].Statistics = &schema.MetricStatistics{Avg: stats.Avg, Min: stats.Min, Max: stats.Max}
// log.Debugf("<< Result Inner: Min %.2f, Max %.2f, Avg %.2f >>", jobData[metric][scope].Series[index].Statistics.Min, jobData[metric][scope].Series[index].Statistics.Max, jobData[metric][scope].Series[index].Statistics.Avg)
} }
} }
} }
@ -227,17 +222,6 @@ func (idb *InfluxDBv2DataRepository) LoadData(
} }
} }
// DEBUG:
// for _, scope := range scopes {
// for _, met := range metrics {
// for _, series := range jobData[met][scope].Series {
// log.Debugf("<< Result: %d data points for metric %s on %s with scope %s, Stats: Min %.2f, Max %.2f, Avg %.2f >>",
// len(series.Data), met, series.Hostname, scope,
// series.Statistics.Min, series.Statistics.Max, series.Statistics.Avg)
// }
// }
// }
return jobData, nil return jobData, nil
} }

View File

@ -251,7 +251,7 @@ func (pdb *PrometheusDataRepository) RowToSeries(
return schema.Series{ return schema.Series{
Hostname: hostname, Hostname: hostname,
Data: values, Data: values,
Statistics: &schema.MetricStatistics{ Statistics: schema.MetricStatistics{
Avg: mean, Avg: mean,
Min: min, Min: min,
Max: max, Max: max,
@ -323,7 +323,6 @@ func (pdb *PrometheusDataRepository) LoadData(
if !ok { if !ok {
jobMetric = &schema.JobMetric{ jobMetric = &schema.JobMetric{
Unit: metricConfig.Unit, Unit: metricConfig.Unit,
Scope: scope,
Timestep: metricConfig.Timestep, Timestep: metricConfig.Timestep,
Series: make([]schema.Series, 0), Series: make([]schema.Series, 0),
} }
@ -362,7 +361,7 @@ func (pdb *PrometheusDataRepository) LoadStats(
for metric, metricData := range data { for metric, metricData := range data {
stats[metric] = make(map[string]schema.MetricStatistics) stats[metric] = make(map[string]schema.MetricStatistics)
for _, series := range metricData[schema.MetricScopeNode].Series { for _, series := range metricData[schema.MetricScopeNode].Series {
stats[metric][series.Hostname] = *series.Statistics stats[metric][series.Hostname] = series.Statistics
} }
} }
@ -432,7 +431,6 @@ func (pdb *PrometheusDataRepository) LoadNodeData(
// output per host and metric // output per host and metric
hostdata[metric] = append(hostdata[metric], &schema.JobMetric{ hostdata[metric] = append(hostdata[metric], &schema.JobMetric{
Unit: metricConfig.Unit, Unit: metricConfig.Unit,
Scope: scope,
Timestep: metricConfig.Timestep, Timestep: metricConfig.Timestep,
Series: []schema.Series{pdb.RowToSeries(from, step, steps, row)}, Series: []schema.Series{pdb.RowToSeries(from, step, steps, row)},
}, },

View File

@ -153,7 +153,6 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
if !checkFileExists(filename) { if !checkFileExists(filename) {
filename = getPath(job, fsa.path, "data.json") filename = getPath(job, fsa.path, "data.json")
isCompressed = false isCompressed = false
return nil, err
} }
return loadJobData(filename, isCompressed) return loadJobData(filename, isCompressed)
@ -170,7 +169,6 @@ func (fsa *FsArchive) LoadClusterCfg(name string) (*schema.Cluster, error) {
b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json")) b, err := os.ReadFile(filepath.Join(fsa.path, name, "cluster.json"))
if err != nil { if err != nil {
log.Errorf("LoadClusterCfg() > open file error: %v", err) log.Errorf("LoadClusterCfg() > open file error: %v", err)
return &schema.Cluster{}, err
// if config.Keys.Validate { // if config.Keys.Validate {
if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil { if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
log.Warnf("Validate cluster config: %v\n", err) log.Warnf("Validate cluster config: %v\n", err)
@ -219,6 +217,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
for _, startTimeDir := range startTimeDirs { for _, startTimeDir := range startTimeDirs {
if startTimeDir.IsDir() { if startTimeDir.IsDir() {
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) { if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error()) log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
} }

View File

@ -58,15 +58,6 @@ type Job struct {
NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64 NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64
FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64 FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64
FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64 FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64
StartTime time.Time `json:"startTime"` // Start time as 'time.Time' data type
MemUsedMax float64 `json:"-" db:"mem_used_max"` // MemUsedMax as Float64
FlopsAnyAvg float64 `json:"-" db:"flops_any_avg"` // FlopsAnyAvg as Float64
MemBwAvg float64 `json:"-" db:"mem_bw_avg"` // MemBwAvg as Float64
LoadAvg float64 `json:"-" db:"load_avg"` // LoadAvg as Float64
NetBwAvg float64 `json:"-" db:"net_bw_avg"` // NetBwAvg as Float64
NetDataVolTotal float64 `json:"-" db:"net_data_vol_total"` // NetDataVolTotal as Float64
FileBwAvg float64 `json:"-" db:"file_bw_avg"` // FileBwAvg as Float64
FileDataVolTotal float64 `json:"-" db:"file_data_vol_total"` // FileDataVolTotal as Float64
} }
// Non-Swaggered Comment: JobMeta // Non-Swaggered Comment: JobMeta
@ -84,7 +75,6 @@ type JobMeta struct {
BaseJob BaseJob
StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0) StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // Start epoch time stamp in seconds (Min > 0)
Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job
Statistics map[string]JobStatistics `json:"statistics,omitempty"` // Metric statistics of job
} }
const ( const (
@ -120,16 +110,12 @@ type Tag struct {
// The unique DB identifier of a tag // The unique DB identifier of a tag
ID int64 `json:"id" db:"id"` ID int64 `json:"id" db:"id"`
Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type
Type string `json:"type" db:"tag_type" example:"Debug"` // Tag Type
Name string `json:"name" db:"tag_name" example:"Testjob"` // Tag Name Name string `json:"name" db:"tag_name" example:"Testjob"` // Tag Name
} }
// Resource model // Resource model
// @Description A resource used by a job // @Description A resource used by a job
type Resource struct { type Resource struct {
Hostname string `json:"hostname"` // Name of the host (= node)
HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids
Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids
Hostname string `json:"hostname"` // Name of the host (= node) Hostname string `json:"hostname"` // Name of the host (= node)
HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids HWThreads []int `json:"hwthreads,omitempty"` // List of OS processor ids
Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids Accelerators []string `json:"accelerators,omitempty"` // List of of accelerator device ids

View File

@ -25,7 +25,7 @@ func main() {
config.Init(flagConfigFile) config.Init(flagConfigFile)
config.Keys.Validate = true config.Keys.Validate = true
if err := archive.Init(json.RawMessage(archiveCfg)); err != nil { if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil {
log.Fatal(err) log.Fatal(err)
} }
ar := archive.GetHandle() ar := archive.GetHandle()