// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

// Package metricdispatcher provides a unified interface for loading and caching job metric data.
//
// This package serves as a central dispatcher that routes metric data requests to the appropriate
// backend based on job state. For running jobs, data is fetched from the metric store (e.g., cc-metric-store).
// For completed jobs, data is retrieved from the file-based job archive.
//
// # Key Features
//
//   - Automatic backend selection based on job state (running vs. archived)
//   - LRU cache for performance optimization (128 MB default cache size)
//   - Data resampling using the Largest Triangle Three Bucket algorithm for archived data
//   - Automatic statistics series generation for jobs with many nodes
//   - Support for scoped metrics (node, socket, accelerator, core)
//
// # Cache Behavior
//
// Cached data has different TTL (time-to-live) values depending on job state:
//   - Running jobs: 2 minutes (data changes frequently)
//   - Completed jobs: 5 hours (data is static)
//
// The cache key is based on job ID, state, requested metrics, scopes, and resolution.
//
// # Usage
//
// The primary entry point is LoadData, which automatically handles both running and archived jobs:
//
//	jobData, err := metricdispatcher.LoadData(job, metrics, scopes, ctx, resolution)
//	if err != nil {
//		// Handle error
//	}
//
// For statistics only, use LoadJobStats, LoadScopedJobStats, or LoadAverages depending on the required format.
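//
// For example (illustrative; metric names depend on the cluster configuration):
//
//	stats, err := metricdispatcher.LoadJobStats(job, []string{"flops_any", "mem_bw"}, ctx)
//	if err != nil {
//		// Handle error
//	}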
package metricdispatcher

import (
	"context"
	"fmt"
	"math"
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/config"
	"github.com/ClusterCockpit/cc-backend/internal/memorystore"
	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
	"github.com/ClusterCockpit/cc-lib/lrucache"
	"github.com/ClusterCockpit/cc-lib/resampler"
	"github.com/ClusterCockpit/cc-lib/schema"
)

// cache is an LRU cache with 128 MB capacity for storing loaded job metric data.
// The cache reduces load on both the metric store and archive backends.
var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)

// cacheKey generates a unique cache key for a job's metric data based on job ID, state,
// requested metrics, scopes, and resolution. Duration and StartTime are intentionally excluded
// because job.ID is already unique and the cache TTL ensures entries don't persist indefinitely.
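//
// For example (illustrative; assuming the numeric database ID 42, state "completed",
// one metric "flops_any" at node scope, and resolution 600), the key would render as:
//
//	42(completed):[[flops_any]],[[node]]-600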
func cacheKey(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	resolution int,
) string {
	return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
		job.ID, job.State, metrics, scopes, resolution)
}

// LoadData retrieves metric data for a job from the appropriate backend (memory store for running jobs,
// archive for completed jobs) and applies caching, resampling, and statistics generation as needed.
//
// For running jobs or when the archive is disabled, data is fetched from the metric store.
// For completed archived jobs, data is loaded from the job archive and resampled if needed.
//
// Parameters:
//   - job: The job for which to load metric data
//   - metrics: List of metric names to load (nil loads all metrics for the cluster)
//   - scopes: Metric scopes to include (nil defaults to node scope)
//   - ctx: Context for cancellation and timeouts
//   - resolution: Target number of data points for resampling (only applies to archived data)
//
// Returns the loaded job data and any error encountered. For partial errors (some metrics failed),
// the function returns the successfully loaded data with a warning logged.
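//
// For example (illustrative; a resolution of 600 points is just a sample value),
// passing nil metrics and scopes loads all configured metrics at node scope:
//
//	jobData, err := LoadData(job, nil, nil, ctx, 600)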
func LoadData(job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context,
	resolution int,
) (schema.JobData, error) {
	data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ any, ttl time.Duration, size int) {
		var jd schema.JobData
		var err error

		if job.State == schema.JobStateRunning ||
			job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving ||
			config.Keys.DisableArchive {

			if scopes == nil {
				scopes = append(scopes, schema.MetricScopeNode)
			}

			if metrics == nil {
				cluster := archive.GetCluster(job.Cluster)
				for _, mc := range cluster.MetricConfig {
					metrics = append(metrics, mc.Name)
				}
			}

			jd, err = memorystore.LoadData(job, metrics, scopes, ctx, resolution)
			if err != nil {
				if len(jd) != 0 {
					cclog.Warnf("partial error loading metrics from store for job %d (user: %s, project: %s): %s",
						job.JobID, job.User, job.Project, err.Error())
				} else {
					cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
						job.JobID, job.User, job.Project, err.Error())
					return err, 0, 0
				}
			}
			size = jd.Size()
		} else {
			var jdTemp schema.JobData
			jdTemp, err = archive.GetHandle().LoadJobData(job)
			if err != nil {
				cclog.Errorf("failed to load job data from archive for job %d (user: %s, project: %s): %s",
					job.JobID, job.User, job.Project, err.Error())
				return err, 0, 0
			}

			jd = deepCopy(jdTemp)

			// Resample archived data using the Largest Triangle Three Bucket algorithm to reduce
			// data points to the requested resolution, improving transfer performance and
			// client-side rendering.
			for _, scopeMap := range jd {
				for _, jm := range scopeMap {
					timestep := int64(0)
					for i := 0; i < len(jm.Series); i++ {
						jm.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(jm.Series[i].Data, int64(jm.Timestep), int64(resolution))
						if err != nil {
							return err, 0, 0
						}
					}
					jm.Timestep = int(timestep)
				}
			}

			// Filter job data to only include requested metrics and scopes, avoiding unnecessary data transfer.
			if metrics != nil || scopes != nil {
				if metrics == nil {
					metrics = make([]string, 0, len(jd))
					for k := range jd {
						metrics = append(metrics, k)
					}
				}

				res := schema.JobData{}
				for _, metric := range metrics {
					if perscope, ok := jd[metric]; ok {
						if len(perscope) > 1 {
							subset := make(map[schema.MetricScope]*schema.JobMetric)
							for _, scope := range scopes {
								if jm, ok := perscope[scope]; ok {
									subset[scope] = jm
								}
							}

							if len(subset) > 0 {
								perscope = subset
							}
						}

						res[metric] = perscope
					}
				}
				jd = res
			}
			size = jd.Size()
		}

		ttl = 5 * time.Hour
		if job.State == schema.JobStateRunning {
			ttl = 2 * time.Minute
		}

		// Generate statistics series for jobs with many nodes to enable min/median/max graphs
		// instead of overwhelming the UI with individual node lines. Note that newly calculated
		// statistics use min/median/max, while archived statistics may use min/mean/max.
		const maxSeriesSize int = 15
		for _, scopeMap := range jd {
			for _, jm := range scopeMap {
				if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize {
					continue
				}

				jm.AddStatisticsSeries()
			}
		}

		nodeScopeRequested := false
		for _, scope := range scopes {
			if scope == schema.MetricScopeNode {
				nodeScopeRequested = true
			}
		}

		if nodeScopeRequested {
			jd.AddNodeScope("flops_any")
			jd.AddNodeScope("mem_bw")
		}

		// Round resulting stat values.
		jd.RoundMetricStats()

		return jd, ttl, size
	})

	if err, ok := data.(error); ok {
		cclog.Errorf("error in cached dataset for job %d: %s", job.JobID, err.Error())
		return nil, err
	}

	return data.(schema.JobData), nil
}

// LoadAverages computes average values for the specified metrics across all nodes of a job.
// For running jobs, it loads statistics from the metric store; note that the value appended
// in this path is the sum of the per-node averages (see the #166 reference below). For
// completed jobs, it uses the pre-calculated averages from the job archive. The results are
// appended to the data slice.
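//
// Example (illustrative; assumes one pre-allocated row per metric):
//
//	data := make([][]schema.Float, len(metrics))
//	if err := LoadAverages(job, metrics, data, ctx); err != nil {
//		// Handle error
//	}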
func LoadAverages(
	job *schema.Job,
	metrics []string,
	data [][]schema.Float,
	ctx context.Context,
) error {
	if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
		return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
	}

	stats, err := memorystore.LoadStats(job, metrics, ctx)
	if err != nil {
		cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s",
			job.JobID, job.User, job.Project, err.Error())
		return err
	}

	for i, m := range metrics {
		nodes, ok := stats[m]
		if !ok {
			data[i] = append(data[i], schema.NaN)
			continue
		}

		sum := 0.0
		for _, node := range nodes {
			sum += node.Avg
		}
		data[i] = append(data[i], schema.Float(sum))
	}

	return nil
}

// LoadScopedJobStats retrieves job statistics organized by metric scope (node, socket, core, accelerator).
// For running jobs, statistics are computed from the metric store. For completed jobs, pre-calculated
// statistics are loaded from the job archive.
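//
// Example (illustrative):
//
//	scoped, err := LoadScopedJobStats(job, []string{"flops_any"},
//		[]schema.MetricScope{schema.MetricScopeNode}, ctx)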
func LoadScopedJobStats(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context,
) (schema.ScopedJobStats, error) {
	if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
		return archive.LoadScopedStatsFromArchive(job, metrics, scopes)
	}

	scopedStats, err := memorystore.LoadScopedStats(job, metrics, scopes, ctx)
	if err != nil {
		cclog.Errorf("failed to load scoped statistics from metric store for job %d (user: %s, project: %s): %s",
			job.JobID, job.User, job.Project, err.Error())
		return nil, err
	}

	return scopedStats, nil
}

// LoadJobStats retrieves aggregated statistics (min/avg/max) for each requested metric across all job nodes.
// For running jobs, statistics are computed from the metric store. For completed jobs, pre-calculated
// statistics are loaded from the job archive.
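//
// Example (illustrative; assumes "mem_bw" is configured for the job's cluster):
//
//	stats, err := LoadJobStats(job, []string{"mem_bw"}, ctx)
//	if err == nil {
//		s := stats["mem_bw"]
//		fmt.Printf("min=%.2f avg=%.2f max=%.2f\n", s.Min, s.Avg, s.Max)
//	}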
func LoadJobStats(
	job *schema.Job,
	metrics []string,
	ctx context.Context,
) (map[string]schema.MetricStatistics, error) {
	if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
		return archive.LoadStatsFromArchive(job, metrics)
	}

	data := make(map[string]schema.MetricStatistics, len(metrics))

	stats, err := memorystore.LoadStats(job, metrics, ctx)
	if err != nil {
		cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s",
			job.JobID, job.User, job.Project, err.Error())
		return data, err
	}

	for _, m := range metrics {
		nodes, ok := stats[m]
		if !ok || len(nodes) == 0 {
			data[m] = schema.MetricStatistics{Min: 0.0, Avg: 0.0, Max: 0.0}
			continue
		}

		// Seed min/max so the first node's values are taken as-is instead of
		// being clamped against zero (which would force Min to 0 for
		// all-positive metrics).
		sum := 0.0
		min := math.MaxFloat64
		max := -math.MaxFloat64
		for _, node := range nodes {
			sum += node.Avg
			min = math.Min(min, node.Min)
			max = math.Max(max, node.Max)
		}

		data[m] = schema.MetricStatistics{
			Avg: (math.Round((sum/float64(job.NumNodes))*100) / 100),
			Min: (math.Round(min*100) / 100),
			Max: (math.Round(max*100) / 100),
		}
	}

	return data, nil
}

// LoadNodeData retrieves metric data for specific nodes in a cluster within a time range.
// This is used for node monitoring views and system status pages. Data is always fetched from
// the metric store (not the archive) since it's for current/recent node status monitoring.
//
// Returns a nested map structure: node -> metric -> scoped data.
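//
// Example (illustrative; the cluster and node names are placeholders):
//
//	to := time.Now()
//	from := to.Add(-1 * time.Hour)
//	nodeData, err := LoadNodeData("fritz", nil, []string{"f0101"}, nil, from, to, ctx)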
func LoadNodeData(
	cluster string,
	metrics, nodes []string,
	scopes []schema.MetricScope,
	from, to time.Time,
	ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
	if metrics == nil {
		for _, m := range archive.GetCluster(cluster).MetricConfig {
			metrics = append(metrics, m.Name)
		}
	}

	data, err := memorystore.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
	if err != nil {
		if len(data) != 0 {
			cclog.Warnf("partial error loading node data from metric store for cluster %s: %s", cluster, err.Error())
		} else {
			cclog.Errorf("failed to load node data from metric store for cluster %s: %s", cluster, err.Error())
			return nil, err
		}
	}

	if data == nil {
		return nil, fmt.Errorf("metric store for cluster '%s' does not support node data queries", cluster)
	}

	return data, nil
}

// LoadNodeListData retrieves time-series metric data for multiple nodes within a time range,
// with optional resampling and automatic statistics generation for large datasets.
// This is used for comparing multiple nodes or displaying node status over time.
//
// Returns a map of node names to their job-like metric data structures.
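//
// Example (illustrative; cluster, subcluster, and resolution are placeholders):
//
//	nodeList, err := LoadNodeListData("fritz", "main", nodes, nil, nil, 600, from, to, ctx)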
func LoadNodeListData(
	cluster, subCluster string,
	nodes []string,
	metrics []string,
	scopes []schema.MetricScope,
	resolution int,
	from, to time.Time,
	ctx context.Context,
) (map[string]schema.JobData, error) {
	if metrics == nil {
		for _, m := range archive.GetCluster(cluster).MetricConfig {
			metrics = append(metrics, m.Name)
		}
	}

	data, err := memorystore.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
	if err != nil {
		if len(data) != 0 {
			cclog.Warnf("partial error loading node list data from metric store for cluster %s, subcluster %s: %s",
				cluster, subCluster, err.Error())
		} else {
			cclog.Errorf("failed to load node list data from metric store for cluster %s, subcluster %s: %s",
				cluster, subCluster, err.Error())
			return nil, err
		}
	}

	if data == nil {
		return nil, fmt.Errorf("metric store for cluster '%s' does not support node list queries", cluster)
	}

	// Generate statistics series for datasets with many series to improve visualization performance.
	// Statistics are calculated as min/median/max.
	const maxSeriesSize int = 8
	for _, jd := range data {
		for _, scopeMap := range jd {
			for _, jm := range scopeMap {
				if jm.StatisticsSeries != nil || len(jm.Series) < maxSeriesSize {
					continue
				}
				jm.AddStatisticsSeries()
			}
		}
	}

	return data, nil
}

// deepCopy creates a deep copy of JobData to prevent cache corruption when modifying
// archived data (e.g., during resampling). This ensures the cached archive data remains
// immutable while allowing per-request transformations.
func deepCopy(source schema.JobData) schema.JobData {
	result := make(schema.JobData, len(source))

	for metricName, scopeMap := range source {
		result[metricName] = make(map[schema.MetricScope]*schema.JobMetric, len(scopeMap))

		for scope, jobMetric := range scopeMap {
			result[metricName][scope] = copyJobMetric(jobMetric)
		}
	}

	return result
}

// copyJobMetric returns a deep copy of a single JobMetric, including its series
// and optional statistics series.
func copyJobMetric(src *schema.JobMetric) *schema.JobMetric {
	dst := &schema.JobMetric{
		Timestep: src.Timestep,
		Unit:     src.Unit,
		Series:   make([]schema.Series, len(src.Series)),
	}

	for i := range src.Series {
		dst.Series[i] = copySeries(&src.Series[i])
	}

	if src.StatisticsSeries != nil {
		dst.StatisticsSeries = copyStatisticsSeries(src.StatisticsSeries)
	}

	return dst
}

// copySeries returns a deep copy of a single series, duplicating its data slice.
func copySeries(src *schema.Series) schema.Series {
	dst := schema.Series{
		Hostname:   src.Hostname,
		Id:         src.Id,
		Statistics: src.Statistics,
		Data:       make([]schema.Float, len(src.Data)),
	}

	copy(dst.Data, src.Data)
	return dst
}

// copyStatisticsSeries returns a deep copy of a statistics series, duplicating
// the min/mean/median/max slices and any percentile slices.
func copyStatisticsSeries(src *schema.StatsSeries) *schema.StatsSeries {
	dst := &schema.StatsSeries{
		Min:    make([]schema.Float, len(src.Min)),
		Mean:   make([]schema.Float, len(src.Mean)),
		Median: make([]schema.Float, len(src.Median)),
		Max:    make([]schema.Float, len(src.Max)),
	}

	copy(dst.Min, src.Min)
	copy(dst.Mean, src.Mean)
	copy(dst.Median, src.Median)
	copy(dst.Max, src.Max)

	if len(src.Percentiles) > 0 {
		dst.Percentiles = make(map[int][]schema.Float, len(src.Percentiles))
		for percentile, values := range src.Percentiles {
			dst.Percentiles[percentile] = make([]schema.Float, len(values))
			copy(dst.Percentiles[percentile], values)
		}
	}

	return dst
}