diff --git a/api/schema.graphqls b/api/schema.graphqls index c38a4a1..ed94d42 100644 --- a/api/schema.graphqls +++ b/api/schema.graphqls @@ -224,7 +224,7 @@ type Query { allocatedNodes(cluster: String!): [Count!]! job(id: ID!): Job - jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]! + jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!], resolution: Int): [JobMetricWithName!]! jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! diff --git a/internal/api/api_test.go b/internal/api/api_test.go index d0a7916..0312e43 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -175,7 +175,6 @@ func cleanup() { func TestRestApi(t *testing.T) { restapi := setup(t) t.Cleanup(cleanup) - testData := schema.JobData{ "load_one": map[schema.MetricScope]*schema.JobMetric{ schema.MetricScopeNode: { @@ -192,7 +191,7 @@ func TestRestApi(t *testing.T) { }, } - metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { + metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { return testData, nil } @@ -344,7 +343,7 @@ func TestRestApi(t *testing.T) { } t.Run("CheckArchive", func(t *testing.T) { - data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background()) + data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60) if err != nil { t.Fatal(err) } diff --git a/internal/api/rest.go b/internal/api/rest.go index 6e90365..62b2f19 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -516,8 +516,15 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) var data schema.JobData + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + resolution := 0 + + for _, mc := range metricConfigs { + resolution = max(resolution, mc.Timestep) + } + if r.URL.Query().Get("all-metrics") == "true" { - data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context()) + data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution) if err != nil { log.Warn("Error while loading job data") return @@ -606,7 +613,14 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { scopes = []schema.MetricScope{"node"} } - data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context()) + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + resolution := 0 + + for _, mc := range metricConfigs { + resolution = max(resolution, mc.Timestep) + } + + data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution) if err != nil { log.Warn("Error while loading job data") return @@ -1114,7 +1128,7 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) { } resolver := graph.GetResolverInstance() - data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes) + data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes, nil) if err != nil { json.NewEncoder(rw).Encode(Respone{ Error: &struct { diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index de84cf0..1c4a3ec 100644 --- a/internal/archiver/archiver.go +++ 
b/internal/archiver/archiver.go
@@ -34,7 +34,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
 		scopes = append(scopes, schema.MetricScopeAccelerator)
 	}
 
-	jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx)
+	jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resolution-Value retrieves highest res (60s)
 	if err != nil {
 		log.Error("Error wile loading job data for archiving")
 		return nil, err
 	}
diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go
index 54c8e70..3fd3649 100644
--- a/internal/graph/generated/generated.go
+++ b/internal/graph/generated/generated.go
@@ -246,7 +246,7 @@ type ComplexityRoot struct {
 		Clusters func(childComplexity int) int
 		GlobalMetrics func(childComplexity int) int
 		Job func(childComplexity int, id string) int
-		JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope) int
+		JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope, resolution *int) int
 		Jobs func(childComplexity int, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) int
 		JobsFootprints func(childComplexity int, filter []*model.JobFilter, metrics []string) int
 		JobsStatistics func(childComplexity int, filter []*model.JobFilter, metrics []string, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) int
@@ -369,7 +369,7 @@ type QueryResolver interface {
 	User(ctx context.Context, username string) (*model.User, error)
 	AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error)
 	Job(ctx context.Context, id string) (*schema.Job, error)
-	JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error)
+	JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope, resolution *int) ([]*model.JobMetricWithName, error)
 	JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error)
 	Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error)
 	JobsStatistics(ctx context.Context, filter []*model.JobFilter, metrics []string, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) ([]*model.JobsStatistics, error)
@@ -1291,7 +1291,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
 			return 0, false
 		}
-		return e.complexity.Query.JobMetrics(childComplexity, args["id"].(string), args["metrics"].([]string), args["scopes"].([]schema.MetricScope)), true
+		return e.complexity.Query.JobMetrics(childComplexity, args["id"].(string), args["metrics"].([]string), args["scopes"].([]schema.MetricScope), args["resolution"].(*int)), true
 	case "Query.jobs":
 		if e.complexity.Query.Jobs == nil {
@@ -2068,7 +2068,7 @@ type Query {
   allocatedNodes(cluster: String!): [Count!]!
   job(id: ID!): Job
-  jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]!
+  jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!], resolution: Int): [JobMetricWithName!]!
   jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints
   jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList!
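The schema change above is additive: `resolution` is a new optional `Int` argument on `jobMetrics`. For reference, a client query against the extended field could look like the following sketch (job id, metric name, and resolution value are placeholders; the selection set mirrors the fields the frontend queries request further down in this patch):

```graphql
query {
  jobMetrics(id: "1234", metrics: ["load_one"], scopes: [node], resolution: 600) {
    name
    scope
    metric {
      timestep
      series {
        hostname
        data
      }
    }
  }
}
```

Omitting `resolution` keeps the previous behavior; the resolver then falls back to a server-side default.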
@@ -2388,6 +2388,15 @@ func (ec *executionContext) field_Query_jobMetrics_args(ctx context.Context, raw
 		}
 	}
 	args["scopes"] = arg2
+	var arg3 *int
+	if tmp, ok := rawArgs["resolution"]; ok {
+		ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("resolution"))
+		arg3, err = ec.unmarshalOInt2ᚖint(ctx, tmp)
+		if err != nil {
+			return nil, err
+		}
+	}
+	args["resolution"] = arg3
 	return args, nil
 }
 
@@ -8527,7 +8536,7 @@ func (ec *executionContext) _Query_jobMetrics(ctx context.Context, field graphql
 	}()
 	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
 		ctx = rctx // use context from middleware stack in children
-		return ec.resolvers.Query().JobMetrics(rctx, fc.Args["id"].(string), fc.Args["metrics"].([]string), fc.Args["scopes"].([]schema.MetricScope))
+		return ec.resolvers.Query().JobMetrics(rctx, fc.Args["id"].(string), fc.Args["metrics"].([]string), fc.Args["scopes"].([]schema.MetricScope), fc.Args["resolution"].(*int))
 	})
 	if err != nil {
 		ec.Error(ctx, err)
diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go
index 6fcb4a0..ef17e1d 100644
--- a/internal/graph/schema.resolvers.go
+++ b/internal/graph/schema.resolvers.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"slices"
 	"strconv"
 	"strings"
 	"time"
@@ -226,14 +227,24 @@ func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error)
 }
 
 // JobMetrics is the resolver for the jobMetrics field.
-func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) {
+func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope, resolution *int) ([]*model.JobMetricWithName, error) {
+	// Fall back to a default resolution if the client did not request one;
+	// guard against a nil dereference when resampling is not configured.
+	if resolution == nil {
+		defaultRes := 0
+		if config.Keys.EnableResampling != nil {
+			defaultRes = slices.Max(config.Keys.EnableResampling.Resolutions)
+		}
+		resolution = &defaultRes
+	}
+
 	job, err := r.Query().Job(ctx, id)
 	if err != nil {
 		log.Warn("Error while querying job for metrics")
 		return nil, err
 	}
 
-	data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx)
+	data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx, *resolution)
 	if err != nil {
 		log.Warn("Error while loading job data")
 		return nil, err
@@ -442,11 +448,9 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
 
 // SubCluster returns generated.SubClusterResolver implementation.
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
 
-type (
-	clusterResolver     struct{ *Resolver }
-	jobResolver         struct{ *Resolver }
-	metricValueResolver struct{ *Resolver }
-	mutationResolver    struct{ *Resolver }
-	queryResolver       struct{ *Resolver }
-	subClusterResolver  struct{ *Resolver }
-)
+type clusterResolver struct{ *Resolver }
+type jobResolver struct{ *Resolver }
+type metricValueResolver struct{ *Resolver }
+type mutationResolver struct{ *Resolver }
+type queryResolver struct{ *Resolver }
+type subClusterResolver struct{ *Resolver }
diff --git a/internal/graph/util.go b/internal/graph/util.go
index 8296a02..c2bd73d 100644
--- a/internal/graph/util.go
+++ b/internal/graph/util.go
@@ -47,7 +47,14 @@ func (r *queryResolver) rooflineHeatmap(
 			continue
 		}
 
-		jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
+		// metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
+		// resolution := 0
+
+		// for _, mc := range metricConfigs {
+		// 	resolution = max(resolution, mc.Timestep)
+		// }
+
+		jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
 		if err != nil {
 			log.Errorf("Error while loading roofline metrics for job %d", job.ID)
 			return nil, err
 		}
diff --git a/internal/metricDataDispatcher/dataLoader.go b/internal/metricDataDispatcher/dataLoader.go
index 2c7cfa6..121fbf4 100644
--- a/internal/metricDataDispatcher/dataLoader.go
+++ b/internal/metricDataDispatcher/dataLoader.go
@@ -14,6 +14,7 @@ import (
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
+	"github.com/ClusterCockpit/cc-backend/pkg/resampler"
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
 
@@ -23,11 +24,12 @@ func cacheKey(
 	job *schema.Job,
 	metrics []string,
 	scopes []schema.MetricScope,
+	resolution int,
 ) string {
 	// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
 	// job.ID and the TTL of the cache entry makes sure it does not stay there forever.
-	return fmt.Sprintf("%d(%s):[%v],[%v]",
-		job.ID, job.State, metrics, scopes)
+	return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
+		job.ID, job.State, metrics, scopes, resolution)
 }
 
 // Fetches the metric data for a job.
@@ -35,8 +37,9 @@ func LoadData(job *schema.Job,
 	metrics []string,
 	scopes []schema.MetricScope,
 	ctx context.Context,
+	resolution int,
 ) (schema.JobData, error) {
-	data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
+	data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ interface{}, ttl time.Duration, size int) {
 		var jd schema.JobData
 		var err error
 
@@ -60,7 +63,7 @@
 				}
 			}
 
-			jd, err = repo.LoadData(job, metrics, scopes, ctx)
+			jd, err = repo.LoadData(job, metrics, scopes, ctx, resolution)
 			if err != nil {
 				if len(jd) != 0 {
 					log.Warnf("partial error: %s", err.Error())
@@ -72,12 +75,31 @@
 			}
 			size = jd.Size()
 		} else {
-			jd, err = archive.GetHandle().LoadJobData(job)
+			var jd_temp schema.JobData
+			jd_temp, err = archive.GetHandle().LoadJobData(job)
 			if err != nil {
 				log.Error("Error while loading job data from archive")
 				return err, 0, 0
 			}
 
+			// Deep copy the cached archive hashmap
+			jd = metricdata.DeepCopy(jd_temp)
+
+			// Resampling for archived data:
+			// pass the resolution requested by the frontend here.
+			for _, v := range jd {
+				for _, v_ := range v {
+					timestep := 0
+					for i := 0; i < len(v_.Series); i += 1 {
+						v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, v_.Timestep, resolution)
+						if err != nil {
+							return err, 0, 0
+						}
+					}
+					v_.Timestep = timestep
+				}
+			}
+
 		// Avoid sending unrequested data to the client:
 		if metrics != nil || scopes != nil {
 			if metrics == nil {
@@ -117,6 +139,7 @@
 	}
 
 	// FIXME: Review: Is this really necessary or correct.
+	// Note: Lines 142-170 formerly known as prepareJobData(jobData, scopes)
 	// For /monitoring/job/ and some other places, flops_any and mem_bw need
 	// to be available at the scope 'node'. If a job has a lot of nodes,
 	// statisticsSeries should be available so that a min/median/max Graph can be
diff --git a/internal/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go
index e564db6..f2853e3 100644
--- a/internal/metricdata/cc-metric-store.go
+++ b/internal/metricdata/cc-metric-store.go
@@ -55,6 +55,7 @@ type ApiQuery struct {
 	SubType    *string  `json:"subtype,omitempty"`
 	Metric     string   `json:"metric"`
 	Hostname   string   `json:"host"`
+	Resolution int      `json:"resolution"`
 	TypeIds    []string `json:"type-ids,omitempty"`
 	SubTypeIds []string `json:"subtype-ids,omitempty"`
 	Aggregate  bool     `json:"aggreg"`
 }
@@ -66,13 +67,14 @@ type ApiQueryResponse struct {
 }
 
 type ApiMetricData struct {
-	Error *string        `json:"error"`
-	Data  []schema.Float `json:"data"`
-	From  int64          `json:"from"`
-	To    int64          `json:"to"`
-	Avg   schema.Float   `json:"avg"`
-	Min   schema.Float   `json:"min"`
-	Max   schema.Float   `json:"max"`
+	Error      *string        `json:"error"`
+	Data       []schema.Float `json:"data"`
+	From       int64          `json:"from"`
+	To         int64          `json:"to"`
+	Resolution int            `json:"resolution"`
+	Avg        schema.Float   `json:"avg"`
+	Min        schema.Float   `json:"min"`
+	Max        schema.Float   `json:"max"`
 }
 
 func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
@@ -83,7 +85,7 @@
 	}
 
 	ccms.url = config.Url
-	ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url)
+	ccms.queryEndpoint = fmt.Sprintf("%s/api/query/", config.Url)
 	ccms.jwt = config.Token
 	ccms.client = http.Client{
 		Timeout: 10 * time.Second,
@@ -129,7 +131,7 @@ func (ccms *CCMetricStore) doRequest(
 		return nil, err
 	}
 
-	req, err := http.NewRequestWithContext(ctx, http.MethodPost, ccms.queryEndpoint, buf)
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf)
 	if err != nil {
 		log.Warn("Error while building request body")
 		return nil, err
 	}
@@ -138,6 +140,13 @@
 		req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt))
 	}
 
+	// Versioning the cc-metric-store query API.
+ // v2 = data with resampling + // v1 = data without resampling + q := req.URL.Query() + q.Add("version", "v2") + req.URL.RawQuery = q.Encode() + res, err := ccms.client.Do(req) if err != nil { log.Error("Error while performing request") @@ -162,8 +171,9 @@ func (ccms *CCMetricStore) LoadData( metrics []string, scopes []schema.MetricScope, ctx context.Context, + resolution int, ) (schema.JobData, error) { - queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes) + queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution) if err != nil { log.Warn("Error while building queries") return nil, err @@ -195,11 +205,17 @@ func (ccms *CCMetricStore) LoadData( jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric) } + res := row[0].Resolution + if res == 0 { + res = mc.Timestep + } + jobMetric, ok := jobData[metric][scope] + if !ok { jobMetric = &schema.JobMetric{ Unit: mc.Unit, - Timestep: mc.Timestep, + Timestep: res, Series: make([]schema.Series, 0), } jobData[metric][scope] = jobMetric @@ -251,7 +267,6 @@ func (ccms *CCMetricStore) LoadData( /* Returns list for "partial errors" */ return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) } - return jobData, nil } @@ -267,6 +282,7 @@ func (ccms *CCMetricStore) buildQueries( job *schema.Job, metrics []string, scopes []schema.MetricScope, + resolution int, ) ([]ApiQuery, []schema.MetricScope, error) { queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) assignedScope := []schema.MetricScope{} @@ -318,11 +334,12 @@ func (ccms *CCMetricStore) buildQueries( } queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &acceleratorString, - TypeIds: host.Accelerators, + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &acceleratorString, + TypeIds: host.Accelerators, + Resolution: resolution, }) assignedScope = append(assignedScope, schema.MetricScopeAccelerator) continue @@ -335,11 +352,12 @@ func (ccms *CCMetricStore) buildQueries( } queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &acceleratorString, - TypeIds: host.Accelerators, + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &acceleratorString, + TypeIds: host.Accelerators, + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -348,11 +366,12 @@ func (ccms *CCMetricStore) buildQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &hwthreadString, + TypeIds: intToStringSlice(hwthreads), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -363,11 +382,12 @@ func (ccms *CCMetricStore) buildQueries( cores, _ := topology.GetCoresFromHWThreads(hwthreads) for _, core := range cores { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Core[core]), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Core[core]), + Resolution: resolution, }) assignedScope = append(assignedScope, 
scope) } @@ -379,11 +399,12 @@ func (ccms *CCMetricStore) buildQueries( sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) for _, socket := range sockets { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Socket[socket]), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Socket[socket]), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) } @@ -393,11 +414,12 @@ func (ccms *CCMetricStore) buildQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(hwthreads), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -407,11 +429,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &coreString, - TypeIds: intToStringSlice(cores), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -421,11 +444,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(cores), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -435,11 +459,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &memoryDomainString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -449,11 +474,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &memoryDomainString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -463,11 +489,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == 
schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
 			sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
 			queries = append(queries, ApiQuery{
-				Metric:    remoteName,
-				Hostname:  host.Hostname,
-				Aggregate: false,
-				Type:      &socketString,
-				TypeIds:   intToStringSlice(sockets),
+				Metric:     remoteName,
+				Hostname:   host.Hostname,
+				Aggregate:  false,
+				Type:       &socketString,
+				TypeIds:    intToStringSlice(sockets),
+				Resolution: resolution,
 			})
 			assignedScope = append(assignedScope, scope)
 			continue
 		}
@@ -477,11 +504,12 @@ func (ccms *CCMetricStore) buildQueries(
 		// Socket -> Node
 		if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
 			sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
 			queries = append(queries, ApiQuery{
-				Metric:    remoteName,
-				Hostname:  host.Hostname,
-				Aggregate: true,
-				Type:      &socketString,
-				TypeIds:   intToStringSlice(sockets),
+				Metric:     remoteName,
+				Hostname:   host.Hostname,
+				Aggregate:  true,
+				Type:       &socketString,
+				TypeIds:    intToStringSlice(sockets),
+				Resolution: resolution,
 			})
 			assignedScope = append(assignedScope, scope)
 			continue
 		}
@@ -490,8 +518,9 @@ func (ccms *CCMetricStore) buildQueries(
 		// Node -> Node
 		if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
 			queries = append(queries, ApiQuery{
-				Metric:   remoteName,
-				Hostname: host.Hostname,
+				Metric:     remoteName,
+				Hostname:   host.Hostname,
+				Resolution: resolution,
 			})
 			assignedScope = append(assignedScope, scope)
 			continue
 		}
@@ -510,7 +539,15 @@ func (ccms *CCMetricStore) LoadStats(
 	metrics []string,
 	ctx context.Context,
 ) (map[string]map[string]schema.MetricStatistics, error) {
-	queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}) // #166 Add scope shere for analysis view accelerator normalization?
+
+	// metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
+	// resolution := 9000
+
+	// for _, mc := range metricConfigs {
+	// 	resolution = min(resolution, mc.Timestep)
+	// }
+
+	queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope here for analysis view accelerator normalization?
 	if err != nil {
 		log.Warn("Error while building query")
 		return nil, err
 	}
@@ -588,8 +625,9 @@ func (ccms *CCMetricStore) LoadNodeData(
 	for _, node := range nodes {
 		for _, metric := range metrics {
 			req.Queries = append(req.Queries, ApiQuery{
-				Hostname: node,
-				Metric:   ccms.toRemoteName(metric),
+				Hostname:   node,
+				Metric:     ccms.toRemoteName(metric),
+				Resolution: 60, // Default for Node Queries
 			})
 		}
 	}
@@ -597,7 +635,7 @@
 	resBody, err := ccms.doRequest(ctx, &req)
 	if err != nil {
-		log.Error("Error while performing request")
+		log.Errorf("Error while performing request: %#v", err)
 		return nil, err
 	}
diff --git a/internal/metricdata/influxdb-v2.go b/internal/metricdata/influxdb-v2.go
index b95f07e..b416fa5 100644
--- a/internal/metricdata/influxdb-v2.go
+++ b/internal/metricdata/influxdb-v2.go
@@ -60,7 +60,8 @@ func (idb *InfluxDBv2DataRepository) LoadData(
 	job *schema.Job,
 	metrics []string,
 	scopes []schema.MetricScope,
-	ctx context.Context) (schema.JobData, error) {
+	ctx context.Context,
+	resolution int) (schema.JobData, error) {
 
 	measurementsConds := make([]string, 0, len(metrics))
 	for _, m := range metrics {
diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go
index 68d8d32..354dd5f 100644
--- a/internal/metricdata/metricdata.go
+++ b/internal/metricdata/metricdata.go
@@ -21,7 +21,7 @@ type MetricDataRepository interface {
 	Init(rawConfig json.RawMessage) error
 
 	// Return the JobData for the given job, only with the requested metrics.
-	LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error)
+	LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)
 
 	// Return a map of metrics to a map of nodes to the metric statistics of the job. node scope assumed for now.
	LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
diff --git a/internal/metricdata/prometheus.go b/internal/metricdata/prometheus.go
index a8d9f39..0611824 100644
--- a/internal/metricdata/prometheus.go
+++ b/internal/metricdata/prometheus.go
@@ -265,6 +265,7 @@ func (pdb *PrometheusDataRepository) LoadData(
 	metrics []string,
 	scopes []schema.MetricScope,
 	ctx context.Context,
+	resolution int,
 ) (schema.JobData, error) {
 	// TODO respect requested scope
 	if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
@@ -356,7 +357,7 @@ func (pdb *PrometheusDataRepository) LoadStats(
 	// map of metrics of nodes of stats
 	stats := map[string]map[string]schema.MetricStatistics{}
 
-	data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx)
+	data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
 	if err != nil {
 		log.Warn("Error while loading job for stats")
 		return nil, err
 	}
diff --git a/internal/metricdata/utils.go b/internal/metricdata/utils.go
index 6d490fe..f480e40 100644
--- a/internal/metricdata/utils.go
+++ b/internal/metricdata/utils.go
@@ -12,7 +12,7 @@ import (
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
 
-var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
+var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
 	panic("TODO")
 }
 
@@ -27,9 +27,10 @@ func (tmdr *TestMetricDataRepository) LoadData(
 	job *schema.Job,
 	metrics []string,
 	scopes []schema.MetricScope,
-	ctx context.Context) (schema.JobData, error) {
+	ctx context.Context,
+	resolution int) (schema.JobData, error) {
 
-	return TestLoadDataCallback(job, metrics, scopes, ctx)
+	return TestLoadDataCallback(job, metrics, scopes, ctx, resolution)
 }
 
 func (tmdr *TestMetricDataRepository) LoadStats(
@@ -48,3 +49,48 @@
 
 	panic("TODO")
 }
+
+func DeepCopy(jd_temp schema.JobData) schema.JobData {
+	var jd schema.JobData
+
+	jd = make(schema.JobData, len(jd_temp))
+	for k, v := range jd_temp {
+		jd[k] = make(map[schema.MetricScope]*schema.JobMetric, len(jd_temp[k]))
+		for k_, v_ := range v {
+			jd[k][k_] = new(schema.JobMetric)
+			jd[k][k_].Series = make([]schema.Series, len(v_.Series))
+			for i := 0; i < len(v_.Series); i += 1 {
+				jd[k][k_].Series[i].Data = make([]schema.Float, len(v_.Series[i].Data))
+				copy(jd[k][k_].Series[i].Data, v_.Series[i].Data)
+				jd[k][k_].Series[i].Hostname = v_.Series[i].Hostname
+				jd[k][k_].Series[i].Id = v_.Series[i].Id
+				jd[k][k_].Series[i].Statistics.Avg = v_.Series[i].Statistics.Avg
+				jd[k][k_].Series[i].Statistics.Min = v_.Series[i].Statistics.Min
+				jd[k][k_].Series[i].Statistics.Max = v_.Series[i].Statistics.Max
+			}
+			jd[k][k_].Timestep = v_.Timestep
+			jd[k][k_].Unit.Base = v_.Unit.Base
+			jd[k][k_].Unit.Prefix = v_.Unit.Prefix
+			if v_.StatisticsSeries != nil {
+				jd[k][k_].StatisticsSeries = new(schema.StatsSeries)
+				// Allocate destination slices first: copy() into a nil slice copies nothing.
+				jd[k][k_].StatisticsSeries.Max = make([]schema.Float, len(v_.StatisticsSeries.Max))
+				copy(jd[k][k_].StatisticsSeries.Max, v_.StatisticsSeries.Max)
+				jd[k][k_].StatisticsSeries.Min = make([]schema.Float, len(v_.StatisticsSeries.Min))
+				copy(jd[k][k_].StatisticsSeries.Min, v_.StatisticsSeries.Min)
+				jd[k][k_].StatisticsSeries.Median = make([]schema.Float, len(v_.StatisticsSeries.Median))
+				copy(jd[k][k_].StatisticsSeries.Median, v_.StatisticsSeries.Median)
+				jd[k][k_].StatisticsSeries.Mean = make([]schema.Float, len(v_.StatisticsSeries.Mean))
+				copy(jd[k][k_].StatisticsSeries.Mean, v_.StatisticsSeries.Mean)
+				// The Percentiles map must be initialized before assigning into it.
+				jd[k][k_].StatisticsSeries.Percentiles = make(map[int][]schema.Float, len(v_.StatisticsSeries.Percentiles))
+				for k__, v__ := range v_.StatisticsSeries.Percentiles {
+					jd[k][k_].StatisticsSeries.Percentiles[k__] = v__
+				}
+			} else {
+				jd[k][k_].StatisticsSeries = v_.StatisticsSeries
+			}
+		}
+	}
+	return jd
+}
diff --git a/internal/repository/stats.go b/internal/repository/stats.go
index ca05ca3..ba7a8aa 100644
--- a/internal/repository/stats.go
+++ b/internal/repository/stats.go
@@ -77,8 +77,8 @@ func (r *JobRepository) buildStatsQuery(
 	// fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
 
 	if col != "" {
-		// Scan columns: id, totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
-		query = sq.Select(col, "COUNT(job.id) as totalJobs",
+		// Scan columns: id, totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
+		query = sq.Select(col, "COUNT(job.id) as totalJobs", "name",
 			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s) as totalWalltime`, time.Now().Unix(), castType),
 			fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s) as totalNodes`, castType),
 			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s) as totalNodeHours`, time.Now().Unix(), castType),
@@ -86,9 +86,9 @@
 			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s) as totalCoreHours`, time.Now().Unix(), castType),
 			fmt.Sprintf(`CAST(SUM(job.num_acc) as %s) as totalAccs`, castType),
 			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType),
-		).From("job").GroupBy(col)
+		).From("job").LeftJoin("user ON user.username = job.user").GroupBy(col)
 	} else {
-		// Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
+		// Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
 		query = sq.Select("COUNT(job.id)",
 			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s)`, time.Now().Unix(), castType),
 			fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s)`, castType),
@@ -107,15 +107,15 @@
 	return query
 }
 
-func (r *JobRepository) getUserName(ctx context.Context, id string) string {
-	user := GetUserFromContext(ctx)
-	name, _ := r.FindColumnValue(user, id, "user", "name", "username", false)
-	if name != "" {
-		return name
-	} else {
-		return "-"
-	}
-}
+// func (r *JobRepository) getUserName(ctx context.Context, id string) string {
+// 	user := GetUserFromContext(ctx)
+// 	name, _ := r.FindColumnValue(user, id, "user", "name", "username", false)
+// 	if name != "" {
+// 		return name
+// 	} else {
+// 		return "-"
+// 	}
+// }
 
 func (r *JobRepository) getCastType() string {
 	var castType string
@@ -167,14 +167,20 @@ func (r *JobRepository) JobsStatsGrouped(
 	for rows.Next() {
 		var id sql.NullString
+		var name sql.NullString
 		var jobs, walltime, nodes, nodeHours,
cores, coreHours, accs, accHours sql.NullInt64 - if err := rows.Scan(&id, &jobs, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil { + if err := rows.Scan(&id, &jobs, &name, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil { log.Warn("Error while scanning rows") return nil, err } if id.Valid { var totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int + var personName string + + if name.Valid { + personName = name.String + } if jobs.Valid { totalJobs = int(jobs.Int64) @@ -205,11 +211,11 @@ func (r *JobRepository) JobsStatsGrouped( } if col == "job.user" { - name := r.getUserName(ctx, id.String) + // name := r.getUserName(ctx, id.String) stats = append(stats, &model.JobsStatistics{ ID: id.String, - Name: name, + Name: personName, TotalJobs: totalJobs, TotalWalltime: totalWalltime, TotalNodes: totalNodes, diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go index eff4ade..1dfdfc2 100644 --- a/internal/routerConfig/routes.go +++ b/internal/routerConfig/routes.go @@ -13,6 +13,7 @@ import ( "strings" "time" + "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/util" @@ -273,12 +274,13 @@ func SetupRoutes(router *mux.Router, buildInfo web.Build) { availableRoles, _ := schema.GetValidRolesMap(user) page := web.Page{ - Title: title, - User: *user, - Roles: availableRoles, - Build: buildInfo, - Config: conf, - Infos: infos, + Title: title, + User: *user, + Roles: availableRoles, + Build: buildInfo, + Config: conf, + Resampling: config.Keys.EnableResampling, + Infos: infos, } if route.Filter { diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 55f4cef..060e332 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -47,8 +47,8 @@ func RegisterFootprintWorker() { scopes = append(scopes, schema.MetricScopeAccelerator) for _, job := range jobs { - log.Debugf("Try job %d", job.JobID) - jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background()) + // log.Debugf("Try job %d", job.JobID) + jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background(), 0) // 0 Resolution-Value retrieves highest res if err != nil { log.Errorf("Error wile loading job data for footprint update: %v", err) continue diff --git a/pkg/archive/json.go b/pkg/archive/json.go index ff2c6d9..1219658 100644 --- a/pkg/archive/json.go +++ b/pkg/archive/json.go @@ -9,8 +9,8 @@ import ( "io" "time" - "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" ) func DecodeJobData(r io.Reader, k string) (schema.JobData, error) { diff --git a/pkg/resampler/resampler.go b/pkg/resampler/resampler.go new file mode 100644 index 0000000..26cead0 --- /dev/null +++ b/pkg/resampler/resampler.go @@ -0,0 +1,123 @@ +package resampler + +import ( + "errors" + "fmt" + "math" + + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +func SimpleResampler(data []schema.Float, old_frequency int64, new_frequency int64) ([]schema.Float, error) { + if old_frequency == 0 || new_frequency == 0 { + return nil, errors.New("either old or new frequency is set to 0") + } + + if 
new_frequency%old_frequency != 0 {
+		return nil, errors.New("new sampling frequency should be multiple of the old frequency")
+	}
+
+	var step int = int(new_frequency / old_frequency)
+	var new_data_length = len(data) / step
+
+	if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) {
+		return data, nil
+	}
+
+	new_data := make([]schema.Float, new_data_length)
+
+	for i := 0; i < new_data_length; i++ {
+		new_data[i] = data[i*step]
+	}
+
+	return new_data, nil
+}
+
+// Inspired by one of the algorithms from https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf
+// Adapted from https://github.com/haoel/downsampling/blob/master/core/lttb.go
+func LargestTriangleThreeBucket(data []schema.Float, old_frequency int, new_frequency int) ([]schema.Float, int, error) {
+
+	if old_frequency == 0 || new_frequency == 0 {
+		return data, old_frequency, nil
+	}
+
+	if new_frequency%old_frequency != 0 {
+		return nil, 0, fmt.Errorf("new sampling frequency %d should be a multiple of the old frequency %d", new_frequency, old_frequency)
+	}
+
+	var step int = int(new_frequency / old_frequency)
+	var new_data_length = len(data) / step
+
+	if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) {
+		return data, old_frequency, nil
+	}
+
+	new_data := make([]schema.Float, 0, new_data_length)
+
+	// Bucket size. Leave room for start and end data points
+	bucketSize := float64(len(data)-2) / float64(new_data_length-2)
+
+	new_data = append(new_data, data[0]) // Always add the first point
+
+	// We keep three pointers:
+	// > bucketLow - the current bucket's beginning location
+	// > bucketMiddle - the current bucket's ending location,
+	//   also the beginning location of the next bucket
+	// > bucketHigh - the next bucket's ending location.
+	bucketLow := 1
+	bucketMiddle := int(math.Floor(bucketSize)) + 1
+
+	var prevMaxAreaPoint int
+
+	for i := 0; i < new_data_length-2; i++ {
+
+		bucketHigh := int(math.Floor(float64(i+2)*bucketSize)) + 1
+		if bucketHigh >= len(data)-1 {
+			bucketHigh = len(data) - 2
+		}
+
+		// Calculate point average for next bucket (containing c)
+		avgPointX, avgPointY := calculateAverageDataPoint(data[bucketMiddle:bucketHigh+1], int64(bucketMiddle))
+
+		// Get the range for current bucket
+		currBucketStart := bucketLow
+		currBucketEnd := bucketMiddle
+
+		// Point a
+		pointX := prevMaxAreaPoint
+		pointY := data[prevMaxAreaPoint]
+
+		maxArea := -1.0
+
+		var maxAreaPoint int
+		flag_ := 0
+		for ; currBucketStart < currBucketEnd; currBucketStart++ {
+
+			area := calculateTriangleArea(schema.Float(pointX), pointY, avgPointX, avgPointY, schema.Float(currBucketStart), data[currBucketStart])
+			if area > maxArea {
+				maxArea = area
+				maxAreaPoint = currBucketStart
+			}
+			if math.IsNaN(float64(avgPointY)) {
+				flag_ = 1
+
+			}
+		}
+
+		if flag_ == 1 {
+			new_data = append(new_data, schema.NaN) // Keep a NaN marker for this bucket
+
+		} else {
+			new_data = append(new_data, data[maxAreaPoint]) // Pick this point from the bucket
+		}
+		prevMaxAreaPoint = maxAreaPoint // This MaxArea point is the next iteration's prevMaxAreaPoint
+
+		// Move to the next window
+		bucketLow = bucketMiddle
+		bucketMiddle = bucketHigh
+	}
+
+	new_data = append(new_data, data[len(data)-1]) // Always add last
+
+	return new_data, new_frequency, nil
+}
diff --git a/pkg/resampler/util.go b/pkg/resampler/util.go
new file mode 100644
index 0000000..36d8bed
--- /dev/null
+++ b/pkg/resampler/util.go
@@ -0,0 +1,35 @@
+package resampler
+
+import (
+	"math"
+
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+)
+
+func calculateTriangleArea(paX, paY, pbX, pbY, pcX, pcY schema.Float) float64 {
+	area := ((paX-pcX)*(pbY-paY) - (paX-pbX)*(pcY-paY)) * 0.5
+	return math.Abs(float64(area))
+}
+
+func calculateAverageDataPoint(points []schema.Float, xStart int64) (avgX schema.Float, avgY schema.Float) {
+	flag := 0
+	for _, point := range points {
+		avgX += schema.Float(xStart)
+		avgY += point
+		xStart++
+		if math.IsNaN(float64(point)) {
+			flag = 1
+		}
+	}
+
+	l := schema.Float(len(points))
+
+	avgX /= l
+	avgY /= l
+
+	if flag == 1 {
+		return avgX, schema.NaN
+	} else {
+		return avgX, avgY
+	}
+}
diff --git a/pkg/schema/config.go b/pkg/schema/config.go
index 28fa53a..e2cb28c 100644
--- a/pkg/schema/config.go
+++ b/pkg/schema/config.go
@@ -76,6 +76,13 @@ type Retention struct {
 	IncludeDB bool `json:"includeDB"`
 }
 
+type ResampleConfig struct {
+	// Trigger next zoom level at less than this many visible datapoints
+	Trigger int `json:"trigger"`
+	// Array of resampling target resolutions, in seconds; Example: [600,300,60]
+	Resolutions []int `json:"resolutions"`
+}
+
 // Format of the configuration (file). See below for the defaults.
 type ProgramConfig struct {
 	// Address where the http (or https) server will listen on (for example: 'localhost:80').
@@ -133,6 +140,9 @@ type ProgramConfig struct {
 	// be provided! Most options here can be overwritten by the user.
UiDefaults map[string]interface{} `json:"ui-defaults"` + // If exists, will enable dynamic zoom in frontend metric plots using the configured values + EnableResampling *ResampleConfig `json:"enable-resampling"` + // Where to store MachineState files MachineStateDir string `json:"machine-state-dir"` diff --git a/pkg/schema/schemas/config.schema.json b/pkg/schema/schemas/config.schema.json index ee64b5a..cc6c553 100644 --- a/pkg/schema/schemas/config.schema.json +++ b/pkg/schema/schemas/config.schema.json @@ -424,6 +424,27 @@ "plot_general_colorscheme", "plot_list_selectedMetrics" ] + }, + "enable-resampling": { + "description": "Enable dynamic zoom in frontend metric plots.", + "type": "object", + "properties": { + "trigger": { + "description": "Trigger next zoom level at less than this many visible datapoints.", + "type": "integer" + }, + "resolutions": { + "description": "Array of resampling target resolutions, in seconds.", + "type": "array", + "items": { + "type": "integer" + } + } + }, + "required": [ + "trigger", + "resolutions" + ] } }, "required": [ diff --git a/web/frontend/src/Header.svelte b/web/frontend/src/Header.svelte index 9150ed0..de5159a 100644 --- a/web/frontend/src/Header.svelte +++ b/web/frontend/src/Header.svelte @@ -48,6 +48,7 @@ href: `/monitoring/user/${username}`, icon: "bar-chart-line-fill", perCluster: false, + listOptions: false, menu: "none", }, { @@ -56,6 +57,7 @@ href: `/monitoring/jobs/`, icon: "card-list", perCluster: false, + listOptions: false, menu: "none", }, { @@ -63,7 +65,8 @@ requiredRole: roles.manager, href: "/monitoring/users/", icon: "people-fill", - perCluster: false, + perCluster: true, + listOptions: true, menu: "Groups", }, { @@ -71,7 +74,8 @@ requiredRole: roles.support, href: "/monitoring/projects/", icon: "folder", - perCluster: false, + perCluster: true, + listOptions: true, menu: "Groups", }, { @@ -80,6 +84,7 @@ href: "/monitoring/tags/", icon: "tags", perCluster: false, + listOptions: false, menu: "Groups", }, { @@ -88,6 +93,7 @@ href: "/monitoring/analysis/", icon: "graph-up", perCluster: true, + listOptions: false, menu: "Stats", }, { @@ -96,6 +102,7 @@ href: "/monitoring/systems/", icon: "cpu", perCluster: true, + listOptions: false, menu: "Groups", }, { @@ -104,6 +111,7 @@ href: "/monitoring/status/", icon: "cpu", perCluster: true, + listOptions: false, menu: "Stats", }, ]; diff --git a/web/frontend/src/Job.root.svelte b/web/frontend/src/Job.root.svelte index 3112c4f..73dd158 100644 --- a/web/frontend/src/Job.root.svelte +++ b/web/frontend/src/Job.root.svelte @@ -56,7 +56,8 @@ selectedScopes = []; let plots = {}, - roofWidth + roofWidth, + statsTable let missingMetrics = [], missingHosts = [], @@ -119,15 +120,6 @@ variables: { dbid, selectedMetrics, selectedScopes }, }); - function loadAllScopes() { - selectedScopes = [...selectedScopes, "socket", "core"] - jobMetrics = queryStore({ - client: client, - query: query, - variables: { dbid, selectedMetrics, selectedScopes}, - }); - } - // Handle Job Query on Init -> is not executed anymore getContext("on-init")(() => { let job = $initq.data.job; @@ -352,7 +344,7 @@ {#if item.data} statsTable.moreLoaded(detail)} job={$initq.data.job} metricName={item.metric} metricUnit={$initq.data.globalMetrics.find((gm) => gm.name == item.metric)?.unit} @@ -418,6 +410,7 @@ {#if $jobMetrics?.data?.jobMetrics} {#key $jobMetrics.data.jobMetrics} diff --git a/web/frontend/src/Node.root.svelte b/web/frontend/src/Node.root.svelte index 2d58540..c8b0d78 100644 --- a/web/frontend/src/Node.root.svelte +++ 
b/web/frontend/src/Node.root.svelte @@ -90,11 +90,10 @@ }, }); - let itemsPerPage = ccconfig.plot_list_jobsPerPage; - let page = 1; - let paging = { itemsPerPage, page }; - let sorting = { field: "startTime", type: "col", order: "DESC" }; - $: filter = [ + + const paging = { itemsPerPage: 50, page: 1 }; + const sorting = { field: "startTime", type: "col", order: "DESC" }; + const filter = [ { cluster: { eq: cluster } }, { node: { contains: hostname } }, { state: ["running"] }, @@ -207,7 +206,6 @@ cluster={clusters.find((c) => c.name == cluster)} subCluster={$nodeMetricsData.data.nodeMetrics[0].subCluster} series={item.metric.series} - resources={[{ hostname: hostname }]} forNode={true} /> {:else if item.disabled === true && item.metric} diff --git a/web/frontend/src/Systems.root.svelte b/web/frontend/src/Systems.root.svelte index c483401..0d5e70e 100644 --- a/web/frontend/src/Systems.root.svelte +++ b/web/frontend/src/Systems.root.svelte @@ -206,7 +206,6 @@ metric={item.data.name} cluster={clusters.find((c) => c.name == cluster)} subCluster={item.subCluster} - resources={[{ hostname: item.host }]} forNode={true} /> {:else if item.disabled === true && item.data} diff --git a/web/frontend/src/config.entrypoint.js b/web/frontend/src/config.entrypoint.js index 2978c8c..345056b 100644 --- a/web/frontend/src/config.entrypoint.js +++ b/web/frontend/src/config.entrypoint.js @@ -9,6 +9,7 @@ new Config({ username: username }, context: new Map([ - ['cc-config', clusterCockpitConfig] + ['cc-config', clusterCockpitConfig], + ['resampling', resampleConfig] ]) }) diff --git a/web/frontend/src/config/AdminSettings.svelte b/web/frontend/src/config/AdminSettings.svelte index d959c3b..9d3abf2 100644 --- a/web/frontend/src/config/AdminSettings.svelte +++ b/web/frontend/src/config/AdminSettings.svelte @@ -51,7 +51,5 @@ - - - + diff --git a/web/frontend/src/config/admin/Options.svelte b/web/frontend/src/config/admin/Options.svelte index 2a4e11c..a1fe307 100644 --- a/web/frontend/src/config/admin/Options.svelte +++ b/web/frontend/src/config/admin/Options.svelte @@ -3,11 +3,13 @@ --> - - - Scramble Names / Presentation Mode - - Active? - - + + + + Scramble Names / Presentation Mode + + Active? + + + + +{#if resampleConfig} + + + + Metric Plot Resampling +

+ Triggered at {resampleConfig.trigger} datapoints.
+ Configured resolutions: {resampleConfig.resolutions}
+ +{/if} diff --git a/web/frontend/src/generic/joblist/JobListRow.svelte b/web/frontend/src/generic/joblist/JobListRow.svelte index 1d8529e..b1e1511 100644 --- a/web/frontend/src/generic/joblist/JobListRow.svelte +++ b/web/frontend/src/generic/joblist/JobListRow.svelte @@ -26,18 +26,23 @@ export let showFootprint; export let triggerMetricRefresh = false; + const resampleConfig = getContext("resampling") || null; + const resampleDefault = resampleConfig ? Math.max(...resampleConfig.resolutions) : 0; + let { id } = job; let scopes = job.numNodes == 1 ? job.numAcc >= 1 ? ["core", "accelerator"] : ["core"] : ["node"]; + let selectedResolution = resampleDefault; + let zoomStates = {}; const cluster = getContext("clusters").find((c) => c.name == job.cluster); const client = getContextClient(); const query = gql` - query ($id: ID!, $metrics: [String!]!, $scopes: [MetricScope!]!) { - jobMetrics(id: $id, metrics: $metrics, scopes: $scopes) { + query ($id: ID!, $metrics: [String!]!, $scopes: [MetricScope!]!, $selectedResolution: Int) { + jobMetrics(id: $id, metrics: $metrics, scopes: $scopes, resolution: $selectedResolution) { name scope metric { @@ -66,17 +71,30 @@ } `; + function handleZoom(detail, metric) { + if ( // States have to differ, causes deathloop if just set + (zoomStates[metric]?.x?.min !== detail?.lastZoomState?.x?.min) && + (zoomStates[metric]?.y?.max !== detail?.lastZoomState?.y?.max) + ) { + zoomStates[metric] = {...detail.lastZoomState} + } + + if (detail?.newRes) { // Triggers GQL + selectedResolution = detail.newRes + } + } + $: metricsQuery = queryStore({ client: client, query: query, - variables: { id, metrics, scopes }, + variables: { id, metrics, scopes, selectedResolution }, }); function refreshMetrics() { metricsQuery = queryStore({ client: client, query: query, - variables: { id, metrics, scopes }, + variables: { id, metrics, scopes, selectedResolution }, // requestPolicy: 'network-only' // use default cache-first for refresh }); } @@ -159,6 +177,7 @@ {#if metric.disabled == false && metric.data} { handleZoom(detail, metric.data.name) }} width={plotWidth} height={plotHeight} timestep={metric.data.metric.timestep} @@ -169,9 +188,9 @@ {cluster} subCluster={job.subCluster} isShared={job.exclusive != 1} - resources={job.resources} numhwthreads={job.numHWThreads} numaccs={job.numAcc} + zoomState={zoomStates[metric.data.name] || null} /> {:else if metric.disabled == true && metric.data} {#each links as item} - {#if !item.perCluster} + {#if item.listOptions} + + + + {item.title} + + + + All Clusters + + + {#each clusters as cluster} + + + {cluster.name} + + + + Running Jobs + + + + {/each} + + + {:else if !item.perCluster} {item.title} diff --git a/web/frontend/src/job.entrypoint.js b/web/frontend/src/job.entrypoint.js index f810955..16714a5 100644 --- a/web/frontend/src/job.entrypoint.js +++ b/web/frontend/src/job.entrypoint.js @@ -10,6 +10,7 @@ new Job({ roles: roles }, context: new Map([ - ['cc-config', clusterCockpitConfig] + ['cc-config', clusterCockpitConfig], + ['resampling', resampleConfig] ]) }) diff --git a/web/frontend/src/job/Metric.svelte b/web/frontend/src/job/Metric.svelte index 88c6da8..89b9d30 100644 --- a/web/frontend/src/job/Metric.svelte +++ b/web/frontend/src/job/Metric.svelte @@ -13,14 +13,24 @@ --> @@ -65,13 +175,13 @@ {metricName} ({unit}) @@ -85,13 +195,13 @@ {/if} {#key series} - {#if fetching == true} + {#if $metricData?.fetching == true} {:else if error != null} {error.message} {:else if series != null && !patternMatches} { handleZoom(detail) }} 
{width} height={300} cluster={job.cluster} @@ -101,11 +211,11 @@ metric={metricName} {series} {isShared} - resources={job.resources} + {zoomState} /> {:else if statsSeries[selectedScopeIndex] != null && patternMatches} { handleZoom(detail) }} {width} height={300} cluster={job.cluster} @@ -115,7 +225,7 @@ metric={metricName} {series} {isShared} - resources={job.resources} + {zoomState} statisticsSeries={statsSeries[selectedScopeIndex]} useStatsSeries={!!statsSeries[selectedScopeIndex]} /> diff --git a/web/frontend/src/job/StatsTable.svelte b/web/frontend/src/job/StatsTable.svelte index 26b0b44..0d64112 100644 --- a/web/frontend/src/job/StatsTable.svelte +++ b/web/frontend/src/job/StatsTable.svelte @@ -4,6 +4,9 @@ Properties: - `job Object`: The job object - `jobMetrics [Object]`: The jobs metricdata + + Exported: + - `moreLoaded`: Adds additional scopes requested from Metric.svelte in Job-View --> diff --git a/web/frontend/src/jobs.entrypoint.js b/web/frontend/src/jobs.entrypoint.js index f976b4a..9e98d0d 100644 --- a/web/frontend/src/jobs.entrypoint.js +++ b/web/frontend/src/jobs.entrypoint.js @@ -9,6 +9,7 @@ new Jobs({ roles: roles }, context: new Map([ - ['cc-config', clusterCockpitConfig] + ['cc-config', clusterCockpitConfig], + ['resampling', resampleConfig] ]) }) diff --git a/web/frontend/src/user.entrypoint.js b/web/frontend/src/user.entrypoint.js index 0bff82a..16c616d 100644 --- a/web/frontend/src/user.entrypoint.js +++ b/web/frontend/src/user.entrypoint.js @@ -8,6 +8,7 @@ new User({ user: userInfos }, context: new Map([ - ['cc-config', clusterCockpitConfig] + ['cc-config', clusterCockpitConfig], + ['resampling', resampleConfig] ]) }) diff --git a/web/templates/config.tmpl b/web/templates/config.tmpl index 9f3f3be..7993c3e 100644 --- a/web/templates/config.tmpl +++ b/web/templates/config.tmpl @@ -12,6 +12,7 @@ const username = {{ .User.Username }}; const filterPresets = {{ .FilterPresets }}; const clusterCockpitConfig = {{ .Config }}; + const resampleConfig = {{ .Resampling }}; {{end}} \ No newline at end of file diff --git a/web/templates/monitoring/job.tmpl b/web/templates/monitoring/job.tmpl index 9d071b0..92365b3 100644 --- a/web/templates/monitoring/job.tmpl +++ b/web/templates/monitoring/job.tmpl @@ -14,6 +14,7 @@ const username = {{ .User.Username }}; const authlevel = {{ .User.GetAuthLevel }}; const roles = {{ .Roles }}; + const resampleConfig = {{ .Resampling }}; {{end}} diff --git a/web/templates/monitoring/jobs.tmpl b/web/templates/monitoring/jobs.tmpl index 6ea05d5..4248471 100644 --- a/web/templates/monitoring/jobs.tmpl +++ b/web/templates/monitoring/jobs.tmpl @@ -12,6 +12,7 @@ const clusterCockpitConfig = {{ .Config }}; const authlevel = {{ .User.GetAuthLevel }}; const roles = {{ .Roles }}; + const resampleConfig = {{ .Resampling }}; {{end}} diff --git a/web/templates/monitoring/user.tmpl b/web/templates/monitoring/user.tmpl index 693ae61..8b4cf44 100644 --- a/web/templates/monitoring/user.tmpl +++ b/web/templates/monitoring/user.tmpl @@ -10,6 +10,7 @@ const userInfos = {{ .Infos }}; const filterPresets = {{ .FilterPresets }}; const clusterCockpitConfig = {{ .Config }}; + const resampleConfig = {{ .Resampling }}; {{end}} diff --git a/web/web.go b/web/web.go index 99008b5..45ca9e3 100644 --- a/web/web.go +++ b/web/web.go @@ -98,6 +98,7 @@ type Page struct { FilterPresets map[string]interface{} // For pages with the Filter component, this can be used to set initial filters. Infos map[string]interface{} // For generic use (e.g. 
username for /monitoring/user/, job id for /monitoring/job/) Config map[string]interface{} // UI settings for the currently logged in user (e.g. line width, ...) + Resampling *schema.ResampleConfig // If not nil, defines resampling trigger and resolutions } func RenderTemplate(rw http.ResponseWriter, file string, page *Page) {
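With the `Resampling` field wired through to every template above, the feature is controlled entirely by the `enable-resampling` key that `config.schema.json` defines; when the key is absent, `EnableResampling` stays `nil` and plots behave as before. A minimal sketch of the relevant config-file fragment (the trigger value is illustrative; the resolution list reuses the example given in the `ResampleConfig` comment):

```json
{
  "enable-resampling": {
    "trigger": 30,
    "resolutions": [600, 300, 60]
  }
}
```

`trigger` is the visible-datapoint count below which the frontend requests the next finer resolution, and `resolutions` lists the selectable timesteps in seconds, coarsest first.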