Mirror of https://github.com/ClusterCockpit/cc-backend (synced 2025-11-03 17:15:06 +01:00)

Merge pull request #365 from ClusterCockpit/split_statsTable_query
Split StatsTable DataQuery from JobMetrics Query In Job-View
@@ -137,11 +137,6 @@ type JobMetricWithName {
  metric: JobMetric!
}

type JobMetricStatWithName {
  name:   String!
  stats:  MetricStatistics!
}

type JobMetric {
  unit:             Unit
  timestep:         Int!
@@ -156,6 +151,30 @@ type Series {
  data:       [NullableFloat!]!
}

type StatsSeries {
  mean:   [NullableFloat!]!
  median: [NullableFloat!]!
  min:    [NullableFloat!]!
  max:    [NullableFloat!]!
}

type JobStatsWithScope {
  name:   String!
  scope:  MetricScope!
  stats:  [ScopedStats!]!
}

type ScopedStats {
  hostname:   String!
  id:         String
  data:       MetricStatistics!
}

type JobStats {
  name:   String!
  stats:  MetricStatistics!
}

type Unit {
  base: String!
  prefix: String
@@ -167,13 +186,6 @@ type MetricStatistics {
  max: Float!
}

type StatsSeries {
  mean:   [NullableFloat!]!
  median: [NullableFloat!]!
  min:    [NullableFloat!]!
  max:    [NullableFloat!]!
}

type MetricFootprints {
  metric: String!
  data:   [NullableFloat!]!
@@ -247,7 +259,8 @@ type Query {

  job(id: ID!): Job
  jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!], resolution: Int): [JobMetricWithName!]!
  jobMetricStats(id: ID!, metrics: [String!]): [JobMetricStatWithName!]!
  jobStats(id: ID!, metrics: [String!]): [JobStats!]!
  scopedJobStats(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobStatsWithScope!]!
  jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints

  jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList!
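
For orientation, the split is visible directly in the Query type: the former jobMetricStats field is replaced by jobStats (one aggregated statistics object per metric, used for the polar plot) and scopedJobStats (statistics per metric, scope, and host, used for the job-view statistics table). A sketch of queries against the new fields; the job id and metric names are placeholders, and the scope literal assumes MetricScope serializes as a plain string such as "node":

query {
  jobStats(id: "1234", metrics: ["flops_any", "mem_bw"]) {
    name
    stats { avg min max }
  }
  scopedJobStats(id: "1234", metrics: ["flops_any", "mem_bw"], scopes: ["node"]) {
    name
    scope
    stats {
      hostname
      id
      data { avg min max }
    }
  }
}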

(File diff suppressed because it is too large.)

@@ -81,11 +81,6 @@ type JobLinkResultList struct {
	Count     *int       `json:"count,omitempty"`
}

type JobMetricStatWithName struct {
	Name  string                   `json:"name"`
	Stats *schema.MetricStatistics `json:"stats"`
}

type JobMetricWithName struct {
	Name   string             `json:"name"`
	Scope  schema.MetricScope `json:"scope"`
@@ -100,6 +95,17 @@ type JobResultList struct {
	HasNextPage *bool         `json:"hasNextPage,omitempty"`
}

type JobStats struct {
	Name  string                   `json:"name"`
	Stats *schema.MetricStatistics `json:"stats"`
}

type JobStatsWithScope struct {
	Name  string             `json:"name"`
	Scope schema.MetricScope `json:"scope"`
	Stats []*ScopedStats     `json:"stats"`
}

type JobsStatistics struct {
	ID             string               `json:"id"`
	Name           string               `json:"name"`
@@ -173,6 +179,12 @@ type PageRequest struct {
	Page         int `json:"page"`
}

type ScopedStats struct {
	Hostname string                   `json:"hostname"`
	ID       *string                  `json:"id,omitempty"`
	Data     *schema.MetricStatistics `json:"data"`
}

type StringInput struct {
	Eq         *string  `json:"eq,omitempty"`
	Neq        *string  `json:"neq,omitempty"`

@@ -301,24 +301,23 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
	return res, err
}

// JobMetricStats is the resolver for the jobMetricStats field.
func (r *queryResolver) JobMetricStats(ctx context.Context, id string, metrics []string) ([]*model.JobMetricStatWithName, error) {

// JobStats is the resolver for the jobStats field.
func (r *queryResolver) JobStats(ctx context.Context, id string, metrics []string) ([]*model.JobStats, error) {
	job, err := r.Query().Job(ctx, id)
	if err != nil {
		log.Warn("Error while querying job for metrics")
		log.Warnf("Error while querying job %s for metadata", id)
		return nil, err
	}

	data, err := metricDataDispatcher.LoadStatData(job, metrics, ctx)
	data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx)
	if err != nil {
		log.Warn("Error while loading job stat data")
		log.Warnf("Error while loading jobStats data for job id %s", id)
		return nil, err
	}

	res := []*model.JobMetricStatWithName{}
	res := []*model.JobStats{}
	for name, md := range data {
		res = append(res, &model.JobMetricStatWithName{
		res = append(res, &model.JobStats{
			Name:  name,
			Stats: &md,
		})
@@ -327,6 +326,44 @@ func (r *queryResolver) JobMetricStats(ctx context.Context, id string, metrics [
	return res, err
}

// ScopedJobStats is the resolver for the scopedJobStats field.
func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobStatsWithScope, error) {
	job, err := r.Query().Job(ctx, id)
	if err != nil {
		log.Warnf("Error while querying job %s for metadata", id)
		return nil, err
	}

	data, err := metricDataDispatcher.LoadScopedJobStats(job, metrics, scopes, ctx)
	if err != nil {
		log.Warnf("Error while loading scopedJobStats data for job id %s", id)
		return nil, err
	}

	res := make([]*model.JobStatsWithScope, 0)
	for name, scoped := range data {
		for scope, stats := range scoped {

			mdlStats := make([]*model.ScopedStats, 0)
			for _, stat := range stats {
				mdlStats = append(mdlStats, &model.ScopedStats{
					Hostname: stat.Hostname,
					ID:       stat.Id,
					Data:     stat.Data,
				})
			}

			res = append(res, &model.JobStatsWithScope{
				Name:  name,
				Scope: scope,
				Stats: mdlStats,
			})
		}
	}

	return res, nil
}

// JobsFootprints is the resolver for the jobsFootprints field.
func (r *queryResolver) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) {
	// NOTE: Legacy Naming! This resolver is for normalized histograms in analysis view only - *Not* related to DB "footprint" column!
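
The ScopedJobStats resolver flattens the dispatcher's nested map (metric, then scope, then per-host stats) into one JobStatsWithScope entry per metric/scope pair; since it iterates Go maps, the order of entries is not deterministic. A sketch of the resulting shape for a single node-scoped metric, with invented names and values (types as in this diff, not a standalone program):

res := []*model.JobStatsWithScope{
	{
		Name:  "flops_any",
		Scope: schema.MetricScopeNode,
		Stats: []*model.ScopedStats{
			{Hostname: "node001", Data: &schema.MetricStatistics{Avg: 1.2, Min: 0.1, Max: 2.5}},
		},
	},
}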
@@ -224,8 +224,34 @@ func LoadAverages(
	return nil
}

// Used for polar plots in frontend
func LoadStatData(
// Used for statsTable in frontend: Return scoped statistics by metric.
func LoadScopedJobStats(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context,
) (schema.ScopedJobStats, error) {

	if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
		return archive.LoadScopedStatsFromArchive(job, metrics, scopes)
	}

	repo, err := metricdata.GetMetricDataRepo(job.Cluster)
	if err != nil {
		return nil, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster)
	}

	scopedStats, err := repo.LoadScopedStats(job, metrics, scopes, ctx)
	if err != nil {
		log.Errorf("error while loading scoped statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project)
		return nil, err
	}

	return scopedStats, nil
}

// Used for polar plots in frontend: Aggregates statistics for all nodes to single values for job per metric.
func LoadJobStats(
	job *schema.Job,
	metrics []string,
	ctx context.Context,
@@ -237,12 +263,12 @@ func LoadStatData(
	data := make(map[string]schema.MetricStatistics, len(metrics))
	repo, err := metricdata.GetMetricDataRepo(job.Cluster)
	if err != nil {
		return data, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster)
		return data, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster)
	}

	stats, err := repo.LoadStats(job, metrics, ctx)
	if err != nil {
		log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project)
		log.Errorf("error while loading statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project)
		return data, err
	}
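
The dispatcher thus exposes two deliberately separate entry points: LoadJobStats keeps the cheap, node-aggregated statistics for the polar plot, while LoadScopedJobStats returns per-scope, per-host statistics for the statistics table; both consult the job archive for finished jobs unless archiving is disabled. Hypothetical call sites (metric names invented):

// Polar plot: one aggregated schema.MetricStatistics per metric.
flat, err := metricDataDispatcher.LoadJobStats(job, []string{"flops_any", "mem_bw"}, ctx)

// Job-view stats table: statistics per metric, scope, and host.
scoped, err := metricDataDispatcher.LoadScopedJobStats(job, []string{"flops_any", "mem_bw"},
	[]schema.MetricScope{schema.MetricScopeNode}, ctx)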
@@ -129,13 +129,13 @@ func (ccms *CCMetricStore) doRequest(
) (*ApiQueryResponse, error) {
	buf := &bytes.Buffer{}
	if err := json.NewEncoder(buf).Encode(body); err != nil {
		log.Warn("Error while encoding request body")
		log.Errorf("Error while encoding request body: %s", err.Error())
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf)
	if err != nil {
		log.Warn("Error while building request body")
		log.Errorf("Error while building request body: %s", err.Error())
		return nil, err
	}
	if ccms.jwt != "" {
@@ -151,7 +151,7 @@ func (ccms *CCMetricStore) doRequest(

	res, err := ccms.client.Do(req)
	if err != nil {
		log.Error("Error while performing request")
		log.Errorf("Error while performing request: %s", err.Error())
		return nil, err
	}

@@ -161,7 +161,7 @@ func (ccms *CCMetricStore) doRequest(

	var resBody ApiQueryResponse
	if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil {
		log.Warn("Error while decoding result body")
		log.Errorf("Error while decoding result body: %s", err.Error())
		return nil, err
	}

@@ -177,7 +177,7 @@ func (ccms *CCMetricStore) LoadData(
) (schema.JobData, error) {
	queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution)
	if err != nil {
		log.Warn("Error while building queries")
		log.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error())
		return nil, err
	}

@@ -192,7 +192,7 @@ func (ccms *CCMetricStore) LoadData(

	resBody, err := ccms.doRequest(ctx, &req)
	if err != nil {
		log.Error("Error while performing request")
		log.Errorf("Error while performing request: %s", err.Error())
		return nil, err
	}

@@ -557,16 +557,9 @@ func (ccms *CCMetricStore) LoadStats(
	ctx context.Context,
) (map[string]map[string]schema.MetricStatistics, error) {

	// metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
	// resolution := 9000

	// for _, mc := range metricConfigs {
	// 	resolution = min(resolution, mc.Timestep)
	// }

	queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization?
	if err != nil {
		log.Warn("Error while building query")
		log.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error())
		return nil, err
	}

@@ -581,7 +574,7 @@ func (ccms *CCMetricStore) LoadStats(

	resBody, err := ccms.doRequest(ctx, &req)
	if err != nil {
		log.Error("Error while performing request")
		log.Errorf("Error while performing request: %s", err.Error())
		return nil, err
	}

@@ -591,9 +584,8 @@ func (ccms *CCMetricStore) LoadStats(
		metric := ccms.toLocalName(query.Metric)
		data := res[0]
		if data.Error != nil {
			log.Infof("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error)
			log.Errorf("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error)
			continue
			// return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error)
		}

		metricdata, ok := stats[metric]
@@ -603,9 +595,8 @@ func (ccms *CCMetricStore) LoadStats(
		}

		if data.Avg.IsNaN() || data.Min.IsNaN() || data.Max.IsNaN() {
			log.Infof("fetching %s for node %s failed: one of avg/min/max is NaN", metric, query.Hostname)
			log.Warnf("fetching %s for node %s failed: one of avg/min/max is NaN", metric, query.Hostname)
			continue
			// return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN")
		}

		metricdata[query.Hostname] = schema.MetricStatistics{
@@ -618,7 +609,98 @@ func (ccms *CCMetricStore) LoadStats(
	return stats, nil
}

// TODO: Support sub-node-scope metrics! For this, the partition of a node needs to be known!
// Used for Job-View Statistics Table
func (ccms *CCMetricStore) LoadScopedStats(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context,
) (schema.ScopedJobStats, error) {
	queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, 0)
	if err != nil {
		log.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error())
		return nil, err
	}

	req := ApiQueryRequest{
		Cluster:   job.Cluster,
		From:      job.StartTime.Unix(),
		To:        job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(),
		Queries:   queries,
		WithStats: true,
		WithData:  false,
	}

	resBody, err := ccms.doRequest(ctx, &req)
	if err != nil {
		log.Errorf("Error while performing request: %s", err.Error())
		return nil, err
	}

	var errors []string
	scopedJobStats := make(schema.ScopedJobStats)

	for i, row := range resBody.Results {
		query := req.Queries[i]
		metric := ccms.toLocalName(query.Metric)
		scope := assignedScope[i]

		if _, ok := scopedJobStats[metric]; !ok {
			scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats)
		}

		if _, ok := scopedJobStats[metric][scope]; !ok {
			scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0)
		}

		for ndx, res := range row {
			if res.Error != nil {
				/* Build list for "partial errors", if any */
				errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error))
				continue
			}

			id := (*string)(nil)
			if query.Type != nil {
				id = new(string)
				*id = query.TypeIds[ndx]
			}

			if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
				// "schema.Float()" because regular float64 can not be JSONed when NaN.
				res.Avg = schema.Float(0)
				res.Min = schema.Float(0)
				res.Max = schema.Float(0)
			}

			scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
				Hostname: query.Hostname,
				Id:       id,
				Data: &schema.MetricStatistics{
					Avg: float64(res.Avg),
					Min: float64(res.Min),
					Max: float64(res.Max),
				},
			})
		}

		// So that one can later check len(scopedJobStats[metric][scope]): Remove from map if empty
		if len(scopedJobStats[metric][scope]) == 0 {
			delete(scopedJobStats[metric], scope)
			if len(scopedJobStats[metric]) == 0 {
				delete(scopedJobStats, metric)
			}
		}
	}

	if len(errors) != 0 {
		/* Returns list for "partial errors" */
		return scopedJobStats, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
	}
	return scopedJobStats, nil
}
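
Worth noting as a design choice: LoadScopedStats treats per-query failures as partial errors. Failed queries are skipped and recorded in an error list, all-NaN statistics are zeroed rather than dropped, empty metric/scope buckets are pruned, and everything that did arrive is returned together with the joined error string. A minimal sketch of how a caller might honor that contract (illustrative, not from this diff):

data, err := ccms.LoadScopedStats(job, metrics, scopes, ctx)
if err != nil && len(data) > 0 {
	// Partial result: render what is available and surface the rest as a warning.
	log.Warnf("partial scoped stats for job %d: %s", job.JobID, err.Error())
}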

// Used for Systems-View Node-Overview
func (ccms *CCMetricStore) LoadNodeData(
	cluster string,
	metrics, nodes []string,
@@ -652,7 +734,7 @@ func (ccms *CCMetricStore) LoadNodeData(

	resBody, err := ccms.doRequest(ctx, &req)
	if err != nil {
		log.Error(fmt.Sprintf("Error while performing request %#v\n", err))
		log.Errorf("Error while performing request: %s", err.Error())
		return nil, err
	}

@@ -710,6 +792,7 @@ func (ccms *CCMetricStore) LoadNodeData(
	return data, nil
}

// Used for Systems-View Node-List
func (ccms *CCMetricStore) LoadNodeListData(
	cluster, subCluster, nodeFilter string,
	metrics []string,
@@ -768,7 +851,7 @@ func (ccms *CCMetricStore) LoadNodeListData(

	queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution)
	if err != nil {
		log.Warn("Error while building queries")
		log.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error())
		return nil, totalNodes, hasNextPage, err
	}

@@ -783,7 +866,7 @@ func (ccms *CCMetricStore) LoadNodeListData(

	resBody, err := ccms.doRequest(ctx, &req)
	if err != nil {
		log.Error(fmt.Sprintf("Error while performing request %#v\n", err))
		log.Errorf("Error while performing request: %s", err.Error())
		return nil, totalNodes, hasNextPage, err
	}

@@ -888,7 +971,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
	if subCluster != "" {
		subClusterTopol, scterr = archive.GetSubCluster(cluster, subCluster)
		if scterr != nil {
			// TODO: Log
			log.Errorf("could not load cluster %s subCluster %s topology: %s", cluster, subCluster, scterr.Error())
			return nil, nil, scterr
		}
	}
@@ -898,7 +981,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
		mc := archive.GetMetricConfig(cluster, metric)
		if mc == nil {
			// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster)
			log.Infof("metric '%s' is not specified for cluster '%s'", metric, cluster)
			log.Warnf("metric '%s' is not specified for cluster '%s'", metric, cluster)
			continue
		}

@@ -10,6 +10,8 @@ import (
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"sort"
	"strings"
	"time"

@@ -64,6 +66,8 @@ func (idb *InfluxDBv2DataRepository) LoadData(
	ctx context.Context,
	resolution int) (schema.JobData, error) {

	log.Infof("InfluxDB 2 Backend: Resolution Scaling not Implemented, will return default timestep. Requested Resolution %d", resolution)

	measurementsConds := make([]string, 0, len(metrics))
	for _, m := range metrics {
		measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m))
@@ -86,7 +90,7 @@ func (idb *InfluxDBv2DataRepository) LoadData(
		query := ""
		switch scope {
		case "node":
			// Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows
			// Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows <-- Resolution could be added here?
			// log.Info("Scope 'node' requested. ")
			query = fmt.Sprintf(`
								from(bucket: "%s")
@@ -116,6 +120,12 @@ func (idb *InfluxDBv2DataRepository) LoadData(
			//  	idb.bucket,
			//  	idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
			//  	measurementsCond, hostsCond)
		case "hwthread":
			log.Info(" Scope 'hwthread' requested, but not yet supported: Will return 'node' scope only. ")
			continue
		case "accelerator":
			log.Info(" Scope 'accelerator' requested, but not yet supported: Will return 'node' scope only. ")
			continue
		default:
			log.Infof("Unknown scope '%s' requested: Will return 'node' scope.", scope)
			continue
@@ -173,6 +183,11 @@ func (idb *InfluxDBv2DataRepository) LoadData(
			}
		case "socket":
			continue
		case "accelerator":
			continue
		case "hwthread":
			// See below @ core
			continue
		case "core":
			continue
			// Include Series.Id in hostSeries
@@ -301,6 +316,53 @@ func (idb *InfluxDBv2DataRepository) LoadStats(
	return stats, nil
}

// Used in Job-View StatsTable
// UNTESTED
func (idb *InfluxDBv2DataRepository) LoadScopedStats(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context) (schema.ScopedJobStats, error) {

	// Assumption: idb.loadData() only returns series node-scope - use node scope for statsTable
	scopedJobStats := make(schema.ScopedJobStats)
	data, err := idb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
	if err != nil {
		log.Warn("Error while loading job for scopedJobStats")
		return nil, err
	}

	for metric, metricData := range data {
		for _, scope := range scopes {
			if scope != schema.MetricScopeNode {
				logOnce.Do(func() {
					log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
				})
				continue
			}

			if _, ok := scopedJobStats[metric]; !ok {
				scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats)
			}

			if _, ok := scopedJobStats[metric][scope]; !ok {
				scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0)
			}

			for _, series := range metricData[scope].Series {
				scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
					Hostname: series.Hostname,
					Data:     &series.Statistics,
				})
			}
		}
	}

	return scopedJobStats, nil
}

// Used in Systems-View @ Node-Overview
// UNTESTED
func (idb *InfluxDBv2DataRepository) LoadNodeData(
	cluster string,
	metrics, nodes []string,
@@ -308,12 +370,123 @@ func (idb *InfluxDBv2DataRepository) LoadNodeData(
	from, to time.Time,
	ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {

	// TODO : Implement to be used in Analysis- und System/Node-View
	log.Infof("LoadNodeData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodes %v, scopes %v", cluster, metrics, nodes, scopes)
	// Note: scopes[] Array will be ignored, only return node scope

	return nil, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository")
	// CONVERT ARGS TO INFLUX
	measurementsConds := make([]string, 0)
	for _, m := range metrics {
		measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m))
	}
	measurementsCond := strings.Join(measurementsConds, " or ")

	hostsConds := make([]string, 0)
	if nodes == nil {
		var allNodes []string
		subClusterNodeLists := archive.NodeLists[cluster]
		for _, nodeList := range subClusterNodeLists {
			allNodes = append(nodes, nodeList.PrintList()...)
		}
		for _, node := range allNodes {
			nodes = append(nodes, node)
			hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, node))
		}
	} else {
		for _, node := range nodes {
			hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, node))
		}
	}
	hostsCond := strings.Join(hostsConds, " or ")

	// BUILD AND PERFORM QUERY
	query := fmt.Sprintf(`
						from(bucket: "%s")
						|> range(start: %s, stop: %s)
						|> filter(fn: (r) => (%s) and (%s) )
						|> drop(columns: ["_start", "_stop"])
						|> group(columns: ["hostname", "_measurement"])
			|> aggregateWindow(every: 60s, fn: mean)
						|> drop(columns: ["_time"])`,
		idb.bucket,
		idb.formatTime(from), idb.formatTime(to),
		measurementsCond, hostsCond)

	rows, err := idb.queryClient.Query(ctx, query)
	if err != nil {
		log.Error("Error while performing query")
		return nil, err
	}

	// HANDLE QUERY RETURN
	// Collect Float Arrays for Node@Metric -> No Scope Handling!
	influxData := make(map[string]map[string][]schema.Float)
	for rows.Next() {
		row := rows.Record()
		host, field := row.ValueByKey("hostname").(string), row.Measurement()

		influxHostData, ok := influxData[host]
		if !ok {
			influxHostData = make(map[string][]schema.Float)
			influxData[host] = influxHostData
		}

		influxFieldData, ok := influxData[host][field]
		if !ok {
			influxFieldData = make([]schema.Float, 0)
			influxData[host][field] = influxFieldData
		}

		val, ok := row.Value().(float64)
		if ok {
			influxData[host][field] = append(influxData[host][field], schema.Float(val))
		} else {
			influxData[host][field] = append(influxData[host][field], schema.Float(0))
		}
	}

	// BUILD FUNCTION RETURN
	data := make(map[string]map[string][]*schema.JobMetric)
	for node, metricData := range influxData {

		nodeData, ok := data[node]
		if !ok {
			nodeData = make(map[string][]*schema.JobMetric)
			data[node] = nodeData
		}

		for metric, floatArray := range metricData {
			avg, min, max := 0.0, 0.0, 0.0
			for _, val := range floatArray {
				avg += float64(val)
				min = math.Min(min, float64(val))
				max = math.Max(max, float64(val))
			}

			stats := schema.MetricStatistics{
				Avg: (math.Round((avg/float64(len(floatArray)))*100) / 100),
				Min: (math.Round(min*100) / 100),
				Max: (math.Round(max*100) / 100),
			}

			mc := archive.GetMetricConfig(cluster, metric)
			nodeData[metric] = append(nodeData[metric], &schema.JobMetric{
				Unit:     mc.Unit,
				Timestep: mc.Timestep,
				Series: []schema.Series{
					{
						Hostname:   node,
						Statistics: stats,
						Data:       floatArray,
					},
				},
			})
		}
	}

	return data, nil
}

// Used in Systems-View @ Node-List
// UNTESTED
func (idb *InfluxDBv2DataRepository) LoadNodeListData(
	cluster, subCluster, nodeFilter string,
	metrics []string,
@@ -324,10 +497,79 @@ func (idb *InfluxDBv2DataRepository) LoadNodeListData(
	ctx context.Context,
) (map[string]schema.JobData, int, bool, error) {

	// Assumption: idb.loadData() only returns series node-scope - use node scope for NodeList

	// 0) Init additional vars
	var totalNodes int = 0
	var hasNextPage bool = false
	// TODO : Implement to be used in NodeList-View
	log.Infof("LoadNodeListData unimplemented for InfluxDBv2DataRepository, Args: cluster %s, metrics %v, nodeFilter %v, scopes %v", cluster, metrics, nodeFilter, scopes)

	return nil, totalNodes, hasNextPage, errors.New("METRICDATA/INFLUXV2 > unimplemented for InfluxDBv2DataRepository")
	// 1) Get list of all nodes
	var nodes []string
	if subCluster != "" {
		scNodes := archive.NodeLists[cluster][subCluster]
		nodes = scNodes.PrintList()
	} else {
		subClusterNodeLists := archive.NodeLists[cluster]
		for _, nodeList := range subClusterNodeLists {
			nodes = append(nodes, nodeList.PrintList()...)
		}
	}

	// 2) Filter nodes
	if nodeFilter != "" {
		filteredNodes := []string{}
		for _, node := range nodes {
			if strings.Contains(node, nodeFilter) {
				filteredNodes = append(filteredNodes, node)
			}
		}
		nodes = filteredNodes
	}

	// 2.1) Count total nodes && Sort nodes -> Sorting invalidated after return ...
	totalNodes = len(nodes)
	sort.Strings(nodes)

	// 3) Apply paging
	if len(nodes) > page.ItemsPerPage {
		start := (page.Page - 1) * page.ItemsPerPage
		end := start + page.ItemsPerPage
		if end > len(nodes) {
			end = len(nodes)
			hasNextPage = false
		} else {
			hasNextPage = true
		}
		nodes = nodes[start:end]
	}

	// 4) Fetch And Convert Data, use idb.LoadNodeData() for query

	rawNodeData, err := idb.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
	if err != nil {
		log.Error(fmt.Sprintf("Error while loading influx nodeData for nodeListData %#v\n", err))
		return nil, totalNodes, hasNextPage, err
	}

	data := make(map[string]schema.JobData)
	for node, nodeData := range rawNodeData {
		// Init Nested Map Data Structures If Not Found
		hostData, ok := data[node]
		if !ok {
			hostData = make(schema.JobData)
			data[node] = hostData
		}

		for metric, nodeMetricData := range nodeData {
			metricData, ok := hostData[metric]
			if !ok {
				metricData = make(map[schema.MetricScope]*schema.JobMetric)
				data[node][metric] = metricData
			}

			data[node][metric][schema.MetricScopeNode] = nodeMetricData[0] // Only Node Scope Returned from loadNodeData
		}
	}

	return data, totalNodes, hasNextPage, nil
}
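
Reading these hunks with their headers (for example @@ -308,12 +370,123 @@), they replace the former stubs, a TODO log line plus an immediate "unimplemented" error return, with first, explicitly UNTESTED implementations of LoadScopedStats, LoadNodeData, and LoadNodeListData for the InfluxDB v2 backend; removed and added lines appear interleaved above because the split-view diff markers were lost in this mirror. The same stub-to-implementation change follows below for the Prometheus backend, whose removed error string still read "METRICDATA/INFLUXV2", evidently a copy-paste leftover.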
@@ -24,9 +24,12 @@ type MetricDataRepository interface {
	// Return the JobData for the given job, only with the requested metrics.
	LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)

	// Return a map of metrics to a map of nodes to the metric statistics of the job. node scope assumed for now.
	// Return a map of metrics to a map of nodes to the metric statistics of the job. node scope only.
	LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)

	// Return a map of metrics to a map of scopes to the scoped metric statistics of the job.
	LoadScopedStats(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.ScopedJobStats, error)

	// Return a map of hosts to a map of metrics at the requested scopes (currently only node) for that node.
	LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error)

@@ -448,6 +448,51 @@ func (pdb *PrometheusDataRepository) LoadNodeData(
	return data, nil
}

// Implemented by NHR@FAU; Used in Job-View StatsTable
func (pdb *PrometheusDataRepository) LoadScopedStats(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context) (schema.ScopedJobStats, error) {

	// Assumption: pdb.loadData() only returns series node-scope - use node scope for statsTable
	scopedJobStats := make(schema.ScopedJobStats)
	data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
	if err != nil {
		log.Warn("Error while loading job for scopedJobStats")
		return nil, err
	}

	for metric, metricData := range data {
		for _, scope := range scopes {
			if scope != schema.MetricScopeNode {
				logOnce.Do(func() {
					log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
				})
				continue
			}

			if _, ok := scopedJobStats[metric]; !ok {
				scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats)
			}

			if _, ok := scopedJobStats[metric][scope]; !ok {
				scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0)
			}

			for _, series := range metricData[scope].Series {
				scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
					Hostname: series.Hostname,
					Data:     &series.Statistics,
				})
			}
		}
	}

	return scopedJobStats, nil
}

// Implemented by NHR@FAU; Used in NodeList-View
func (pdb *PrometheusDataRepository) LoadNodeListData(
	cluster, subCluster, nodeFilter string,
	metrics []string,
@@ -458,10 +503,132 @@ func (pdb *PrometheusDataRepository) LoadNodeListData(
	ctx context.Context,
) (map[string]schema.JobData, int, bool, error) {

	// Assumption: pdb.loadData() only returns series node-scope - use node scope for NodeList

	// 0) Init additional vars
	var totalNodes int = 0
	var hasNextPage bool = false
	// TODO : Implement to be used in NodeList-View
	log.Infof("LoadNodeListData unimplemented for PrometheusDataRepository, Args: cluster %s, metrics %v, nodeFilter %v, scopes %v", cluster, metrics, nodeFilter, scopes)

	return nil, totalNodes, hasNextPage, errors.New("METRICDATA/INFLUXV2 > unimplemented for PrometheusDataRepository")
	// 1) Get list of all nodes
	var nodes []string
	if subCluster != "" {
		scNodes := archive.NodeLists[cluster][subCluster]
		nodes = scNodes.PrintList()
	} else {
		subClusterNodeLists := archive.NodeLists[cluster]
		for _, nodeList := range subClusterNodeLists {
			nodes = append(nodes, nodeList.PrintList()...)
		}
	}

	// 2) Filter nodes
	if nodeFilter != "" {
		filteredNodes := []string{}
		for _, node := range nodes {
			if strings.Contains(node, nodeFilter) {
				filteredNodes = append(filteredNodes, node)
			}
		}
		nodes = filteredNodes
	}

	// 2.1) Count total nodes && Sort nodes -> Sorting invalidated after return ...
	totalNodes = len(nodes)
	sort.Strings(nodes)

	// 3) Apply paging
	if len(nodes) > page.ItemsPerPage {
		start := (page.Page - 1) * page.ItemsPerPage
		end := start + page.ItemsPerPage
		if end > len(nodes) {
			end = len(nodes)
			hasNextPage = false
		} else {
			hasNextPage = true
		}
		nodes = nodes[start:end]
	}

	// 4) Fetch Data, based on pdb.LoadNodeData()

	t0 := time.Now()
	// Map of hosts of jobData
	data := make(map[string]schema.JobData)

	// query db for each metric
	// TODO: scopes seems to be always empty
	if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
		scopes = append(scopes, schema.MetricScopeNode)
	}

	for _, scope := range scopes {
		if scope != schema.MetricScopeNode {
			logOnce.Do(func() {
				log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
			})
			continue
		}

		for _, metric := range metrics {
			metricConfig := archive.GetMetricConfig(cluster, metric)
			if metricConfig == nil {
				log.Warnf("Error in LoadNodeListData: Metric %s for cluster %s not configured", metric, cluster)
				return nil, totalNodes, hasNextPage, errors.New("Prometheus config error")
			}
			query, err := pdb.FormatQuery(metric, scope, nodes, cluster)
			if err != nil {
				log.Warn("Error while formatting prometheus query")
				return nil, totalNodes, hasNextPage, err
			}

			// ranged query over all nodes
			r := promv1.Range{
				Start: from,
				End:   to,
				Step:  time.Duration(metricConfig.Timestep * 1e9),
			}
			result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
			if err != nil {
				log.Errorf("Prometheus query error in LoadNodeData: %v\n", err)
				return nil, totalNodes, hasNextPage, errors.New("Prometheus query error")
			}
			if len(warnings) > 0 {
				log.Warnf("Warnings: %v\n", warnings)
			}

			step := int64(metricConfig.Timestep)
			steps := int64(to.Sub(from).Seconds()) / step

			// iter rows of host, metric, values
			for _, row := range result.(promm.Matrix) {
				hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix)

				hostdata, ok := data[hostname]
				if !ok {
					hostdata = make(schema.JobData)
					data[hostname] = hostdata
				}

				metricdata, ok := hostdata[metric]
				if !ok {
					metricdata = make(map[schema.MetricScope]*schema.JobMetric)
					data[hostname][metric] = metricdata
				}

				// output per host, metric and scope
				scopeData, ok := metricdata[scope]
				if !ok {
					scopeData = &schema.JobMetric{
						Unit:     metricConfig.Unit,
						Timestep: metricConfig.Timestep,
						Series:   []schema.Series{pdb.RowToSeries(from, step, steps, row)},
					}
					data[hostname][metric][scope] = scopeData
				}
			}
		}
	}
	t1 := time.Since(t0)
	log.Debugf("LoadNodeListData of %v nodes took %s", len(data), t1)
	return data, totalNodes, hasNextPage, nil
}

@@ -36,7 +36,17 @@ func (tmdr *TestMetricDataRepository) LoadData(

func (tmdr *TestMetricDataRepository) LoadStats(
	job *schema.Job,
	metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
	metrics []string,
	ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {

	panic("TODO")
}

func (tmdr *TestMetricDataRepository) LoadScopedStats(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context) (schema.ScopedJobStats, error) {

	panic("TODO")
}

@@ -27,6 +27,8 @@ type ArchiveBackend interface {

	LoadJobData(job *schema.Job) (schema.JobData, error)

	LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error)

	LoadClusterCfg(name string) (*schema.Cluster, error)

	StoreJobMeta(jobMeta *schema.JobMeta) error
@@ -87,7 +89,7 @@ func Init(rawConfig json.RawMessage, disableArchive bool) error {
		var version uint64
		version, err = ar.Init(rawConfig)
		if err != nil {
			log.Error("Error while initializing archiveBackend")
			log.Errorf("Error while initializing archiveBackend: %s", err.Error())
			return
		}
		log.Infof("Load archive version %d", version)
@@ -110,7 +112,7 @@ func LoadAveragesFromArchive(
) error {
	metaFile, err := ar.LoadJobMeta(job)
	if err != nil {
		log.Warn("Error while loading job metadata from archiveBackend")
		log.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error())
		return err
	}

@@ -125,7 +127,7 @@ func LoadAveragesFromArchive(
	return nil
}

// Helper to metricdataloader.LoadStatData().
// Helper to metricdataloader.LoadJobStats().
func LoadStatsFromArchive(
	job *schema.Job,
	metrics []string,
@@ -133,7 +135,7 @@ func LoadStatsFromArchive(
	data := make(map[string]schema.MetricStatistics, len(metrics))
	metaFile, err := ar.LoadJobMeta(job)
	if err != nil {
		log.Warn("Error while loading job metadata from archiveBackend")
		log.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error())
		return data, err
	}

@@ -154,10 +156,26 @@ func LoadStatsFromArchive(
	return data, nil
}

// Helper to metricdataloader.LoadScopedJobStats().
func LoadScopedStatsFromArchive(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
) (schema.ScopedJobStats, error) {

	data, err := ar.LoadJobStats(job)
	if err != nil {
		log.Errorf("Error while loading job stats from archiveBackend: %s", err.Error())
		return nil, err
	}

	return data, nil
}

func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
	metaFile, err := ar.LoadJobMeta(job)
	if err != nil {
		log.Warn("Error while loading job metadata from archiveBackend")
		log.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error())
		return nil, err
	}

@@ -173,7 +191,7 @@ func UpdateMetadata(job *schema.Job, metadata map[string]string) error {

	jobMeta, err := ar.LoadJobMeta(job)
	if err != nil {
		log.Warn("Error while loading job metadata from archiveBackend")
		log.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error())
		return err
	}

@@ -193,7 +211,7 @@ func UpdateTags(job *schema.Job, tags []*schema.Tag) error {

	jobMeta, err := ar.LoadJobMeta(job)
	if err != nil {
		log.Warn("Error while loading job metadata from archiveBackend")
		log.Errorf("Error while loading job metadata from archiveBackend: %s", err.Error())
		return err
	}

@@ -115,6 +115,40 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
	}
}

func loadJobStats(filename string, isCompressed bool) (schema.ScopedJobStats, error) {
	f, err := os.Open(filename)

	if err != nil {
		log.Errorf("fsBackend LoadJobStats()- %v", err)
		return nil, err
	}
	defer f.Close()

	if isCompressed {
		r, err := gzip.NewReader(f)
		if err != nil {
			log.Errorf(" %v", err)
			return nil, err
		}
		defer r.Close()

		if config.Keys.Validate {
			if err := schema.Validate(schema.Data, r); err != nil {
				return nil, fmt.Errorf("validate job data: %v", err)
			}
		}

		return DecodeJobStats(r, filename)
	} else {
		if config.Keys.Validate {
			if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
				return nil, fmt.Errorf("validate job data: %v", err)
			}
		}
		return DecodeJobStats(bufio.NewReader(f), filename)
	}
}

func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {

	var config FsArchiveConfig
@@ -389,6 +423,18 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
	return loadJobData(filename, isCompressed)
}

func (fsa *FsArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error) {
	var isCompressed bool = true
	filename := getPath(job, fsa.path, "data.json.gz")

	if !util.CheckFileExists(filename) {
		filename = getPath(job, fsa.path, "data.json")
		isCompressed = false
	}

	return loadJobStats(filename, isCompressed)
}

func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
	filename := getPath(job, fsa.path, "meta.json")
	return loadJobMeta(filename)

@@ -32,6 +32,43 @@ func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
	return data.(schema.JobData), nil
}

func DecodeJobStats(r io.Reader, k string) (schema.ScopedJobStats, error) {
	jobData, err := DecodeJobData(r, k)
	// Convert schema.JobData to schema.ScopedJobStats
	if jobData != nil {
		scopedJobStats := make(schema.ScopedJobStats)
		for metric, metricData := range jobData {
			if _, ok := scopedJobStats[metric]; !ok {
				scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats)
			}

			for scope, jobMetric := range metricData {
				if _, ok := scopedJobStats[metric][scope]; !ok {
					scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0)
				}

				for _, series := range jobMetric.Series {
					scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
						Hostname: series.Hostname,
						Id:       series.Id,
						Data:     &series.Statistics,
					})
				}

				// So that one can later check len(scopedJobStats[metric][scope]): Remove from map if empty
				if len(scopedJobStats[metric][scope]) == 0 {
					delete(scopedJobStats[metric], scope)
					if len(scopedJobStats[metric]) == 0 {
						delete(scopedJobStats, metric)
					}
				}
			}
		}
		return scopedJobStats, nil
	}
	return nil, err
}
 | 
			
		||||
 | 
			
		||||
func DecodeJobMeta(r io.Reader) (*schema.JobMeta, error) {
 | 
			
		||||
	var d schema.JobMeta
 | 
			
		||||
	if err := json.NewDecoder(r).Decode(&d); err != nil {
 | 
			
		||||
 
 | 
			
		||||
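
DecodeJobStats deliberately reuses the JobData decoder and then discards everything but the per-series statistics. A hedged usage sketch of calling it directly on an uncompressed archive entry; the path and cluster name are invented, and the package import follows the repository layout as an assumption:

package main

import (
	"bufio"
	"fmt"
	"log"
	"os"

	"github.com/ClusterCockpit/cc-backend/pkg/archive"
)

func main() {
	// Hypothetical archive entry; real paths follow the
	// <cluster>/<jobid/1000>/<jobid%1000>/<starttime> layout.
	f, err := os.Open("var/job-archive/testcluster/100/023/1672527600/data.json")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Decode only the statistics, keyed by metric and scope.
	stats, err := archive.DecodeJobStats(bufio.NewReader(f), "data.json")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("decoded statistics for %d metrics\n", len(stats))
}
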
@@ -15,6 +15,7 @@ import (
)

type JobData map[string]map[MetricScope]*JobMetric
type ScopedJobStats map[string]map[MetricScope][]*ScopedStats

type JobMetric struct {
	StatisticsSeries *StatsSeries `json:"statisticsSeries,omitempty"`
@@ -30,6 +31,12 @@ type Series struct {
	Statistics MetricStatistics `json:"statistics"`
}

type ScopedStats struct {
	Hostname string            `json:"hostname"`
	Id       *string           `json:"id,omitempty"`
	Data     *MetricStatistics `json:"data"`
}

type MetricStatistics struct {
	Avg float64 `json:"avg"`
	Min float64 `json:"min"`
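
Read together, the two new types nest as metric name, then scope, then a list of per-host statistics. A hand-written example value (names and numbers invented; the MetricScope constants are assumed to exist in pkg/schema):

package example

import "github.com/ClusterCockpit/cc-backend/pkg/schema"

// exampleStats builds a ScopedJobStats value: one metric with a node-level
// entry and one core-level entry.
func exampleStats() schema.ScopedJobStats {
	coreID := "0"
	return schema.ScopedJobStats{
		"cpu_load": {
			schema.MetricScopeNode: {
				{Hostname: "node001", Data: &schema.MetricStatistics{Min: 0.2, Avg: 0.7, Max: 1.0}},
			},
			schema.MetricScopeCore: {
				// Below node scope, Id identifies the sub-resource (here: core 0).
				{Hostname: "node001", Id: &coreID, Data: &schema.MetricStatistics{Min: 0.1, Avg: 0.6, Max: 0.9}},
			},
		},
	}
}
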
@@ -40,7 +40,7 @@
  import JobRoofline from "./job/JobRoofline.svelte";
  import EnergySummary from "./job/EnergySummary.svelte";
  import PlotGrid from "./generic/PlotGrid.svelte";
  import StatsTable from "./job/StatsTable.svelte";
  import StatsTab from "./job/StatsTab.svelte";

  export let dbid;
  export let username;
@@ -53,10 +53,8 @@

  let isMetricsSelectionOpen = false,
    selectedMetrics = [],
    selectedScopes = [];

  let plots = {},
    statsTable
    selectedScopes = [],
    plots = {};

  let availableMetrics = new Set(),
    missingMetrics = [],
@@ -127,28 +125,17 @@
    let job = $initq.data.job;
    if (!job) return;

    const pendingMetrics = [
      ...(
        (
          ccconfig[`job_view_selectedMetrics:${job.cluster}:${job.subCluster}`] ||
          ccconfig[`job_view_selectedMetrics:${job.cluster}`]
        ) ||
        $initq.data.globalMetrics
          .reduce((names, gm) => {
            if (gm.availability.find((av) => av.cluster === job.cluster && av.subClusters.includes(job.subCluster))) {
              names.push(gm.name);
            }
            return names;
          }, [])
      ),
      ...(
        (
          ccconfig[`job_view_nodestats_selectedMetrics:${job.cluster}:${job.subCluster}`] ||
          ccconfig[`job_view_nodestats_selectedMetrics:${job.cluster}`]
        ) ||
        ccconfig[`job_view_nodestats_selectedMetrics`]
      ),
    ];
    const pendingMetrics = (
      ccconfig[`job_view_selectedMetrics:${job.cluster}:${job.subCluster}`] ||
      ccconfig[`job_view_selectedMetrics:${job.cluster}`]
    ) ||
    $initq.data.globalMetrics
      .reduce((names, gm) => {
        if (gm.availability.find((av) => av.cluster === job.cluster && av.subClusters.includes(job.subCluster))) {
          names.push(gm.name);
        }
        return names;
      }, [])

    // Select default scopes to load: Check beforehand if any metric has accelerator scope by default
    const accScopeDefault = [...pendingMetrics].some(function (m) {
@@ -343,7 +330,6 @@
        {#if item.data}
          <Metric
            bind:this={plots[item.metric]}
            on:more-loaded={({ detail }) => statsTable.moreLoaded(detail)}
            job={$initq.data.job}
            metricName={item.metric}
            metricUnit={$initq.data.globalMetrics.find((gm) => gm.name == item.metric)?.unit}
@@ -398,22 +384,8 @@
              </div>
            </TabPane>
          {/if}
          <TabPane
            tabId="stats"
            tab="Statistics Table"
            class="overflow-x-auto"
            active={!somethingMissing}
          >
            {#if $jobMetrics?.data?.jobMetrics}
              {#key $jobMetrics.data.jobMetrics}
                <StatsTable
                  bind:this={statsTable}
                  job={$initq.data.job}
                  jobMetrics={$jobMetrics.data.jobMetrics}
                />
              {/key}
            {/if}
          </TabPane>
          <!-- Includes <TabPane> Statistics Table with Independent GQL Query -->
          <StatsTab job={$initq.data.job} clusters={$initq.data.clusters} tabActive={!somethingMissing}/>
          <TabPane tabId="job-script" tab="Job Script">
            <div class="pre-wrapper">
              {#if $initq.data.job.metaData?.jobScript}
@@ -150,11 +150,6 @@

        // On additional scope request
        if (selectedScope == "load-all") {
          // Push scopes to statsTable here (else newly selected 'Metric.svelte' renders cause a statsTable race condition)
          const statsTableData = $metricData.data.singleUpdate.filter((x) => x.scope !== "node")
          if (statsTableData.length > 0) {
            dispatch("more-loaded", statsTableData);
          }
          // Set selected scope to min of returned scopes
          selectedScope = minScope(scopes)
          nodeOnly = (selectedScope == "node") // "node" still only scope after load-all

145 web/frontend/src/job/StatsTab.svelte (new file)
@@ -0,0 +1,145 @@
<!--
    @component Job-View subcomponent; Wraps the statsTable in a TabPane and contains the GQL query for scoped statsData

    Properties:
    - `job Object`: The job object
    - `clusters Object`: The clusters object
    - `tabActive bool`: Boolean if the StatsTab tab is active on creation
 -->

<script>
  import {
    queryStore,
    gql,
    getContextClient
  } from "@urql/svelte";
  import { getContext } from "svelte";
  import {
    Card,
    Button,
    Row,
    Col,
    TabPane,
    Spinner,
    Icon
  } from "@sveltestrap/sveltestrap";
  import MetricSelection from "../generic/select/MetricSelection.svelte";
  import StatsTable from "./statstab/StatsTable.svelte";

  export let job;
  export let clusters;
  export let tabActive;

  let loadScopes = false;
  let selectedScopes = [];
  let selectedMetrics = [];
  let availableMetrics = new Set(); // For info only, filled by MetricSelection component
  let isMetricSelectionOpen = false;

  const client = getContextClient();
  const query = gql`
    query ($dbid: ID!, $selectedMetrics: [String!]!, $selectedScopes: [MetricScope!]!) {
      scopedJobStats(id: $dbid, metrics: $selectedMetrics, scopes: $selectedScopes) {
        name
        scope
        stats {
          hostname
          id
          data {
            min
            avg
            max
          }
        }
      }
    }
  `;

  $: scopedStats = queryStore({
    client: client,
    query: query,
    variables: { dbid: job.id, selectedMetrics, selectedScopes },
  });

  $: if (loadScopes) {
    selectedScopes = ["node", "socket", "core", "hwthread", "accelerator"];
  }

  // Set initial metrics and scopes once on init; this also triggers the first query
  getContext("on-init")(() => {
    if (!job) return;

    const pendingMetrics = (
      getContext("cc-config")[`job_view_nodestats_selectedMetrics:${job.cluster}:${job.subCluster}`] ||
      getContext("cc-config")[`job_view_nodestats_selectedMetrics:${job.cluster}`]
    ) || getContext("cc-config")["job_view_nodestats_selectedMetrics"];

    // Select default scopes to load: Check beforehand if any metric has accelerator scope by default
    const accScopeDefault = [...pendingMetrics].some(function (m) {
      const cluster = clusters.find((c) => c.name == job.cluster);
      const subCluster = cluster.subClusters.find((sc) => sc.name == job.subCluster);
      return subCluster.metricConfig.find((smc) => smc.name == m)?.scope === "accelerator";
    });

    const pendingScopes = ["node"]
    if (job.numNodes === 1) {
      pendingScopes.push("socket")
      pendingScopes.push("core")
      pendingScopes.push("hwthread")
      if (accScopeDefault) { pendingScopes.push("accelerator") }
    }

    selectedMetrics = [...pendingMetrics];
    selectedScopes = [...pendingScopes];
  });

</script>

<TabPane tabId="stats" tab="Statistics Table" class="overflow-x-auto" active={tabActive}>
  <Row>
    <Col class="m-2">
      <Button outline on:click={() => (isMetricSelectionOpen = true)} class="px-2" color="primary" style="margin-right:0.5rem">
        Select Metrics (Selected {selectedMetrics.length} of {availableMetrics.size} available)
      </Button>
      {#if job.numNodes > 1}
        <Button class="px-2 ml-auto" color="success" outline on:click={() => (loadScopes = !loadScopes)} disabled={loadScopes}>
          {#if !loadScopes}
            <Icon name="plus-square-fill" style="margin-right:0.25rem"/> Add More Scopes
          {:else}
            <Icon name="check-square-fill" style="margin-right:0.25rem"/> OK: Scopes Added
          {/if}
        </Button>
      {/if}
    </Col>
  </Row>
  <hr class="mb-1 mt-1"/>
  <!-- ROW1: Status -->
  {#if $scopedStats.fetching}
    <Row>
      <Col class="m-3" style="text-align: center;">
        <Spinner secondary/>
      </Col>
    </Row>
  {:else if $scopedStats.error}
    <Row>
      <Col class="m-2">
        <Card body color="danger">{$scopedStats.error.message}</Card>
      </Col>
    </Row>
  {:else}
    <StatsTable
      hosts={job.resources.map((r) => r.hostname).sort()}
      data={$scopedStats?.data?.scopedJobStats}
      {selectedMetrics}
    />
  {/if}
</TabPane>

<MetricSelection
  cluster={job.cluster}
  subCluster={job.subCluster}
  configName="job_view_nodestats_selectedMetrics"
  bind:allMetrics={availableMetrics}
  bind:metrics={selectedMetrics}
  bind:isOpen={isMetricSelectionOpen}
/>
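
On the server side, the scopedJobStats query used above resolves to the JobStatsWithScope model. A hedged sketch of how a resolver might flatten schema.ScopedJobStats into that shape; the model package path and gqlgen-style field names (ID, Stats) are assumptions, not taken from this diff:

package example

import (
	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// toJobStatsWithScope flattens the nested archive representation into the
// list shape the scopedJobStats query returns.
func toJobStatsWithScope(res schema.ScopedJobStats) []*model.JobStatsWithScope {
	out := make([]*model.JobStatsWithScope, 0, len(res))
	for name, scopes := range res {
		for scope, entries := range scopes {
			stats := make([]*model.ScopedStats, 0, len(entries))
			for _, e := range entries {
				stats = append(stats, &model.ScopedStats{
					Hostname: e.Hostname,
					ID:       e.Id,
					Data:     e.Data,
				})
			}
			out = append(out, &model.JobStatsWithScope{
				Name:  name,
				Scope: scope,
				Stats: stats,
			})
		}
	}
	return out
}
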
@@ -1,178 +0,0 @@
<!--
    @component Job-View subcomponent; display table of metric data statistics with selectable scopes

    Properties:
    - `job Object`: The job object
    - `jobMetrics [Object]`: The jobs metricdata

    Exported:
    - `moreLoaded`: Adds additional scopes requested from Metric.svelte in Job-View
 -->

<script>
  import { getContext } from "svelte";
  import {
    Button,
    Table,
    Input,
    InputGroup,
    InputGroupText,
    Icon,
    Row,
    Col
  } from "@sveltestrap/sveltestrap";
  import { maxScope } from "../generic/utils.js";
  import StatsTableEntry from "./StatsTableEntry.svelte";
  import MetricSelection from "../generic/select/MetricSelection.svelte";

  export let job;
  export let jobMetrics;

  const sortedJobMetrics = [...new Set(jobMetrics.map((m) => m.name))].sort()
  const scopesForMetric = (metric) =>
      jobMetrics.filter((jm) => jm.name == metric).map((jm) => jm.scope);

  let hosts = job.resources.map((r) => r.hostname).sort(),
    selectedScopes = {},
    sorting = {},
    isMetricSelectionOpen = false,
    availableMetrics = new Set(),
    selectedMetrics = (
      getContext("cc-config")[`job_view_nodestats_selectedMetrics:${job.cluster}:${job.subCluster}`] ||
      getContext("cc-config")[`job_view_nodestats_selectedMetrics:${job.cluster}`]
    ) || getContext("cc-config")["job_view_nodestats_selectedMetrics"];

  for (let metric of sortedJobMetrics) {
    // Not Exclusive or Multi-Node: get maxScope directly (mostly: node)
    //   -> Else: Load smallest available granularity as default as per availability
    const availableScopes = scopesForMetric(metric);
    if (job.exclusive != 1 || job.numNodes == 1) {
      if (availableScopes.includes("accelerator")) {
        selectedScopes[metric] = "accelerator";
      } else if (availableScopes.includes("core")) {
        selectedScopes[metric] = "core";
      } else if (availableScopes.includes("socket")) {
        selectedScopes[metric] = "socket";
      } else {
        selectedScopes[metric] = "node";
      }
    } else {
      selectedScopes[metric] = maxScope(availableScopes);
    }

    sorting[metric] = {
      min: { dir: "up", active: false },
      avg: { dir: "up", active: false },
      max: { dir: "up", active: false },
    };
  }

  function sortBy(metric, stat) {
    let s = sorting[metric][stat];
    if (s.active) {
      s.dir = s.dir == "up" ? "down" : "up";
    } else {
      for (let metric in sorting)
        for (let stat in sorting[metric]) sorting[metric][stat].active = false;
      s.active = true;
    }

    let series = jobMetrics.find(
      (jm) => jm.name == metric && jm.scope == "node",
    )?.metric.series;
    sorting = { ...sorting };
    hosts = hosts.sort((h1, h2) => {
      let s1 = series.find((s) => s.hostname == h1)?.statistics;
      let s2 = series.find((s) => s.hostname == h2)?.statistics;
      if (s1 == null || s2 == null) return -1;

      return s.dir != "up" ? s1[stat] - s2[stat] : s2[stat] - s1[stat];
    });
  }

  export function moreLoaded(moreJobMetrics) {
    moreJobMetrics.forEach(function (newMetric) {
      if (!jobMetrics.some((m) => m.scope == newMetric.scope)) {
        jobMetrics = [...jobMetrics, newMetric]
      }
    });
  };
</script>

<Row>
  <Col class="m-2">
    <Button outline on:click={() => (isMetricSelectionOpen = true)} class="w-auto px-2" color="primary">
      Select Metrics (Selected {selectedMetrics.length} of {availableMetrics.size} available)
    </Button>
  </Col>
</Row>
<hr class="mb-1 mt-1"/>
<Table class="mb-0">
  <thead>
    <!-- Header Row 1: Selectors -->
    <tr>
      <th/>
      {#each selectedMetrics as metric}
        <!-- To Match Row-2 Header Field Count-->
        <th colspan={selectedScopes[metric] == "node" ? 3 : 4}>
          <InputGroup>
            <InputGroupText>
              {metric}
            </InputGroupText>
            <Input type="select" bind:value={selectedScopes[metric]}>
              {#each scopesForMetric(metric, jobMetrics) as scope}
                <option value={scope}>{scope}</option>
              {/each}
            </Input>
          </InputGroup>
        </th>
      {/each}
    </tr>
    <!-- Header Row 2: Fields -->
    <tr>
      <th>Node</th>
      {#each selectedMetrics as metric}
        {#if selectedScopes[metric] != "node"}
          <th>Id</th>
        {/if}
        {#each ["min", "avg", "max"] as stat}
          <th on:click={() => sortBy(metric, stat)}>
            {stat}
            {#if selectedScopes[metric] == "node"}
              <Icon
                name="caret-{sorting[metric][stat].dir}{sorting[metric][stat]
                  .active
                  ? '-fill'
                  : ''}"
              />
            {/if}
          </th>
        {/each}
      {/each}
    </tr>
  </thead>
  <tbody>
    {#each hosts as host (host)}
      <tr>
        <th scope="col">{host}</th>
        {#each selectedMetrics as metric (metric)}
          <StatsTableEntry
            {host}
            {metric}
            scope={selectedScopes[metric]}
            {jobMetrics}
          />
        {/each}
      </tr>
    {/each}
  </tbody>
</Table>

<MetricSelection
  cluster={job.cluster}
  subCluster={job.subCluster}
  configName="job_view_nodestats_selectedMetrics"
  bind:allMetrics={availableMetrics}
  bind:metrics={selectedMetrics}
  bind:isOpen={isMetricSelectionOpen}
/>
@@ -40,14 +40,14 @@
    const client = getContextClient();
    const polarQuery = gql`
    query ($dbid: ID!, $selectedMetrics: [String!]!) {
        jobMetricStats(id: $dbid, metrics: $selectedMetrics) {
      jobStats(id: $dbid, metrics: $selectedMetrics) {
        name
        stats {
            min
            avg
            max
        }
          min
          avg
          max
        }
      }
    }
    `;

@@ -66,7 +66,7 @@
  {:else}
    <Polar
      {polarMetrics}
      polarData={$polarData.data.jobMetricStats}
      polarData={$polarData.data.jobStats}
    />
  {/if}
</CardBody>

139 web/frontend/src/job/statstab/StatsTable.svelte (new file)
@@ -0,0 +1,139 @@
<!--
    @component Job-View subcomponent; display table of metric data statistics with selectable scopes

    Properties:
    - `data Object`: The data object
    - `selectedMetrics [String]`: The selected metrics
    - `hosts [String]`: The list of hostnames of this job
 -->

<script>
  import {
    Table,
    Input,
    InputGroup,
    InputGroupText,
    Icon,
  } from "@sveltestrap/sveltestrap";
  import StatsTableEntry from "./StatsTableEntry.svelte";

  export let data = [];
  export let selectedMetrics = [];
  export let hosts = [];

  let sorting = {};
  let availableScopes = {};
  let selectedScopes = {};

  const scopesForMetric = (metric) =>
    data?.filter((jm) => jm.name == metric)?.map((jm) => jm.scope) || [];
  const setScopeForMetric = (metric, scope) =>
    selectedScopes[metric] = scope

  $: if (data && selectedMetrics) {
    for (let metric of selectedMetrics) {
      availableScopes[metric] = scopesForMetric(metric);
      // Set initial selection via helper, not by assigning selectedScopes directly: avoids re-triggering this reactive block
      if (availableScopes[metric].includes("accelerator")) {
        setScopeForMetric(metric, "accelerator");
      } else if (availableScopes[metric].includes("core")) {
        setScopeForMetric(metric, "core");
      } else if (availableScopes[metric].includes("socket")) {
        setScopeForMetric(metric, "socket");
      } else {
        setScopeForMetric(metric, "node");
      }

      sorting[metric] = {
        min: { dir: "up", active: false },
        avg: { dir: "up", active: false },
        max: { dir: "up", active: false },
      };
    }
  }

  function sortBy(metric, stat) {
    let s = sorting[metric][stat];
    if (s.active) {
      s.dir = s.dir == "up" ? "down" : "up";
    } else {
      for (let metric in sorting)
        for (let stat in sorting[metric]) sorting[metric][stat].active = false;
      s.active = true;
    }

    let stats = data.find(
      (d) => d.name == metric && d.scope == "node",
    )?.stats || [];
    sorting = { ...sorting };
    hosts = hosts.sort((h1, h2) => {
      let s1 = stats.find((s) => s.hostname == h1)?.data;
      let s2 = stats.find((s) => s.hostname == h2)?.data;
      if (s1 == null || s2 == null) return -1;

      return s.dir != "up" ? s1[stat] - s2[stat] : s2[stat] - s1[stat];
    });
  }

</script>

<Table class="mb-0">
  <thead>
    <!-- Header Row 1: Selectors -->
    <tr>
      <th/>
      {#each selectedMetrics as metric}
        <!-- To Match Row-2 Header Field Count-->
        <th colspan={selectedScopes[metric] == "node" ? 3 : 4}>
          <InputGroup>
            <InputGroupText>
              {metric}
            </InputGroupText>
            <Input type="select" bind:value={selectedScopes[metric]} disabled={availableScopes[metric].length === 1}>
              {#each (availableScopes[metric] || []) as scope}
                <option value={scope}>{scope}</option>
              {/each}
            </Input>
          </InputGroup>
        </th>
      {/each}
    </tr>
    <!-- Header Row 2: Fields -->
    <tr>
      <th>Node</th>
      {#each selectedMetrics as metric}
        {#if selectedScopes[metric] != "node"}
          <th>Id</th>
        {/if}
        {#each ["min", "avg", "max"] as stat}
          <th on:click={() => sortBy(metric, stat)}>
            {stat}
            {#if selectedScopes[metric] == "node"}
              <Icon
                name="caret-{sorting[metric][stat].dir}{sorting[metric][stat]
                  .active
                  ? '-fill'
                  : ''}"
              />
            {/if}
          </th>
        {/each}
      {/each}
    </tr>
  </thead>
  <tbody>
    {#each hosts as host (host)}
      <tr>
        <th scope="col">{host}</th>
        {#each selectedMetrics as metric (metric)}
          <StatsTableEntry
            {data}
            {host}
            {metric}
            scope={selectedScopes[metric]}
          />
        {/each}
      </tr>
    {/each}
  </tbody>
</Table>
@@ -1,11 +1,11 @@
<!--
    @component Job-View subcomponent; Single Statistics entry component fpr statstable
    @component Job-View subcomponent; Single Statistics entry component for statstable

    Properties:
    - `host String`: The hostname (== node)
    - `metric String`: The metric name
    - `scope String`: The selected scope
    - `jobMetrics [Object]`: The jobs metricdata
    - `data [Object]`: The jobs statsdata
 -->

<script>
@@ -14,59 +14,59 @@
  export let host;
  export let metric;
  export let scope;
  export let jobMetrics;
  export let data;

  function compareNumbers(a, b) {
    return a.id - b.id;
  }

  function sortByField(field) {
    let s = sorting[field];
    if (s.active) {
      s.dir = s.dir == "up" ? "down" : "up";
    } else {
      for (let field in sorting) sorting[field].active = false;
      s.active = true;
    }

    sorting = { ...sorting };
    series = series.sort((a, b) => {
      if (a == null || b == null) return -1;

      if (field === "id") {
        return s.dir != "up" ? a[field] - b[field] : b[field] - a[field];
      } else {
        return s.dir != "up"
          ? a.statistics[field] - b.statistics[field]
          : b.statistics[field] - a.statistics[field];
      }
    });
  }

  let sorting = {
  let entrySorting = {
    id: { dir: "down", active: true },
    min: { dir: "up", active: false },
    avg: { dir: "up", active: false },
    max: { dir: "up", active: false },
  };

  $: series = jobMetrics
    .find((jm) => jm.name == metric && jm.scope == scope)
    ?.metric.series.filter((s) => s.hostname == host && s.statistics != null)
    ?.sort(compareNumbers);
  function compareNumbers(a, b) {
    return a.id - b.id;
  }

  function sortByField(field) {
    let s = entrySorting[field];
    if (s.active) {
      s.dir = s.dir == "up" ? "down" : "up";
    } else {
      for (let field in entrySorting) entrySorting[field].active = false;
      s.active = true;
    }

    entrySorting = { ...entrySorting };
    stats = stats.sort((a, b) => {
      if (a == null || b == null) return -1;

      if (field === "id") {
        return s.dir != "up" ? a[field].localeCompare(b[field]) : b[field].localeCompare(a[field])
      } else {
        return s.dir != "up"
          ? a.data[field] - b.data[field]
          : b.data[field] - a.data[field];
      }
    });
  }

  $: stats = data
    ?.find((d) => d.name == metric && d.scope == scope)
    ?.stats.filter((s) => s.hostname == host && s.data != null)
    ?.sort(compareNumbers) || [];
</script>

{#if series == null || series.length == 0}
{#if stats == null || stats.length == 0}
  <td colspan={scope == "node" ? 3 : 4}><i>No data</i></td>
{:else if series.length == 1 && scope == "node"}
{:else if stats.length == 1 && scope == "node"}
  <td>
    {series[0].statistics.min}
    {stats[0].data.min}
  </td>
  <td>
    {series[0].statistics.avg}
    {stats[0].data.avg}
  </td>
  <td>
    {series[0].statistics.max}
    {stats[0].data.max}
  </td>
{:else}
  <td colspan="4">
@@ -76,19 +76,19 @@
          <th on:click={() => sortByField(field)}>
            Sort
            <Icon
              name="caret-{sorting[field].dir}{sorting[field].active
              name="caret-{entrySorting[field].dir}{entrySorting[field].active
                ? '-fill'
                : ''}"
            />
          </th>
        {/each}
      </tr>
      {#each series as s, i}
      {#each stats as s, i}
        <tr>
          <th>{s.id ?? i}</th>
          <td>{s.statistics.min}</td>
          <td>{s.statistics.avg}</td>
          <td>{s.statistics.max}</td>
          <td>{s.data.min}</td>
          <td>{s.data.avg}</td>
          <td>{s.data.max}</td>
        </tr>
      {/each}
    </table>