Introduce average resampler support

Fixes #526

Entire-Checkpoint: 893a1de325b5
This commit is contained in:
2026-03-19 21:16:48 +01:00
parent 10b4fa5a06
commit c0d2d65f96
17 changed files with 239 additions and 73 deletions

View File

@@ -62,9 +62,10 @@ func cacheKey(
metrics []string,
scopes []schema.MetricScope,
resolution int,
resampleAlgo string,
) string {
return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
*job.ID, job.State, metrics, scopes, resolution)
return fmt.Sprintf("%d(%s):[%v],[%v]-%d-%s",
*job.ID, job.State, metrics, scopes, resolution, resampleAlgo)
}
// LoadData retrieves metric data for a job from the appropriate backend (memory store for running jobs,
@@ -87,8 +88,9 @@ func LoadData(job *schema.Job,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
resampleAlgo string,
) (schema.JobData, error) {
data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ any, ttl time.Duration, size int) {
data := cache.Get(cacheKey(job, metrics, scopes, resolution, resampleAlgo), func() (_ any, ttl time.Duration, size int) {
var jd schema.JobData
var err error
@@ -136,13 +138,17 @@ func LoadData(job *schema.Job,
jd = deepCopy(jdTemp)
// Resample archived data using Largest Triangle Three Bucket algorithm to reduce data points
// to the requested resolution, improving transfer performance and client-side rendering.
// Resample archived data to reduce data points to the requested resolution,
// improving transfer performance and client-side rendering.
resampleFn, rfErr := resampler.GetResampler(resampleAlgo)
if rfErr != nil {
return rfErr, 0, 0
}
for _, v := range jd {
for _, v_ := range v {
timestep := int64(0)
for i := 0; i < len(v_.Series); i += 1 {
v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, int64(v_.Timestep), int64(resolution))
v_.Series[i].Data, timestep, err = resampleFn(v_.Series[i].Data, int64(v_.Timestep), int64(resolution))
if err != nil {
return err, 0, 0
}
@@ -414,6 +420,7 @@ func LoadNodeListData(
resolution int,
from, to time.Time,
ctx context.Context,
resampleAlgo string,
) (map[string]schema.JobData, error) {
if metrics == nil {
for _, m := range archive.GetCluster(cluster).MetricConfig {
@@ -428,7 +435,7 @@ func LoadNodeListData(
return nil, err
}
data, err := ms.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
data, err := ms.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx, resampleAlgo)
if err != nil {
if len(data) != 0 {
cclog.Warnf("partial error loading node list data from metric store for cluster %s, subcluster %s: %s",