mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-21 07:17:30 +01:00
Replace explicit resampling config with policy based approach
Entire-Checkpoint: f69e38210bb1
This commit is contained in:
@@ -112,6 +112,8 @@ type ResampleConfig struct {
|
||||
Resolutions []int `json:"resolutions"`
|
||||
// Trigger next zoom level at less than this many visible datapoints
|
||||
Trigger int `json:"trigger"`
|
||||
// Policy-derived target point count (set dynamically from user preference, not from config.json)
|
||||
TargetPoints int `json:"targetPoints,omitempty"`
|
||||
}
|
||||
|
||||
type NATSConfig struct {
|
||||
|
||||
113
internal/graph/resample.go
Normal file
113
internal/graph/resample.go
Normal file
@@ -0,0 +1,113 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved. This file is part of cc-backend.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package graph
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
)
|
||||
|
||||
// resolveResolutionFromPolicy reads the user's resample policy preference and
|
||||
// computes a resolution based on job duration and metric frequency. Returns nil
|
||||
// if the user has no policy set.
|
||||
func resolveResolutionFromPolicy(ctx context.Context, duration int64, cluster string, metrics []string) *int {
|
||||
user := repository.GetUserFromContext(ctx)
|
||||
if user == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
conf, err := repository.GetUserCfgRepo().GetUIConfig(user)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
policyVal, ok := conf["plotConfiguration_resamplePolicy"]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
policyStr, ok := policyVal.(string)
|
||||
if !ok || policyStr == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
policy := metricdispatch.ResamplePolicy(policyStr)
|
||||
targetPoints := metricdispatch.TargetPointsForPolicy(policy)
|
||||
if targetPoints == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Find the smallest metric frequency across the requested metrics
|
||||
frequency := smallestFrequency(cluster, metrics)
|
||||
if frequency <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
res := metricdispatch.ComputeResolution(duration, int64(frequency), targetPoints)
|
||||
return &res
|
||||
}
|
||||
|
||||
// resolveResampleAlgo returns the resampling algorithm name to use, checking
|
||||
// the explicit GraphQL parameter first, then the user's preference.
|
||||
func resolveResampleAlgo(ctx context.Context, resampleAlgo *model.ResampleAlgo) string {
|
||||
if resampleAlgo != nil {
|
||||
return strings.ToLower(resampleAlgo.String())
|
||||
}
|
||||
|
||||
user := repository.GetUserFromContext(ctx)
|
||||
if user == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
conf, err := repository.GetUserCfgRepo().GetUIConfig(user)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
algoVal, ok := conf["plotConfiguration_resampleAlgo"]
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
algoStr, ok := algoVal.(string)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
|
||||
return algoStr
|
||||
}
|
||||
|
||||
// smallestFrequency returns the smallest metric timestep (in seconds) among the
|
||||
// requested metrics for the given cluster. Falls back to 0 if nothing is found.
|
||||
func smallestFrequency(cluster string, metrics []string) int {
|
||||
cl := archive.GetCluster(cluster)
|
||||
if cl == nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
minFreq := 0
|
||||
for _, mc := range cl.MetricConfig {
|
||||
if len(metrics) > 0 {
|
||||
found := false
|
||||
for _, m := range metrics {
|
||||
if mc.Name == m {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if minFreq == 0 || mc.Timestep < minFreq {
|
||||
minFreq = mc.Timestep
|
||||
}
|
||||
}
|
||||
|
||||
return minFreq
|
||||
}
|
||||
@@ -499,26 +499,27 @@ func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error)
|
||||
|
||||
// JobMetrics is the resolver for the jobMetrics field.
|
||||
func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope, resolution *int, resampleAlgo *model.ResampleAlgo) ([]*model.JobMetricWithName, error) {
|
||||
if resolution == nil { // Load from Config
|
||||
if config.Keys.EnableResampling != nil {
|
||||
defaultRes := slices.Max(config.Keys.EnableResampling.Resolutions)
|
||||
resolution = &defaultRes
|
||||
} else { // Set 0 (Loads configured metric timestep)
|
||||
defaultRes := 0
|
||||
resolution = &defaultRes
|
||||
}
|
||||
}
|
||||
|
||||
job, err := r.Query().Job(ctx, id)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while querying job for metrics")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
algoName := ""
|
||||
if resampleAlgo != nil {
|
||||
algoName = strings.ToLower(resampleAlgo.String())
|
||||
// Resolve resolution: explicit param > user policy > global config > 0
|
||||
if resolution == nil {
|
||||
resolution = resolveResolutionFromPolicy(ctx, int64(job.Duration), job.Cluster, metrics)
|
||||
}
|
||||
if resolution == nil {
|
||||
if config.Keys.EnableResampling != nil {
|
||||
defaultRes := slices.Max(config.Keys.EnableResampling.Resolutions)
|
||||
resolution = &defaultRes
|
||||
} else {
|
||||
defaultRes := 0
|
||||
resolution = &defaultRes
|
||||
}
|
||||
}
|
||||
|
||||
algoName := resolveResampleAlgo(ctx, resampleAlgo)
|
||||
|
||||
data, err := metricdispatch.LoadData(job, metrics, scopes, ctx, *resolution, algoName)
|
||||
if err != nil {
|
||||
@@ -878,11 +879,16 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [
|
||||
|
||||
// NodeMetricsList is the resolver for the nodeMetricsList field.
|
||||
func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, subCluster string, stateFilter string, nodeFilter string, scopes []schema.MetricScope, metrics []string, from time.Time, to time.Time, page *model.PageRequest, resolution *int, resampleAlgo *model.ResampleAlgo) (*model.NodesResultList, error) {
|
||||
if resolution == nil { // Load from Config
|
||||
// Resolve resolution: explicit param > user policy > global config > 0
|
||||
duration := int64(to.Sub(from).Seconds())
|
||||
if resolution == nil {
|
||||
resolution = resolveResolutionFromPolicy(ctx, duration, cluster, metrics)
|
||||
}
|
||||
if resolution == nil {
|
||||
if config.Keys.EnableResampling != nil {
|
||||
defaultRes := slices.Max(config.Keys.EnableResampling.Resolutions)
|
||||
resolution = &defaultRes
|
||||
} else { // Set 0 (Loads configured metric timestep)
|
||||
} else {
|
||||
defaultRes := 0
|
||||
resolution = &defaultRes
|
||||
}
|
||||
@@ -906,10 +912,7 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
|
||||
}
|
||||
}
|
||||
|
||||
algoName := ""
|
||||
if resampleAlgo != nil {
|
||||
algoName = strings.ToLower(resampleAlgo.String())
|
||||
}
|
||||
algoName := resolveResampleAlgo(ctx, resampleAlgo)
|
||||
|
||||
// data -> map hostname:jobdata
|
||||
data, err := metricdispatch.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx, algoName)
|
||||
|
||||
49
internal/metricdispatch/resamplepolicy.go
Normal file
49
internal/metricdispatch/resamplepolicy.go
Normal file
@@ -0,0 +1,49 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved. This file is part of cc-backend.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package metricdispatch
|
||||
|
||||
import "math"
|
||||
|
||||
type ResamplePolicy string
|
||||
|
||||
const (
|
||||
ResamplePolicyLow ResamplePolicy = "low"
|
||||
ResamplePolicyMedium ResamplePolicy = "medium"
|
||||
ResamplePolicyHigh ResamplePolicy = "high"
|
||||
)
|
||||
|
||||
// TargetPointsForPolicy returns the target number of data points for a given policy.
|
||||
func TargetPointsForPolicy(policy ResamplePolicy) int {
|
||||
switch policy {
|
||||
case ResamplePolicyLow:
|
||||
return 200
|
||||
case ResamplePolicyMedium:
|
||||
return 500
|
||||
case ResamplePolicyHigh:
|
||||
return 1000
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// ComputeResolution computes the resampling resolution in seconds for a given
|
||||
// job duration, metric frequency, and target point count. Returns 0 if the
|
||||
// total number of data points is already at or below targetPoints (no resampling needed).
|
||||
func ComputeResolution(duration int64, frequency int64, targetPoints int) int {
|
||||
if frequency <= 0 || targetPoints <= 0 || duration <= 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
totalPoints := duration / frequency
|
||||
if totalPoints <= int64(targetPoints) {
|
||||
return 0
|
||||
}
|
||||
|
||||
targetRes := math.Ceil(float64(duration) / float64(targetPoints))
|
||||
// Round up to nearest multiple of frequency
|
||||
resolution := int(math.Ceil(targetRes/float64(frequency))) * int(frequency)
|
||||
|
||||
return resolution
|
||||
}
|
||||
68
internal/metricdispatch/resamplepolicy_test.go
Normal file
68
internal/metricdispatch/resamplepolicy_test.go
Normal file
@@ -0,0 +1,68 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved. This file is part of cc-backend.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package metricdispatch
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestTargetPointsForPolicy(t *testing.T) {
|
||||
tests := []struct {
|
||||
policy ResamplePolicy
|
||||
want int
|
||||
}{
|
||||
{ResamplePolicyLow, 200},
|
||||
{ResamplePolicyMedium, 500},
|
||||
{ResamplePolicyHigh, 1000},
|
||||
{ResamplePolicy("unknown"), 0},
|
||||
{ResamplePolicy(""), 0},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
if got := TargetPointsForPolicy(tt.policy); got != tt.want {
|
||||
t.Errorf("TargetPointsForPolicy(%q) = %d, want %d", tt.policy, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeResolution(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
duration int64
|
||||
frequency int64
|
||||
targetPoints int
|
||||
want int
|
||||
}{
|
||||
// 24h job, 60s frequency, 1440 total points
|
||||
{"low_24h_60s", 86400, 60, 200, 480},
|
||||
{"medium_24h_60s", 86400, 60, 500, 180},
|
||||
{"high_24h_60s", 86400, 60, 1000, 120},
|
||||
|
||||
// 2h job, 60s frequency, 120 total points — no resampling needed
|
||||
{"low_2h_60s", 7200, 60, 200, 0},
|
||||
{"medium_2h_60s", 7200, 60, 500, 0},
|
||||
{"high_2h_60s", 7200, 60, 1000, 0},
|
||||
|
||||
// Edge: zero/negative inputs
|
||||
{"zero_duration", 0, 60, 200, 0},
|
||||
{"zero_frequency", 86400, 0, 200, 0},
|
||||
{"zero_target", 86400, 60, 0, 0},
|
||||
{"negative_duration", -100, 60, 200, 0},
|
||||
|
||||
// 12h job, 30s frequency, 1440 total points
|
||||
{"medium_12h_30s", 43200, 30, 500, 90},
|
||||
|
||||
// Exact fit: total points == target points
|
||||
{"exact_fit", 12000, 60, 200, 0},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := ComputeResolution(tt.duration, tt.frequency, tt.targetPoints)
|
||||
if got != tt.want {
|
||||
t.Errorf("ComputeResolution(%d, %d, %d) = %d, want %d",
|
||||
tt.duration, tt.frequency, tt.targetPoints, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/web"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
@@ -493,13 +494,15 @@ func SetupRoutes(router chi.Router, buildInfo web.Build) {
|
||||
// Get Roles
|
||||
availableRoles, _ := schema.GetValidRolesMap(user)
|
||||
|
||||
resampling := resamplingForUser(conf)
|
||||
|
||||
page := web.Page{
|
||||
Title: title,
|
||||
User: *user,
|
||||
Roles: availableRoles,
|
||||
Build: buildInfo,
|
||||
Config: conf,
|
||||
Resampling: config.Keys.EnableResampling,
|
||||
Resampling: resampling,
|
||||
Infos: infos,
|
||||
}
|
||||
|
||||
@@ -586,3 +589,32 @@ func HandleSearchBar(rw http.ResponseWriter, r *http.Request, buildInfo web.Buil
|
||||
web.RenderTemplate(rw, "message.tmpl", &web.Page{Title: "Warning", MsgType: "alert-warning", Message: "Empty search", User: *user, Roles: availableRoles, Build: buildInfo})
|
||||
}
|
||||
}
|
||||
|
||||
// resamplingForUser returns a ResampleConfig that incorporates the user's
|
||||
// resample policy preference. If the user has a policy set, it creates a
|
||||
// policy-derived config with targetPoints and trigger. Otherwise falls back
|
||||
// to the global config.
|
||||
func resamplingForUser(conf map[string]any) *config.ResampleConfig {
|
||||
globalCfg := config.Keys.EnableResampling
|
||||
|
||||
policyVal, ok := conf["plotConfiguration_resamplePolicy"]
|
||||
if !ok {
|
||||
return globalCfg
|
||||
}
|
||||
policyStr, ok := policyVal.(string)
|
||||
if !ok || policyStr == "" {
|
||||
return globalCfg
|
||||
}
|
||||
|
||||
policy := metricdispatch.ResamplePolicy(policyStr)
|
||||
targetPoints := metricdispatch.TargetPointsForPolicy(policy)
|
||||
if targetPoints == 0 {
|
||||
return globalCfg
|
||||
}
|
||||
|
||||
// Build a policy-derived config: targetPoints + trigger, no resolutions array
|
||||
return &config.ResampleConfig{
|
||||
TargetPoints: targetPoints,
|
||||
Trigger: targetPoints / 4,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user