5 Commits

32 changed files with 1120 additions and 729 deletions

View File

@@ -284,8 +284,13 @@ func initSubsystems() error {
}
// Initialize metricdata
if err := metricdata.Init(); err != nil {
return fmt.Errorf("initializing metricdata repository: %w", err)
// if err := metricdata.Init(); err != nil {
// return fmt.Errorf("initializing metricdata repository: %w", err)
// }
// Initialize upstream metricdata repositories for pull worker
if err := metricdata.InitUpstreamRepos(); err != nil {
return fmt.Errorf("initializing upstream metricdata repositories: %w", err)
}
// Handle database re-initialization
@@ -322,13 +327,12 @@ func initSubsystems() error {
func runServer(ctx context.Context) error {
var wg sync.WaitGroup
// Start metric store if enabled
if memorystore.InternalCCMSFlag {
mscfg := ccconf.GetPackageConfig("metric-store")
if mscfg == nil {
return fmt.Errorf("metric store configuration must be present")
}
// Initialize metric store if configuration is provided
mscfg := ccconf.GetPackageConfig("metric-store")
if mscfg != nil {
memorystore.Init(mscfg, &wg)
} else {
cclog.Debug("Metric store configuration not found, skipping memorystore initialization")
}
// Start archiver and task manager

View File

@@ -253,9 +253,7 @@ func (s *Server) init() error {
}
}
if memorystore.InternalCCMSFlag {
s.restAPIHandle.MountMetricStoreAPIRoutes(metricstoreapi)
}
s.restAPIHandle.MountMetricStoreAPIRoutes(metricstoreapi)
if config.Keys.EmbedStaticFiles {
if i, err := os.Stat("./var/img"); err == nil {
@@ -383,9 +381,7 @@ func (s *Server) Shutdown(ctx context.Context) {
}
// Archive all the metric store data
if memorystore.InternalCCMSFlag {
memorystore.Shutdown()
}
memorystore.Shutdown()
// Shutdown archiver with 10 second timeout for fast shutdown
if err := archiver.Shutdown(10 * time.Second); err != nil {

View File

@@ -37,11 +37,6 @@
"clusters": [
{
"name": "fritz",
"metricDataRepository": {
"kind": "cc-metric-store-internal",
"url": "http://localhost:8082",
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.eyJ1c2VyIjoiYWRtaW4iLCJyb2xlcyI6WyJST0xFX0FETUlOIiwiUk9MRV9BTkFMWVNUIiwiUk9MRV9VU0VSIl19.d-3_3FZTsadPjDEdsWrrQ7nS0edMAR4zjl-eK7rJU3HziNBfI9PDHDIpJVHTNN5E5SlLGLFXctWyKAkwhXL-Dw"
},
"filterRanges": {
"numNodes": {
"from": 1,
@@ -59,11 +54,6 @@
},
{
"name": "alex",
"metricDataRepository": {
"kind": "cc-metric-store-internal",
"url": "http://localhost:8082",
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.eyJ1c2VyIjoiYWRtaW4iLCJyb2xlcyI6WyJST0xFX0FETUlOIiwiUk9MRV9BTkFMWVNUIiwiUk9MRV9VU0VSIl19.d-3_3FZTsadPjDEdsWrrQ7nS0edMAR4zjl-eK7rJU3HziNBfI9PDHDIpJVHTNN5E5SlLGLFXctWyKAkwhXL-Dw"
},
"filterRanges": {
"numNodes": {
"from": 1,

View File

@@ -29,11 +29,6 @@
"clusters": [
{
"name": "test",
"metricDataRepository": {
"kind": "cc-metric-store",
"url": "http://localhost:8082",
"token": "eyJhbGciOiJF-E-pQBQ"
},
"filterRanges": {
"numNodes": {
"from": 1,

View File

@@ -17,14 +17,15 @@ import (
"strings"
"testing"
"time"
"sync"
"github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
ccconf "github.com/ClusterCockpit/cc-lib/ccConfig"
@@ -56,7 +57,6 @@ func setup(t *testing.T) *api.RestAPI {
"clusters": [
{
"name": "testcluster",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": {
"numNodes": { "from": 1, "to": 64 },
"duration": { "from": 0, "to": 86400 },
@@ -171,8 +171,12 @@ func setup(t *testing.T) *api.RestAPI {
t.Fatal(err)
}
if err := metricdata.Init(); err != nil {
t.Fatal(err)
// Initialize memorystore (optional - will return nil if not configured)
// For this test, we don't initialize it to test the nil handling
mscfg := ccconf.GetPackageConfig("metric-store")
if mscfg != nil {
var wg sync.WaitGroup
memorystore.Init(mscfg, &wg)
}
archiver.Start(repository.GetJobRepository(), context.Background())
@@ -194,36 +198,19 @@ func cleanup() {
if err := archiver.Shutdown(5 * time.Second); err != nil {
cclog.Warnf("Archiver shutdown timeout in tests: %v", err)
}
// TODO: Clear all caches, reset all modules, etc...
// Shutdown memorystore if it was initialized
memorystore.Shutdown()
}
/*
* This function starts a job, stops it, and then reads its data from the job-archive.
* Do not run sub-tests in parallel! Tests should not be run in parallel at all, because
* at least `setup` modifies global state.
* This function starts a job, stops it, and tests the REST API.
* Do not run sub-tests in parallel! Tests should not be run in parallel at all, because
* at least `setup` modifies global state.
*/
func TestRestApi(t *testing.T) {
restapi := setup(t)
t.Cleanup(cleanup)
testData := schema.JobData{
"load_one": map[schema.MetricScope]*schema.JobMetric{
schema.MetricScopeNode: {
Unit: schema.Unit{Base: "load"},
Timestep: 60,
Series: []schema.Series{
{
Hostname: "host123",
Statistics: schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
},
},
},
},
}
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
return testData, nil
}
r := mux.NewRouter()
r.PathPrefix("/api").Subrouter()
@@ -278,18 +265,12 @@ func TestRestApi(t *testing.T) {
if response.StatusCode != http.StatusCreated {
t.Fatal(response.Status, recorder.Body.String())
}
// resolver := graph.GetResolverInstance()
restapi.JobRepository.SyncJobs()
job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime)
if err != nil {
t.Fatal(err)
}
// job.Tags, err = resolver.Job().Tags(ctx, job)
// if err != nil {
// t.Fatal(err)
// }
if job.JobID != 123 ||
job.User != "testuser" ||
job.Project != "testproj" ||
@@ -307,10 +288,6 @@ func TestRestApi(t *testing.T) {
job.StartTime != 123456789 {
t.Fatalf("unexpected job properties: %#v", job)
}
// if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" || job.Tags[0].Scope != "testuser" {
// t.Fatalf("unexpected tags: %#v", job.Tags)
// }
}); !ok {
return
}
@@ -324,7 +301,6 @@ func TestRestApi(t *testing.T) {
"stopTime": 123457789
}`
var stoppedJob *schema.Job
if ok := t.Run("StopJob", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/jobs/stop_job/", bytes.NewBuffer([]byte(stopJobBody)))
recorder := httptest.NewRecorder()
@@ -360,21 +336,12 @@ func TestRestApi(t *testing.T) {
t.Fatalf("unexpected job.metaData: %#v", job.MetaData)
}
stoppedJob = job
}); !ok {
return
}
t.Run("CheckArchive", func(t *testing.T) {
data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(data, testData) {
t.Fatal("unexpected data fetched from archive")
}
})
// Note: We skip the CheckArchive test because without memorystore initialized,
// archiving will fail gracefully. This test now focuses on the REST API itself.
t.Run("CheckDoubleStart", func(t *testing.T) {
// Starting a job with the same jobId and cluster should only be allowed if the startTime is far appart!

View File

@@ -22,7 +22,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/importer"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatcher"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
@@ -293,7 +293,7 @@ func (api *RestAPI) getCompleteJobByID(rw http.ResponseWriter, r *http.Request)
}
if r.URL.Query().Get("all-metrics") == "true" {
data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution)
data, err = metricdispatcher.LoadData(job, nil, scopes, r.Context(), resolution)
if err != nil {
cclog.Warnf("REST: error while loading all-metrics job data for JobID %d on %s", job.JobID, job.Cluster)
return
@@ -389,7 +389,7 @@ func (api *RestAPI) getJobByID(rw http.ResponseWriter, r *http.Request) {
resolution = max(resolution, mc.Timestep)
}
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution)
data, err := metricdispatcher.LoadData(job, metrics, scopes, r.Context(), resolution)
if err != nil {
cclog.Warnf("REST: error while loading job data for JobID %d on %s", job.JobID, job.Cluster)
return

View File

@@ -106,7 +106,7 @@ Data is archived at the highest available resolution (typically 60s intervals).
```go
// In archiver.go ArchiveJob() function
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 300)
jobData, err := metricdispatcher.LoadData(job, allMetrics, scopes, ctx, 300)
// 0 = highest resolution
// 300 = 5-minute resolution
```
@@ -185,6 +185,6 @@ Internal state is protected by:
## Dependencies
- `internal/repository`: Database operations for job metadata
- `internal/metricDataDispatcher`: Loading metric data from various backends
- `internal/metricdispatcher`: Loading metric data from various backends
- `pkg/archive`: Archive backend abstraction (filesystem, S3, SQLite)
- `cc-lib/schema`: Job and metric data structures

View File

@@ -10,7 +10,7 @@ import (
"math"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatcher"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
@@ -60,7 +60,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.Job, error) {
scopes = append(scopes, schema.MetricScopeAccelerator)
}
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resulotion-Value retrieves highest res (60s)
jobData, err := metricdispatcher.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resulotion-Value retrieves highest res (60s)
if err != nil {
cclog.Error("Error wile loading job data for archiving")
return nil, err

View File

@@ -78,6 +78,9 @@ type ProgramConfig struct {
// If exists, will enable dynamic zoom in frontend metric plots using the configured values
EnableResampling *ResampleConfig `json:"resampling"`
// Global upstream metric repository configuration for metric pull workers
UpstreamMetricRepository *json.RawMessage `json:"upstreamMetricRepository,omitempty"`
}
type ResampleConfig struct {
@@ -113,9 +116,8 @@ type FilterRanges struct {
}
type ClusterConfig struct {
Name string `json:"name"`
FilterRanges *FilterRanges `json:"filterRanges"`
MetricDataRepository json.RawMessage `json:"metricDataRepository"`
Name string `json:"name"`
FilterRanges *FilterRanges `json:"filterRanges"`
}
var Clusters []*ClusterConfig

View File

@@ -119,6 +119,23 @@ var configSchema = `
}
},
"required": ["trigger", "resolutions"]
},
"upstreamMetricRepository": {
"description": "Global upstream metric repository configuration for metric pull workers",
"type": "object",
"properties": {
"kind": {
"type": "string",
"enum": ["influxdb", "prometheus", "cc-metric-store", "cc-metric-store-internal", "test"]
},
"url": {
"type": "string"
},
"token": {
"type": "string"
}
},
"required": ["kind"]
}
},
"required": ["apiAllowedIPs"]
@@ -199,7 +216,7 @@ var clustersSchema = `
"required": ["numNodes", "duration", "startTime"]
}
},
"required": ["name", "metricDataRepository", "filterRanges"],
"required": ["name", "filterRanges"],
"minItems": 1
}
}`

View File

@@ -19,7 +19,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatcher"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
@@ -484,7 +484,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
return nil, err
}
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx, *resolution)
data, err := metricdispatcher.LoadData(job, metrics, scopes, ctx, *resolution)
if err != nil {
cclog.Warn("Error while loading job data")
return nil, err
@@ -512,7 +512,7 @@ func (r *queryResolver) JobStats(ctx context.Context, id string, metrics []strin
return nil, err
}
data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx)
data, err := metricdispatcher.LoadJobStats(job, metrics, ctx)
if err != nil {
cclog.Warnf("Error while loading jobStats data for job id %s", id)
return nil, err
@@ -537,7 +537,7 @@ func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics [
return nil, err
}
data, err := metricDataDispatcher.LoadScopedJobStats(job, metrics, scopes, ctx)
data, err := metricdispatcher.LoadScopedJobStats(job, metrics, scopes, ctx)
if err != nil {
cclog.Warnf("Error while loading scopedJobStats data for job id %s", id)
return nil, err
@@ -702,7 +702,7 @@ func (r *queryResolver) JobsMetricStats(ctx context.Context, filter []*model.Job
res := []*model.JobStats{}
for _, job := range jobs {
data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx)
data, err := metricdispatcher.LoadJobStats(job, metrics, ctx)
if err != nil {
cclog.Warnf("Error while loading comparison jobStats data for job id %d", job.JobID)
continue
@@ -759,7 +759,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [
}
}
data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
data, err := metricdispatcher.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
cclog.Warn("error while loading node data")
return nil, err
@@ -825,7 +825,7 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
}
}
data, err := metricDataDispatcher.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx)
data, err := metricdispatcher.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx)
if err != nil {
cclog.Warn("error while loading node data (Resolver.NodeMetricsList")
return nil, err
@@ -880,7 +880,7 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr
// 'nodes' == nil -> Defaults to all nodes of cluster for existing query workflow
scopes := []schema.MetricScope{"node"}
data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nil, scopes, from, to, ctx)
data, err := metricdispatcher.LoadNodeData(cluster, metrics, nil, scopes, from, to, ctx)
if err != nil {
cclog.Warn("error while loading node data")
return nil, err

View File

@@ -13,7 +13,7 @@ import (
"github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatcher"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
)
@@ -55,7 +55,7 @@ func (r *queryResolver) rooflineHeatmap(
// resolution = max(resolution, mc.Timestep)
// }
jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
jobdata, err := metricdispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
if err != nil {
cclog.Errorf("Error while loading roofline metrics for job %d", job.ID)
return nil, err
@@ -128,7 +128,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
continue
}
if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
if err := metricdispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
cclog.Error("Error while loading averages for footprint")
return nil, err
}

View File

@@ -60,7 +60,6 @@ func setup(t *testing.T) *repository.JobRepository {
"clusters": [
{
"name": "testcluster",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": {
"numNodes": { "from": 1, "to": 64 },
"duration": { "from": 0, "to": 86400 },
@@ -69,7 +68,6 @@ func setup(t *testing.T) *repository.JobRepository {
},
{
"name": "fritz",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": {
"numNodes": { "from": 1, "to": 944 },
"duration": { "from": 0, "to": 86400 },
@@ -78,7 +76,6 @@ func setup(t *testing.T) *repository.JobRepository {
},
{
"name": "taurus",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": {
"numNodes": { "from": 1, "to": 4000 },
"duration": { "from": 0, "to": 604800 },

View File

@@ -7,6 +7,7 @@ package memorystore
import (
"errors"
"fmt"
"math"
"github.com/ClusterCockpit/cc-lib/schema"
@@ -124,6 +125,10 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) {
req.WithData = true
ms := GetMemoryStore()
if ms == nil {
return nil, fmt.Errorf("memorystore not initialized")
}
response := APIQueryResponse{
Results: make([][]APIMetricData, 0, len(req.Queries)),

View File

@@ -19,8 +19,6 @@ const (
DefaultAvroCheckpointInterval = time.Minute
)
var InternalCCMSFlag bool = false
type MetricStoreConfig struct {
// Number of concurrent workers for checkpoint and archive operations.
// If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10)

View File

@@ -177,13 +177,19 @@ func InitMetrics(metrics map[string]MetricConfig) {
func GetMemoryStore() *MemoryStore {
if msInstance == nil {
cclog.Fatalf("[METRICSTORE]> MemoryStore not initialized!")
return nil
}
return msInstance
}
func Shutdown() {
// Check if memorystore was initialized
if msInstance == nil {
cclog.Debug("[METRICSTORE]> MemoryStore not initialized, skipping shutdown")
return
}
// Cancel the context to signal all background goroutines to stop
if shutdownFunc != nil {
shutdownFunc()

View File

@@ -3,56 +3,34 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdata
package memorystore
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
)
// Bloat Code
type CCMetricStoreConfigInternal struct {
Kind string `json:"kind"`
Url string `json:"url"`
Token string `json:"token"`
// If metrics are known to this MetricDataRepository under a different
// name than in the `metricConfig` section of the 'cluster.json',
// provide this optional mapping of local to remote name for this metric.
Renamings map[string]string `json:"metricRenamings"`
}
// Bloat Code
type CCMetricStoreInternal struct{}
// Bloat Code
func (ccms *CCMetricStoreInternal) Init(rawConfig json.RawMessage) error {
return nil
}
func (ccms *CCMetricStoreInternal) LoadData(
func LoadData(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
) (schema.JobData, error) {
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, int64(resolution))
queries, assignedScope, err := buildQueries(job, metrics, scopes, int64(resolution))
if err != nil {
cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error())
return nil, err
}
req := memorystore.APIQueryRequest{
req := APIQueryRequest{
Cluster: job.Cluster,
From: job.StartTime,
To: job.StartTime + int64(job.Duration),
@@ -61,7 +39,7 @@ func (ccms *CCMetricStoreInternal) LoadData(
WithData: true,
}
resBody, err := memorystore.FetchData(req)
resBody, err := FetchData(req)
if err != nil {
cclog.Errorf("Error while fetching data : %s", err.Error())
return nil, err
@@ -149,13 +127,13 @@ var (
acceleratorString = string(schema.MetricScopeAccelerator)
)
func (ccms *CCMetricStoreInternal) buildQueries(
func buildQueries(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
resolution int64,
) ([]memorystore.APIQuery, []schema.MetricScope, error) {
queries := make([]memorystore.APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
) ([]APIQuery, []schema.MetricScope, error) {
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
assignedScope := []schema.MetricScope{}
subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster)
@@ -217,7 +195,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
continue
}
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: false,
@@ -235,7 +213,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
continue
}
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -249,7 +227,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// HWThread -> HWThead
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: false,
@@ -265,7 +243,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
for _, core := range cores {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -282,7 +260,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
for _, socket := range sockets {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -297,7 +275,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// HWThread -> Node
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -312,7 +290,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// Core -> Core
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: false,
@@ -328,7 +306,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromCores(hwthreads)
for _, socket := range sockets {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -344,7 +322,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// Core -> Node
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -359,7 +337,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// MemoryDomain -> MemoryDomain
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: false,
@@ -374,7 +352,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// MemoryDoman -> Node
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -389,7 +367,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// Socket -> Socket
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: false,
@@ -404,7 +382,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// Socket -> Node
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -418,7 +396,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// Node -> Node
if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Resolution: resolution,
@@ -435,18 +413,18 @@ func (ccms *CCMetricStoreInternal) buildQueries(
return queries, assignedScope, nil
}
func (ccms *CCMetricStoreInternal) LoadStats(
func LoadStats(
job *schema.Job,
metrics []string,
ctx context.Context,
) (map[string]map[string]schema.MetricStatistics, error) {
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization?
queries, _, err := buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization?
if err != nil {
cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error())
return nil, err
}
req := memorystore.APIQueryRequest{
req := APIQueryRequest{
Cluster: job.Cluster,
From: job.StartTime,
To: job.StartTime + int64(job.Duration),
@@ -455,7 +433,7 @@ func (ccms *CCMetricStoreInternal) LoadStats(
WithData: false,
}
resBody, err := memorystore.FetchData(req)
resBody, err := FetchData(req)
if err != nil {
cclog.Errorf("Error while fetching data : %s", err.Error())
return nil, err
@@ -492,20 +470,19 @@ func (ccms *CCMetricStoreInternal) LoadStats(
return stats, nil
}
// Used for Job-View Statistics Table
func (ccms *CCMetricStoreInternal) LoadScopedStats(
func LoadScopedStats(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
) (schema.ScopedJobStats, error) {
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, 0)
queries, assignedScope, err := buildQueries(job, metrics, scopes, 0)
if err != nil {
cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error())
return nil, err
}
req := memorystore.APIQueryRequest{
req := APIQueryRequest{
Cluster: job.Cluster,
From: job.StartTime,
To: job.StartTime + int64(job.Duration),
@@ -514,7 +491,7 @@ func (ccms *CCMetricStoreInternal) LoadScopedStats(
WithData: false,
}
resBody, err := memorystore.FetchData(req)
resBody, err := FetchData(req)
if err != nil {
cclog.Errorf("Error while fetching data : %s", err.Error())
return nil, err
@@ -583,15 +560,14 @@ func (ccms *CCMetricStoreInternal) LoadScopedStats(
return scopedJobStats, nil
}
// Used for Systems-View Node-Overview
func (ccms *CCMetricStoreInternal) LoadNodeData(
func LoadNodeData(
cluster string,
metrics, nodes []string,
scopes []schema.MetricScope,
from, to time.Time,
ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
req := memorystore.APIQueryRequest{
req := APIQueryRequest{
Cluster: cluster,
From: from.Unix(),
To: to.Unix(),
@@ -604,7 +580,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData(
} else {
for _, node := range nodes {
for _, metric := range metrics {
req.Queries = append(req.Queries, memorystore.APIQuery{
req.Queries = append(req.Queries, APIQuery{
Hostname: node,
Metric: metric,
Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution
@@ -613,7 +589,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData(
}
}
resBody, err := memorystore.FetchData(req)
resBody, err := FetchData(req)
if err != nil {
cclog.Errorf("Error while fetching data : %s", err.Error())
return nil, err
@@ -622,7 +598,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData(
var errors []string
data := make(map[string]map[string][]*schema.JobMetric)
for i, res := range resBody.Results {
var query memorystore.APIQuery
var query APIQuery
if resBody.Queries != nil {
query = resBody.Queries[i]
} else {
@@ -673,8 +649,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData(
return data, nil
}
// Used for Systems-View Node-List
func (ccms *CCMetricStoreInternal) LoadNodeListData(
func LoadNodeListData(
cluster, subCluster string,
nodes []string,
metrics []string,
@@ -683,15 +658,14 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData(
from, to time.Time,
ctx context.Context,
) (map[string]schema.JobData, error) {
// Note: Order of node data is not guaranteed after this point
queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution))
queries, assignedScope, err := buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution))
if err != nil {
cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error())
return nil, err
}
req := memorystore.APIQueryRequest{
req := APIQueryRequest{
Cluster: cluster,
Queries: queries,
From: from.Unix(),
@@ -700,7 +674,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData(
WithData: true,
}
resBody, err := memorystore.FetchData(req)
resBody, err := FetchData(req)
if err != nil {
cclog.Errorf("Error while fetching data : %s", err.Error())
return nil, err
@@ -709,7 +683,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData(
var errors []string
data := make(map[string]schema.JobData)
for i, row := range resBody.Results {
var query memorystore.APIQuery
var query APIQuery
if resBody.Queries != nil {
query = resBody.Queries[i]
} else {
@@ -789,15 +763,15 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData(
return data, nil
}
func (ccms *CCMetricStoreInternal) buildNodeQueries(
func buildNodeQueries(
cluster string,
subCluster string,
nodes []string,
metrics []string,
scopes []schema.MetricScope,
resolution int64,
) ([]memorystore.APIQuery, []schema.MetricScope, error) {
queries := make([]memorystore.APIQuery, 0, len(metrics)*len(scopes)*len(nodes))
) ([]APIQuery, []schema.MetricScope, error) {
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes))
assignedScope := []schema.MetricScope{}
// Get Topol before loop if subCluster given
@@ -812,7 +786,6 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
}
for _, metric := range metrics {
metric := metric
mc := archive.GetMetricConfig(cluster, metric)
if mc == nil {
// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster)
@@ -880,7 +853,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
continue
}
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
@@ -898,7 +871,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
continue
}
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -912,7 +885,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// HWThread -> HWThead
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
@@ -928,7 +901,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(topology.Node)
for _, core := range cores {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -945,7 +918,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(topology.Node)
for _, socket := range sockets {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -960,7 +933,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// HWThread -> Node
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -975,7 +948,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// Core -> Core
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
@@ -991,7 +964,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromCores(topology.Node)
for _, socket := range sockets {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -1007,7 +980,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// Core -> Node
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
cores, _ := topology.GetCoresFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -1022,7 +995,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// MemoryDomain -> MemoryDomain
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
@@ -1037,7 +1010,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// MemoryDoman -> Node
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -1052,7 +1025,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// Socket -> Socket
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
@@ -1067,7 +1040,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// Socket -> Node
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
sockets, _ := topology.GetSocketsFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -1081,7 +1054,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// Node -> Node
if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Resolution: resolution,

View File

@@ -1,381 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricDataDispatcher
import (
"context"
"fmt"
"math"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/lrucache"
"github.com/ClusterCockpit/cc-lib/resampler"
"github.com/ClusterCockpit/cc-lib/schema"
)
var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
func cacheKey(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
resolution int,
) string {
// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
// job.ID and the TTL of the cache entry makes sure it does not stay there forever.
return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
job.ID, job.State, metrics, scopes, resolution)
}
// Fetches the metric data for a job.
func LoadData(job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
) (schema.JobData, error) {
data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ any, ttl time.Duration, size int) {
var jd schema.JobData
var err error
if job.State == schema.JobStateRunning ||
job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving ||
config.Keys.DisableArchive {
repo, err := metricdata.GetMetricDataRepo(job.Cluster)
if err != nil {
return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0
}
if scopes == nil {
scopes = append(scopes, schema.MetricScopeNode)
}
if metrics == nil {
cluster := archive.GetCluster(job.Cluster)
for _, mc := range cluster.MetricConfig {
metrics = append(metrics, mc.Name)
}
}
jd, err = repo.LoadData(job, metrics, scopes, ctx, resolution)
if err != nil {
if len(jd) != 0 {
cclog.Warnf("partial error: %s", err.Error())
// return err, 0, 0 // Reactivating will block archiving on one partial error
} else {
cclog.Error("Error while loading job data from metric repository")
return err, 0, 0
}
}
size = jd.Size()
} else {
var jd_temp schema.JobData
jd_temp, err = archive.GetHandle().LoadJobData(job)
if err != nil {
cclog.Error("Error while loading job data from archive")
return err, 0, 0
}
// Deep copy the cached archive hashmap
jd = metricdata.DeepCopy(jd_temp)
// Resampling for archived data.
// Pass the resolution from frontend here.
for _, v := range jd {
for _, v_ := range v {
timestep := int64(0)
for i := 0; i < len(v_.Series); i += 1 {
v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, int64(v_.Timestep), int64(resolution))
if err != nil {
return err, 0, 0
}
}
v_.Timestep = int(timestep)
}
}
// Avoid sending unrequested data to the client:
if metrics != nil || scopes != nil {
if metrics == nil {
metrics = make([]string, 0, len(jd))
for k := range jd {
metrics = append(metrics, k)
}
}
res := schema.JobData{}
for _, metric := range metrics {
if perscope, ok := jd[metric]; ok {
if len(perscope) > 1 {
subset := make(map[schema.MetricScope]*schema.JobMetric)
for _, scope := range scopes {
if jm, ok := perscope[scope]; ok {
subset[scope] = jm
}
}
if len(subset) > 0 {
perscope = subset
}
}
res[metric] = perscope
}
}
jd = res
}
size = jd.Size()
}
ttl = 5 * time.Hour
if job.State == schema.JobStateRunning {
ttl = 2 * time.Minute
}
// FIXME: Review: Is this really necessary or correct.
// Note: Lines 147-170 formerly known as prepareJobData(jobData, scopes)
// For /monitoring/job/<job> and some other places, flops_any and mem_bw need
// to be available at the scope 'node'. If a job has a lot of nodes,
// statisticsSeries should be available so that a min/median/max Graph can be
// used instead of a lot of single lines.
// NOTE: New StatsSeries will always be calculated as 'min/median/max'
// Existing (archived) StatsSeries can be 'min/mean/max'!
const maxSeriesSize int = 15
for _, scopes := range jd {
for _, jm := range scopes {
if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize {
continue
}
jm.AddStatisticsSeries()
}
}
nodeScopeRequested := false
for _, scope := range scopes {
if scope == schema.MetricScopeNode {
nodeScopeRequested = true
}
}
if nodeScopeRequested {
jd.AddNodeScope("flops_any")
jd.AddNodeScope("mem_bw")
}
// Round Resulting Stat Values
jd.RoundMetricStats()
return jd, ttl, size
})
if err, ok := data.(error); ok {
cclog.Error("Error in returned dataset")
return nil, err
}
return data.(schema.JobData), nil
}
// Used for the jobsFootprint GraphQL-Query. TODO: Rename/Generalize.
func LoadAverages(
job *schema.Job,
metrics []string,
data [][]schema.Float,
ctx context.Context,
) error {
if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
}
repo, err := metricdata.GetMetricDataRepo(job.Cluster)
if err != nil {
return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster)
}
stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalizazion?
if err != nil {
cclog.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project)
return err
}
for i, m := range metrics {
nodes, ok := stats[m]
if !ok {
data[i] = append(data[i], schema.NaN)
continue
}
sum := 0.0
for _, node := range nodes {
sum += node.Avg
}
data[i] = append(data[i], schema.Float(sum))
}
return nil
}
// Used for statsTable in frontend: Return scoped statistics by metric.
func LoadScopedJobStats(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
) (schema.ScopedJobStats, error) {
if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
return archive.LoadScopedStatsFromArchive(job, metrics, scopes)
}
repo, err := metricdata.GetMetricDataRepo(job.Cluster)
if err != nil {
return nil, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster)
}
scopedStats, err := repo.LoadScopedStats(job, metrics, scopes, ctx)
if err != nil {
cclog.Errorf("error while loading scoped statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project)
return nil, err
}
return scopedStats, nil
}
// Used for polar plots in frontend: Aggregates statistics for all nodes to single values for job per metric.
func LoadJobStats(
job *schema.Job,
metrics []string,
ctx context.Context,
) (map[string]schema.MetricStatistics, error) {
if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
return archive.LoadStatsFromArchive(job, metrics)
}
data := make(map[string]schema.MetricStatistics, len(metrics))
repo, err := metricdata.GetMetricDataRepo(job.Cluster)
if err != nil {
return data, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster)
}
stats, err := repo.LoadStats(job, metrics, ctx)
if err != nil {
cclog.Errorf("error while loading statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project)
return data, err
}
for _, m := range metrics {
sum, avg, min, max := 0.0, 0.0, 0.0, 0.0
nodes, ok := stats[m]
if !ok {
data[m] = schema.MetricStatistics{Min: min, Avg: avg, Max: max}
continue
}
for _, node := range nodes {
sum += node.Avg
min = math.Min(min, node.Min)
max = math.Max(max, node.Max)
}
data[m] = schema.MetricStatistics{
Avg: (math.Round((sum/float64(job.NumNodes))*100) / 100),
Min: (math.Round(min*100) / 100),
Max: (math.Round(max*100) / 100),
}
}
return data, nil
}
// Used for the classic node/system view. Returns a map of nodes to a map of metrics.
func LoadNodeData(
cluster string,
metrics, nodes []string,
scopes []schema.MetricScope,
from, to time.Time,
ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
repo, err := metricdata.GetMetricDataRepo(cluster)
if err != nil {
return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
}
if metrics == nil {
for _, m := range archive.GetCluster(cluster).MetricConfig {
metrics = append(metrics, m.Name)
}
}
data, err := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
if len(data) != 0 {
cclog.Warnf("partial error: %s", err.Error())
} else {
cclog.Error("Error while loading node data from metric repository")
return nil, err
}
}
if data == nil {
return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster)
}
return data, nil
}
func LoadNodeListData(
cluster, subCluster string,
nodes []string,
metrics []string,
scopes []schema.MetricScope,
resolution int,
from, to time.Time,
ctx context.Context,
) (map[string]schema.JobData, error) {
repo, err := metricdata.GetMetricDataRepo(cluster)
if err != nil {
return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
}
if metrics == nil {
for _, m := range archive.GetCluster(cluster).MetricConfig {
metrics = append(metrics, m.Name)
}
}
data, err := repo.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
if err != nil {
if len(data) != 0 {
cclog.Warnf("partial error: %s", err.Error())
} else {
cclog.Error("Error while loading node data from metric repository")
return nil, err
}
}
// NOTE: New StatsSeries will always be calculated as 'min/median/max'
const maxSeriesSize int = 8
for _, jd := range data {
for _, scopes := range jd {
for _, jm := range scopes {
if jm.StatisticsSeries != nil || len(jm.Series) < maxSeriesSize {
continue
}
jm.AddStatisticsSeries()
}
}
}
if data == nil {
return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster)
}
return data, nil
}

View File

@@ -2,6 +2,7 @@
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdata
import (
@@ -11,6 +12,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"
@@ -21,7 +23,7 @@ import (
type CCMetricStoreConfig struct {
Kind string `json:"kind"`
Url string `json:"url"`
URL string `json:"url"`
Token string `json:"token"`
// If metrics are known to this MetricDataRepository under a different
@@ -39,9 +41,9 @@ type CCMetricStore struct {
queryEndpoint string
}
type ApiQueryRequest struct {
type APIQueryRequest struct {
Cluster string `json:"cluster"`
Queries []ApiQuery `json:"queries"`
Queries []APIQuery `json:"queries"`
ForAllNodes []string `json:"for-all-nodes"`
From int64 `json:"from"`
To int64 `json:"to"`
@@ -49,7 +51,7 @@ type ApiQueryRequest struct {
WithData bool `json:"with-data"`
}
type ApiQuery struct {
type APIQuery struct {
Type *string `json:"type,omitempty"`
SubType *string `json:"subtype,omitempty"`
Metric string `json:"metric"`
@@ -60,12 +62,12 @@ type ApiQuery struct {
Aggregate bool `json:"aggreg"`
}
type ApiQueryResponse struct {
Queries []ApiQuery `json:"queries,omitempty"`
Results [][]ApiMetricData `json:"results"`
type APIQueryResponse struct {
Queries []APIQuery `json:"queries,omitempty"`
Results [][]APIMetricData `json:"results"`
}
type ApiMetricData struct {
type APIMetricData struct {
Error *string `json:"error"`
Data []schema.Float `json:"data"`
From int64 `json:"from"`
@@ -76,6 +78,14 @@ type ApiMetricData struct {
Max schema.Float `json:"max"`
}
var (
hwthreadString = string(schema.MetricScopeHWThread)
coreString = string(schema.MetricScopeCore)
memoryDomainString = string(schema.MetricScopeMemoryDomain)
socketString = string(schema.MetricScopeSocket)
acceleratorString = string(schema.MetricScopeAccelerator)
)
func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
var config CCMetricStoreConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
@@ -83,8 +93,8 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
return err
}
ccms.url = config.Url
ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url)
ccms.url = config.URL
ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.URL)
ccms.jwt = config.Token
ccms.client = http.Client{
Timeout: 10 * time.Second,
@@ -122,8 +132,8 @@ func (ccms *CCMetricStore) toLocalName(metric string) string {
func (ccms *CCMetricStore) doRequest(
ctx context.Context,
body *ApiQueryRequest,
) (*ApiQueryResponse, error) {
body *APIQueryRequest,
) (*APIQueryResponse, error) {
buf := &bytes.Buffer{}
if err := json.NewEncoder(buf).Encode(body); err != nil {
cclog.Errorf("Error while encoding request body: %s", err.Error())
@@ -156,7 +166,7 @@ func (ccms *CCMetricStore) doRequest(
return nil, fmt.Errorf("'%s': HTTP Status: %s", ccms.queryEndpoint, res.Status)
}
var resBody ApiQueryResponse
var resBody APIQueryResponse
if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&resBody); err != nil {
cclog.Errorf("Error while decoding result body: %s", err.Error())
return nil, err
@@ -178,7 +188,7 @@ func (ccms *CCMetricStore) LoadData(
return nil, err
}
req := ApiQueryRequest{
req := APIQueryRequest{
Cluster: job.Cluster,
From: job.StartTime,
To: job.StartTime + int64(job.Duration),
@@ -272,8 +282,8 @@ func (ccms *CCMetricStore) buildQueries(
metrics []string,
scopes []schema.MetricScope,
resolution int,
) ([]ApiQuery, []schema.MetricScope, error) {
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
) ([]APIQuery, []schema.MetricScope, error) {
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
assignedScope := []schema.MetricScope{}
subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster)
@@ -336,7 +346,7 @@ func (ccms *CCMetricStore) buildQueries(
continue
}
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
@@ -354,7 +364,7 @@ func (ccms *CCMetricStore) buildQueries(
continue
}
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
@@ -368,7 +378,7 @@ func (ccms *CCMetricStore) buildQueries(
// HWThread -> HWThead
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
@@ -384,7 +394,7 @@ func (ccms *CCMetricStore) buildQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
for _, core := range cores {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
@@ -401,7 +411,7 @@ func (ccms *CCMetricStore) buildQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
for _, socket := range sockets {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
@@ -416,7 +426,7 @@ func (ccms *CCMetricStore) buildQueries(
// HWThread -> Node
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
@@ -431,7 +441,7 @@ func (ccms *CCMetricStore) buildQueries(
// Core -> Core
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
@@ -447,7 +457,7 @@ func (ccms *CCMetricStore) buildQueries(
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromCores(hwthreads)
for _, socket := range sockets {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
@@ -463,7 +473,7 @@ func (ccms *CCMetricStore) buildQueries(
// Core -> Node
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
@@ -478,7 +488,7 @@ func (ccms *CCMetricStore) buildQueries(
// MemoryDomain -> MemoryDomain
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
@@ -493,7 +503,7 @@ func (ccms *CCMetricStore) buildQueries(
// MemoryDoman -> Node
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
@@ -508,7 +518,7 @@ func (ccms *CCMetricStore) buildQueries(
// Socket -> Socket
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: false,
@@ -523,7 +533,7 @@ func (ccms *CCMetricStore) buildQueries(
// Socket -> Node
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Aggregate: true,
@@ -537,7 +547,7 @@ func (ccms *CCMetricStore) buildQueries(
// Node -> Node
if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: host.Hostname,
Resolution: resolution,
@@ -559,14 +569,13 @@ func (ccms *CCMetricStore) LoadStats(
metrics []string,
ctx context.Context,
) (map[string]map[string]schema.MetricStatistics, error) {
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization?
if err != nil {
cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error())
return nil, err
}
req := ApiQueryRequest{
req := APIQueryRequest{
Cluster: job.Cluster,
From: job.StartTime,
To: job.StartTime + int64(job.Duration),
@@ -612,7 +621,6 @@ func (ccms *CCMetricStore) LoadStats(
return stats, nil
}
// Used for Job-View Statistics Table
func (ccms *CCMetricStore) LoadScopedStats(
job *schema.Job,
metrics []string,
@@ -625,7 +633,7 @@ func (ccms *CCMetricStore) LoadScopedStats(
return nil, err
}
req := ApiQueryRequest{
req := APIQueryRequest{
Cluster: job.Cluster,
From: job.StartTime,
To: job.StartTime + int64(job.Duration),
@@ -703,7 +711,6 @@ func (ccms *CCMetricStore) LoadScopedStats(
return scopedJobStats, nil
}
// Used for Systems-View Node-Overview
func (ccms *CCMetricStore) LoadNodeData(
cluster string,
metrics, nodes []string,
@@ -711,7 +718,7 @@ func (ccms *CCMetricStore) LoadNodeData(
from, to time.Time,
ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
req := ApiQueryRequest{
req := APIQueryRequest{
Cluster: cluster,
From: from.Unix(),
To: to.Unix(),
@@ -726,7 +733,7 @@ func (ccms *CCMetricStore) LoadNodeData(
} else {
for _, node := range nodes {
for _, metric := range metrics {
req.Queries = append(req.Queries, ApiQuery{
req.Queries = append(req.Queries, APIQuery{
Hostname: node,
Metric: ccms.toRemoteName(metric),
Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution
@@ -744,7 +751,7 @@ func (ccms *CCMetricStore) LoadNodeData(
var errors []string
data := make(map[string]map[string][]*schema.JobMetric)
for i, res := range resBody.Results {
var query ApiQuery
var query APIQuery
if resBody.Queries != nil {
query = resBody.Queries[i]
} else {
@@ -795,7 +802,6 @@ func (ccms *CCMetricStore) LoadNodeData(
return data, nil
}
// Used for Systems-View Node-List
func (ccms *CCMetricStore) LoadNodeListData(
cluster, subCluster string,
nodes []string,
@@ -805,7 +811,6 @@ func (ccms *CCMetricStore) LoadNodeListData(
from, to time.Time,
ctx context.Context,
) (map[string]schema.JobData, error) {
// Note: Order of node data is not guaranteed after this point
queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution)
if err != nil {
@@ -813,7 +818,7 @@ func (ccms *CCMetricStore) LoadNodeListData(
return nil, err
}
req := ApiQueryRequest{
req := APIQueryRequest{
Cluster: cluster,
Queries: queries,
From: from.Unix(),
@@ -831,7 +836,7 @@ func (ccms *CCMetricStore) LoadNodeListData(
var errors []string
data := make(map[string]schema.JobData)
for i, row := range resBody.Results {
var query ApiQuery
var query APIQuery
if resBody.Queries != nil {
query = resBody.Queries[i]
} else {
@@ -918,9 +923,8 @@ func (ccms *CCMetricStore) buildNodeQueries(
metrics []string,
scopes []schema.MetricScope,
resolution int,
) ([]ApiQuery, []schema.MetricScope, error) {
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(nodes))
) ([]APIQuery, []schema.MetricScope, error) {
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes))
assignedScope := []schema.MetricScope{}
// Get Topol before loop if subCluster given
@@ -1003,7 +1007,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
continue
}
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: false,
@@ -1021,7 +1025,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
continue
}
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: true,
@@ -1035,7 +1039,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
// HWThread -> HWThead
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: false,
@@ -1051,7 +1055,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(topology.Node)
for _, core := range cores {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: true,
@@ -1068,7 +1072,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(topology.Node)
for _, socket := range sockets {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: true,
@@ -1083,7 +1087,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
// HWThread -> Node
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: true,
@@ -1098,7 +1102,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
// Core -> Core
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(topology.Node)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: false,
@@ -1114,7 +1118,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromCores(topology.Node)
for _, socket := range sockets {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: true,
@@ -1130,7 +1134,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
// Core -> Node
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
cores, _ := topology.GetCoresFromHWThreads(topology.Node)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: true,
@@ -1145,7 +1149,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
// MemoryDomain -> MemoryDomain
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: false,
@@ -1160,7 +1164,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
// MemoryDoman -> Node
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: true,
@@ -1175,7 +1179,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
// Socket -> Socket
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(topology.Node)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: false,
@@ -1190,7 +1194,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
// Socket -> Node
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
sockets, _ := topology.GetSocketsFromHWThreads(topology.Node)
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Aggregate: true,
@@ -1204,7 +1208,7 @@ func (ccms *CCMetricStore) buildNodeQueries(
// Node -> Node
if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
queries = append(queries, ApiQuery{
queries = append(queries, APIQuery{
Metric: remoteName,
Hostname: hostname,
Resolution: resolution,
@@ -1220,3 +1224,11 @@ func (ccms *CCMetricStore) buildNodeQueries(
return queries, assignedScope, nil
}
func intToStringSlice(is []int) []string {
ss := make([]string, len(is))
for i, x := range is {
ss[i] = strconv.Itoa(x)
}
return ss
}

View File

@@ -12,7 +12,6 @@ import (
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
)
@@ -38,51 +37,91 @@ type MetricDataRepository interface {
LoadNodeListData(cluster, subCluster string, nodes, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, ctx context.Context) (map[string]schema.JobData, error)
}
var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}
var upstreamMetricDataRepo MetricDataRepository
func Init() error {
for _, cluster := range config.Clusters {
if cluster.MetricDataRepository != nil {
var kind struct {
Kind string `json:"kind"`
}
if err := json.Unmarshal(cluster.MetricDataRepository, &kind); err != nil {
cclog.Warn("Error while unmarshaling raw json MetricDataRepository")
return err
}
// func Init() error {
// for _, cluster := range config.Clusters {
// if cluster.MetricDataRepository != nil {
// var kind struct {
// Kind string `json:"kind"`
// }
// if err := json.Unmarshal(cluster.MetricDataRepository, &kind); err != nil {
// cclog.Warn("Error while unmarshaling raw json MetricDataRepository")
// return err
// }
//
// var mdr MetricDataRepository
// switch kind.Kind {
// case "cc-metric-store":
// mdr = &CCMetricStore{}
// case "prometheus":
// mdr = &PrometheusDataRepository{}
// case "test":
// mdr = &TestMetricDataRepository{}
// default:
// return fmt.Errorf("METRICDATA/METRICDATA > Unknown MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name)
// }
//
// if err := mdr.Init(cluster.MetricDataRepository); err != nil {
// cclog.Errorf("Error initializing MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name)
// return err
// }
// metricDataRepos[cluster.Name] = mdr
// }
// }
// return nil
// }
var mdr MetricDataRepository
switch kind.Kind {
case "cc-metric-store":
mdr = &CCMetricStore{}
case "cc-metric-store-internal":
mdr = &CCMetricStoreInternal{}
memorystore.InternalCCMSFlag = true
case "prometheus":
mdr = &PrometheusDataRepository{}
case "test":
mdr = &TestMetricDataRepository{}
default:
return fmt.Errorf("METRICDATA/METRICDATA > Unknown MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name)
}
// func GetMetricDataRepo(cluster string) (MetricDataRepository, error) {
// var err error
// repo, ok := metricDataRepos[cluster]
//
// if !ok {
// err = fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
// }
//
// return repo, err
// }
if err := mdr.Init(cluster.MetricDataRepository); err != nil {
cclog.Errorf("Error initializing MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name)
return err
}
metricDataRepos[cluster.Name] = mdr
}
// InitUpstreamRepos initializes global upstream metric data repository for the pull worker
func InitUpstreamRepos() error {
if config.Keys.UpstreamMetricRepository == nil {
return nil
}
var kind struct {
Kind string `json:"kind"`
}
if err := json.Unmarshal(*config.Keys.UpstreamMetricRepository, &kind); err != nil {
cclog.Warn("Error while unmarshaling raw json UpstreamMetricRepository")
return err
}
var mdr MetricDataRepository
switch kind.Kind {
case "cc-metric-store":
mdr = &CCMetricStore{}
case "prometheus":
mdr = &PrometheusDataRepository{}
case "test":
mdr = &TestMetricDataRepository{}
default:
return fmt.Errorf("METRICDATA/METRICDATA > Unknown UpstreamMetricRepository %v", kind.Kind)
}
if err := mdr.Init(*config.Keys.UpstreamMetricRepository); err != nil {
cclog.Errorf("Error initializing UpstreamMetricRepository %v", kind.Kind)
return err
}
upstreamMetricDataRepo = mdr
cclog.Infof("Initialized global upstream metric repository '%s'", kind.Kind)
return nil
}
func GetMetricDataRepo(cluster string) (MetricDataRepository, error) {
var err error
repo, ok := metricDataRepos[cluster]
if !ok {
err = fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
// GetUpstreamMetricDataRepo returns the global upstream metric data repository
func GetUpstreamMetricDataRepo() (MetricDataRepository, error) {
if upstreamMetricDataRepo == nil {
return nil, fmt.Errorf("METRICDATA/METRICDATA > no upstream metric data repository configured")
}
return repo, err
return upstreamMetricDataRepo, nil
}

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdata
import (

View File

@@ -72,47 +72,3 @@ func (tmdr *TestMetricDataRepository) LoadNodeListData(
) (map[string]schema.JobData, error) {
panic("TODO")
}
func DeepCopy(jdTemp schema.JobData) schema.JobData {
jd := make(schema.JobData, len(jdTemp))
for k, v := range jdTemp {
jd[k] = make(map[schema.MetricScope]*schema.JobMetric, len(jdTemp[k]))
for k_, v_ := range v {
jd[k][k_] = new(schema.JobMetric)
jd[k][k_].Series = make([]schema.Series, len(v_.Series))
for i := 0; i < len(v_.Series); i += 1 {
jd[k][k_].Series[i].Data = make([]schema.Float, len(v_.Series[i].Data))
copy(jd[k][k_].Series[i].Data, v_.Series[i].Data)
jd[k][k_].Series[i].Hostname = v_.Series[i].Hostname
jd[k][k_].Series[i].Id = v_.Series[i].Id
jd[k][k_].Series[i].Statistics.Avg = v_.Series[i].Statistics.Avg
jd[k][k_].Series[i].Statistics.Min = v_.Series[i].Statistics.Min
jd[k][k_].Series[i].Statistics.Max = v_.Series[i].Statistics.Max
}
jd[k][k_].Timestep = v_.Timestep
jd[k][k_].Unit.Base = v_.Unit.Base
jd[k][k_].Unit.Prefix = v_.Unit.Prefix
if v_.StatisticsSeries != nil {
// Init Slices
jd[k][k_].StatisticsSeries = new(schema.StatsSeries)
jd[k][k_].StatisticsSeries.Max = make([]schema.Float, len(v_.StatisticsSeries.Max))
jd[k][k_].StatisticsSeries.Min = make([]schema.Float, len(v_.StatisticsSeries.Min))
jd[k][k_].StatisticsSeries.Median = make([]schema.Float, len(v_.StatisticsSeries.Median))
jd[k][k_].StatisticsSeries.Mean = make([]schema.Float, len(v_.StatisticsSeries.Mean))
// Copy Data
copy(jd[k][k_].StatisticsSeries.Max, v_.StatisticsSeries.Max)
copy(jd[k][k_].StatisticsSeries.Min, v_.StatisticsSeries.Min)
copy(jd[k][k_].StatisticsSeries.Median, v_.StatisticsSeries.Median)
copy(jd[k][k_].StatisticsSeries.Mean, v_.StatisticsSeries.Mean)
// Handle Percentiles
for k__, v__ := range v_.StatisticsSeries.Percentiles {
jd[k][k_].StatisticsSeries.Percentiles[k__] = make([]schema.Float, len(v__))
copy(jd[k][k_].StatisticsSeries.Percentiles[k__], v__)
}
} else {
jd[k][k_].StatisticsSeries = v_.StatisticsSeries
}
}
}
return jd
}

View File

@@ -0,0 +1,490 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// Package metricdispatcher provides a unified interface for loading and caching job metric data.
//
// This package serves as a central dispatcher that routes metric data requests to the appropriate
// backend based on job state. For running jobs, data is fetched from the metric store (e.g., cc-metric-store).
// For completed jobs, data is retrieved from the file-based job archive.
//
// # Key Features
//
// - Automatic backend selection based on job state (running vs. archived)
// - LRU cache for performance optimization (128 MB default cache size)
// - Data resampling using Largest Triangle Three Bucket algorithm for archived data
// - Automatic statistics series generation for jobs with many nodes
// - Support for scoped metrics (node, socket, accelerator, core)
//
// # Cache Behavior
//
// Cached data has different TTL (time-to-live) values depending on job state:
// - Running jobs: 2 minutes (data changes frequently)
// - Completed jobs: 5 hours (data is static)
//
// The cache key is based on job ID, state, requested metrics, scopes, and resolution.
//
// # Usage
//
// The primary entry point is LoadData, which automatically handles both running and archived jobs:
//
// jobData, err := metricdispatcher.LoadData(job, metrics, scopes, ctx, resolution)
// if err != nil {
// // Handle error
// }
//
// For statistics only, use LoadJobStats, LoadScopedJobStats, or LoadAverages depending on the required format.
package metricdispatcher
import (
"context"
"fmt"
"math"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/lrucache"
"github.com/ClusterCockpit/cc-lib/resampler"
"github.com/ClusterCockpit/cc-lib/schema"
)
// cache is an LRU cache with 128 MB capacity for storing loaded job metric data.
// The cache reduces load on both the metric store and archive backends.
var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
// cacheKey generates a unique cache key for a job's metric data based on job ID, state,
// requested metrics, scopes, and resolution. Duration and StartTime are intentionally excluded
// because job.ID is more unique and the cache TTL ensures entries don't persist indefinitely.
func cacheKey(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
resolution int,
) string {
return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
job.ID, job.State, metrics, scopes, resolution)
}
// LoadData retrieves metric data for a job from the appropriate backend (memory store for running jobs,
// archive for completed jobs) and applies caching, resampling, and statistics generation as needed.
//
// For running jobs or when archive is disabled, data is fetched from the metric store.
// For completed archived jobs, data is loaded from the job archive and resampled if needed.
//
// Parameters:
// - job: The job for which to load metric data
// - metrics: List of metric names to load (nil loads all metrics for the cluster)
// - scopes: Metric scopes to include (nil defaults to node scope)
// - ctx: Context for cancellation and timeouts
// - resolution: Target number of data points for resampling (only applies to archived data)
//
// Returns the loaded job data and any error encountered. For partial errors (some metrics failed),
// the function returns the successfully loaded data with a warning logged.
func LoadData(job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
) (schema.JobData, error) {
data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ any, ttl time.Duration, size int) {
var jd schema.JobData
var err error
if job.State == schema.JobStateRunning ||
job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving ||
config.Keys.DisableArchive {
if scopes == nil {
scopes = append(scopes, schema.MetricScopeNode)
}
if metrics == nil {
cluster := archive.GetCluster(job.Cluster)
for _, mc := range cluster.MetricConfig {
metrics = append(metrics, mc.Name)
}
}
jd, err = memorystore.LoadData(job, metrics, scopes, ctx, resolution)
if err != nil {
if len(jd) != 0 {
cclog.Warnf("partial error loading metrics from store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
} else {
cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
return err, 0, 0
}
}
size = jd.Size()
} else {
var jdTemp schema.JobData
jdTemp, err = archive.GetHandle().LoadJobData(job)
if err != nil {
cclog.Errorf("failed to load job data from archive for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
return err, 0, 0
}
jd = deepCopy(jdTemp)
// Resample archived data using Largest Triangle Three Bucket algorithm to reduce data points
// to the requested resolution, improving transfer performance and client-side rendering.
for _, v := range jd {
for _, v_ := range v {
timestep := int64(0)
for i := 0; i < len(v_.Series); i += 1 {
v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, int64(v_.Timestep), int64(resolution))
if err != nil {
return err, 0, 0
}
}
v_.Timestep = int(timestep)
}
}
// Filter job data to only include requested metrics and scopes, avoiding unnecessary data transfer.
if metrics != nil || scopes != nil {
if metrics == nil {
metrics = make([]string, 0, len(jd))
for k := range jd {
metrics = append(metrics, k)
}
}
res := schema.JobData{}
for _, metric := range metrics {
if perscope, ok := jd[metric]; ok {
if len(perscope) > 1 {
subset := make(map[schema.MetricScope]*schema.JobMetric)
for _, scope := range scopes {
if jm, ok := perscope[scope]; ok {
subset[scope] = jm
}
}
if len(subset) > 0 {
perscope = subset
}
}
res[metric] = perscope
}
}
jd = res
}
size = jd.Size()
}
ttl = 5 * time.Hour
if job.State == schema.JobStateRunning {
ttl = 2 * time.Minute
}
// Generate statistics series for jobs with many nodes to enable min/median/max graphs
// instead of overwhelming the UI with individual node lines. Note that newly calculated
// statistics use min/median/max, while archived statistics may use min/mean/max.
const maxSeriesSize int = 15
for _, scopes := range jd {
for _, jm := range scopes {
if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize {
continue
}
jm.AddStatisticsSeries()
}
}
nodeScopeRequested := false
for _, scope := range scopes {
if scope == schema.MetricScopeNode {
nodeScopeRequested = true
}
}
if nodeScopeRequested {
jd.AddNodeScope("flops_any")
jd.AddNodeScope("mem_bw")
}
// Round Resulting Stat Values
jd.RoundMetricStats()
return jd, ttl, size
})
if err, ok := data.(error); ok {
cclog.Errorf("error in cached dataset for job %d: %s", job.JobID, err.Error())
return nil, err
}
return data.(schema.JobData), nil
}
// LoadAverages computes average values for the specified metrics across all nodes of a job.
// For running jobs, it loads statistics from the metric store. For completed jobs, it uses
// the pre-calculated averages from the job archive. The results are appended to the data slice.
func LoadAverages(
job *schema.Job,
metrics []string,
data [][]schema.Float,
ctx context.Context,
) error {
if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
}
stats, err := memorystore.LoadStats(job, metrics, ctx)
if err != nil {
cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
return err
}
for i, m := range metrics {
nodes, ok := stats[m]
if !ok {
data[i] = append(data[i], schema.NaN)
continue
}
sum := 0.0
for _, node := range nodes {
sum += node.Avg
}
data[i] = append(data[i], schema.Float(sum))
}
return nil
}
// LoadScopedJobStats retrieves job statistics organized by metric scope (node, socket, core, accelerator).
// For running jobs, statistics are computed from the metric store. For completed jobs, pre-calculated
// statistics are loaded from the job archive.
func LoadScopedJobStats(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
) (schema.ScopedJobStats, error) {
if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
return archive.LoadScopedStatsFromArchive(job, metrics, scopes)
}
scopedStats, err := memorystore.LoadScopedStats(job, metrics, scopes, ctx)
if err != nil {
cclog.Errorf("failed to load scoped statistics from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
return nil, err
}
return scopedStats, nil
}
// LoadJobStats retrieves aggregated statistics (min/avg/max) for each requested metric across all job nodes.
// For running jobs, statistics are computed from the metric store. For completed jobs, pre-calculated
// statistics are loaded from the job archive.
func LoadJobStats(
job *schema.Job,
metrics []string,
ctx context.Context,
) (map[string]schema.MetricStatistics, error) {
if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
return archive.LoadStatsFromArchive(job, metrics)
}
data := make(map[string]schema.MetricStatistics, len(metrics))
stats, err := memorystore.LoadStats(job, metrics, ctx)
if err != nil {
cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s",
job.JobID, job.User, job.Project, err.Error())
return data, err
}
for _, m := range metrics {
sum, avg, min, max := 0.0, 0.0, 0.0, 0.0
nodes, ok := stats[m]
if !ok {
data[m] = schema.MetricStatistics{Min: min, Avg: avg, Max: max}
continue
}
for _, node := range nodes {
sum += node.Avg
min = math.Min(min, node.Min)
max = math.Max(max, node.Max)
}
data[m] = schema.MetricStatistics{
Avg: (math.Round((sum/float64(job.NumNodes))*100) / 100),
Min: (math.Round(min*100) / 100),
Max: (math.Round(max*100) / 100),
}
}
return data, nil
}
// LoadNodeData retrieves metric data for specific nodes in a cluster within a time range.
// This is used for node monitoring views and system status pages. Data is always fetched from
// the metric store (not the archive) since it's for current/recent node status monitoring.
//
// Returns a nested map structure: node -> metric -> scoped data.
func LoadNodeData(
cluster string,
metrics, nodes []string,
scopes []schema.MetricScope,
from, to time.Time,
ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
if metrics == nil {
for _, m := range archive.GetCluster(cluster).MetricConfig {
metrics = append(metrics, m.Name)
}
}
data, err := memorystore.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
if len(data) != 0 {
cclog.Warnf("partial error loading node data from metric store for cluster %s: %s", cluster, err.Error())
} else {
cclog.Errorf("failed to load node data from metric store for cluster %s: %s", cluster, err.Error())
return nil, err
}
}
if data == nil {
return nil, fmt.Errorf("metric store for cluster '%s' does not support node data queries", cluster)
}
return data, nil
}
// LoadNodeListData retrieves time-series metric data for multiple nodes within a time range,
// with optional resampling and automatic statistics generation for large datasets.
// This is used for comparing multiple nodes or displaying node status over time.
//
// Returns a map of node names to their job-like metric data structures.
func LoadNodeListData(
cluster, subCluster string,
nodes []string,
metrics []string,
scopes []schema.MetricScope,
resolution int,
from, to time.Time,
ctx context.Context,
) (map[string]schema.JobData, error) {
if metrics == nil {
for _, m := range archive.GetCluster(cluster).MetricConfig {
metrics = append(metrics, m.Name)
}
}
data, err := memorystore.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
if err != nil {
if len(data) != 0 {
cclog.Warnf("partial error loading node list data from metric store for cluster %s, subcluster %s: %s",
cluster, subCluster, err.Error())
} else {
cclog.Errorf("failed to load node list data from metric store for cluster %s, subcluster %s: %s",
cluster, subCluster, err.Error())
return nil, err
}
}
// Generate statistics series for datasets with many series to improve visualization performance.
// Statistics are calculated as min/median/max.
const maxSeriesSize int = 8
for _, jd := range data {
for _, scopes := range jd {
for _, jm := range scopes {
if jm.StatisticsSeries != nil || len(jm.Series) < maxSeriesSize {
continue
}
jm.AddStatisticsSeries()
}
}
}
if data == nil {
return nil, fmt.Errorf("metric store for cluster '%s' does not support node list queries", cluster)
}
return data, nil
}
// deepCopy creates a deep copy of JobData to prevent cache corruption when modifying
// archived data (e.g., during resampling). This ensures the cached archive data remains
// immutable while allowing per-request transformations.
func deepCopy(source schema.JobData) schema.JobData {
result := make(schema.JobData, len(source))
for metricName, scopeMap := range source {
result[metricName] = make(map[schema.MetricScope]*schema.JobMetric, len(scopeMap))
for scope, jobMetric := range scopeMap {
result[metricName][scope] = copyJobMetric(jobMetric)
}
}
return result
}
func copyJobMetric(src *schema.JobMetric) *schema.JobMetric {
dst := &schema.JobMetric{
Timestep: src.Timestep,
Unit: src.Unit,
Series: make([]schema.Series, len(src.Series)),
}
for i := range src.Series {
dst.Series[i] = copySeries(&src.Series[i])
}
if src.StatisticsSeries != nil {
dst.StatisticsSeries = copyStatisticsSeries(src.StatisticsSeries)
}
return dst
}
func copySeries(src *schema.Series) schema.Series {
dst := schema.Series{
Hostname: src.Hostname,
Id: src.Id,
Statistics: src.Statistics,
Data: make([]schema.Float, len(src.Data)),
}
copy(dst.Data, src.Data)
return dst
}
func copyStatisticsSeries(src *schema.StatsSeries) *schema.StatsSeries {
dst := &schema.StatsSeries{
Min: make([]schema.Float, len(src.Min)),
Mean: make([]schema.Float, len(src.Mean)),
Median: make([]schema.Float, len(src.Median)),
Max: make([]schema.Float, len(src.Max)),
}
copy(dst.Min, src.Min)
copy(dst.Mean, src.Mean)
copy(dst.Median, src.Median)
copy(dst.Max, src.Max)
if len(src.Percentiles) > 0 {
dst.Percentiles = make(map[int][]schema.Float, len(src.Percentiles))
for percentile, values := range src.Percentiles {
dst.Percentiles[percentile] = make([]schema.Float, len(values))
copy(dst.Percentiles[percentile], values)
}
}
return dst
}

View File

@@ -0,0 +1,125 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdispatcher
import (
"testing"
"github.com/ClusterCockpit/cc-lib/schema"
)
func TestDeepCopy(t *testing.T) {
nodeId := "0"
original := schema.JobData{
"cpu_load": {
schema.MetricScopeNode: &schema.JobMetric{
Timestep: 60,
Unit: schema.Unit{Base: "load", Prefix: ""},
Series: []schema.Series{
{
Hostname: "node001",
Id: &nodeId,
Data: []schema.Float{1.0, 2.0, 3.0},
Statistics: schema.MetricStatistics{
Min: 1.0,
Avg: 2.0,
Max: 3.0,
},
},
},
StatisticsSeries: &schema.StatsSeries{
Min: []schema.Float{1.0, 1.5, 2.0},
Mean: []schema.Float{2.0, 2.5, 3.0},
Median: []schema.Float{2.0, 2.5, 3.0},
Max: []schema.Float{3.0, 3.5, 4.0},
Percentiles: map[int][]schema.Float{
25: {1.5, 2.0, 2.5},
75: {2.5, 3.0, 3.5},
},
},
},
},
}
copied := deepCopy(original)
original["cpu_load"][schema.MetricScopeNode].Series[0].Data[0] = 999.0
original["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Min[0] = 888.0
original["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Percentiles[25][0] = 777.0
if copied["cpu_load"][schema.MetricScopeNode].Series[0].Data[0] != 1.0 {
t.Errorf("Series data was not deeply copied: got %v, want 1.0",
copied["cpu_load"][schema.MetricScopeNode].Series[0].Data[0])
}
if copied["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Min[0] != 1.0 {
t.Errorf("StatisticsSeries was not deeply copied: got %v, want 1.0",
copied["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Min[0])
}
if copied["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Percentiles[25][0] != 1.5 {
t.Errorf("Percentiles was not deeply copied: got %v, want 1.5",
copied["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Percentiles[25][0])
}
if copied["cpu_load"][schema.MetricScopeNode].Timestep != 60 {
t.Errorf("Timestep not copied correctly: got %v, want 60",
copied["cpu_load"][schema.MetricScopeNode].Timestep)
}
if copied["cpu_load"][schema.MetricScopeNode].Series[0].Hostname != "node001" {
t.Errorf("Hostname not copied correctly: got %v, want node001",
copied["cpu_load"][schema.MetricScopeNode].Series[0].Hostname)
}
}
func TestDeepCopyNilStatisticsSeries(t *testing.T) {
original := schema.JobData{
"mem_used": {
schema.MetricScopeNode: &schema.JobMetric{
Timestep: 60,
Series: []schema.Series{
{
Hostname: "node001",
Data: []schema.Float{1.0, 2.0},
},
},
StatisticsSeries: nil,
},
},
}
copied := deepCopy(original)
if copied["mem_used"][schema.MetricScopeNode].StatisticsSeries != nil {
t.Errorf("StatisticsSeries should be nil, got %v",
copied["mem_used"][schema.MetricScopeNode].StatisticsSeries)
}
}
func TestDeepCopyEmptyPercentiles(t *testing.T) {
original := schema.JobData{
"cpu_load": {
schema.MetricScopeNode: &schema.JobMetric{
Timestep: 60,
Series: []schema.Series{},
StatisticsSeries: &schema.StatsSeries{
Min: []schema.Float{1.0},
Mean: []schema.Float{2.0},
Median: []schema.Float{2.0},
Max: []schema.Float{3.0},
Percentiles: nil,
},
},
},
}
copied := deepCopy(original)
if copied["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Percentiles != nil {
t.Errorf("Percentiles should be nil when source is nil/empty")
}
}

View File

@@ -42,7 +42,6 @@ func nodeTestSetup(t *testing.T) {
"clusters": [
{
"name": "testcluster",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": {
"numNodes": { "from": 1, "to": 64 },
"duration": { "from": 0, "to": 86400 },

View File

@@ -12,7 +12,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatcher"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
@@ -766,7 +766,7 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
continue
}
if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
if err := metricdispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
cclog.Errorf("Error while loading averages for histogram: %s", err)
return nil
}

Binary file not shown.

View File

@@ -31,7 +31,6 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
"clusters": [
{
"name": "testcluster",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": {
"numNodes": { "from": 1, "to": 64 },
"duration": { "from": 0, "to": 86400 },

View File

@@ -0,0 +1,199 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskmanager
import (
"context"
"fmt"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
"github.com/go-co-op/gocron/v2"
)
// RegisterMetricPullWorker registers a background worker that pulls metric data
// from upstream backends and populates the internal memorystore
func RegisterMetricPullWorker() {
d, err := parseDuration(Keys.MetricPullWorker)
if err != nil {
cclog.Warnf("Could not parse duration for metric pull worker, using default 60s")
d = 60 * time.Second
}
if d == 0 {
cclog.Info("Metric pull worker disabled (interval is zero)")
return
}
// Register one worker per cluster
registered := 0
for _, cluster := range config.Clusters {
_, err := s.NewJob(
gocron.DurationJob(d),
gocron.NewTask(func() {
if err := pullMetricsForCluster(cluster.Name); err != nil {
cclog.Errorf("Metric pull failed for cluster %s: %s", cluster.Name, err)
}
}),
gocron.WithStartAt(gocron.WithStartImmediately()),
)
if err != nil {
cclog.Errorf("Failed to register metric pull worker for cluster %s: %s", cluster.Name, err)
} else {
registered++
}
}
if registered > 0 {
cclog.Infof("Metric pull worker registered for %d clusters (interval: %s)", registered, d)
}
}
// pullMetricsForCluster pulls metric data for all nodes in a cluster from the upstream backend
func pullMetricsForCluster(clusterName string) error {
startTime := time.Now()
// 1. Get cluster configuration (includes all nodes)
cluster := archive.GetCluster(clusterName)
if cluster == nil {
return fmt.Errorf("cluster %s not found in configuration", clusterName)
}
// 2. Use nil for nodes to query all nodes in the cluster
// The LoadNodeData implementation will default to all nodes when nil is passed
var nodes []string = nil
// 3. Get all metrics for this cluster
metrics := []string{}
for _, mc := range cluster.MetricConfig {
metrics = append(metrics, mc.Name)
}
// 4. Determine time range (last 60 minutes)
to := time.Now()
from := to.Add(-60 * time.Minute)
// 5. Get upstream backend repository (from global config)
upstreamRepo, err := metricdata.GetUpstreamMetricDataRepo()
if err != nil {
cclog.Debugf("No upstream repository configured, skipping pull for cluster %s", clusterName)
return nil
}
// 6. Query upstream backend for ALL scopes
scopes := []schema.MetricScope{
schema.MetricScopeNode,
schema.MetricScopeCore,
schema.MetricScopeHWThread,
schema.MetricScopeAccelerator,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
nodeData, err := upstreamRepo.LoadNodeData(
clusterName, metrics, nodes, scopes, from, to, ctx,
)
if err != nil {
cclog.Errorf("Failed to load node data for cluster %s: %s", clusterName, err)
return err
}
// 7. Write data to memorystore
ms := memorystore.GetMemoryStore()
nodesWritten := 0
metricsWritten := 0
errorsEncountered := 0
for node, metricMap := range nodeData {
for metricName, jobMetrics := range metricMap {
for _, jm := range jobMetrics {
if err := writeMetricToMemoryStore(ms, clusterName, node, metricName, jm, from); err != nil {
cclog.Warnf("Failed to write metric %s for node %s: %s", metricName, node, err)
errorsEncountered++
} else {
metricsWritten++
}
}
}
nodesWritten++
}
duration := time.Since(startTime)
if errorsEncountered > 0 {
cclog.Infof("Pulled metrics for cluster %s: %d nodes, %d metrics, %d errors (took %s)",
clusterName, nodesWritten, metricsWritten, errorsEncountered, duration)
} else {
cclog.Debugf("Pulled metrics for cluster %s: %d nodes, %d metrics (took %s)",
clusterName, nodesWritten, metricsWritten, duration)
}
return nil
}
// writeMetricToMemoryStore converts a JobMetric from upstream backend
// into memorystore format and writes it
func writeMetricToMemoryStore(
ms *memorystore.MemoryStore,
cluster, hostname, metricName string,
jm *schema.JobMetric,
fromTime time.Time,
) error {
// For each series (node-level or finer scope)
for _, series := range jm.Series {
// Build selector based on scope
selector := buildSelector(cluster, hostname, series)
// Convert series data to timestamped metric writes
timestep := int64(jm.Timestep)
baseTimestamp := fromTime.Unix()
for i, value := range series.Data {
ts := baseTimestamp + (int64(i) * timestep)
metric := memorystore.Metric{
Name: metricName,
Value: value,
}
if err := ms.Write(selector, ts, []memorystore.Metric{metric}); err != nil {
return fmt.Errorf("writing to memorystore: %w", err)
}
}
}
return nil
}
// buildSelector constructs a selector path for the memorystore
// based on cluster, hostname, and series information
func buildSelector(cluster, hostname string, series schema.Series) []string {
selector := []string{cluster, hostname}
// Add scope-specific components based on series metadata
// JobMetric.Series contains Id field that identifies the specific component
// Examples:
// - Node scope: series.Id might be nil or empty
// - Core scope: series.Id might be "0", "1", etc.
// - HWThread scope: series.Id might be "0", "1", etc.
// - Accelerator scope: series.Id might be "0", "1", etc.
if series.Id != nil && *series.Id != "" {
// The selector format in memorystore is: [cluster, host, "type+id", "subtype+id"]
// For example: ["emmy", "node001", "core0", "hwthread0"]
//
// The series.Id from upstream includes the type prefix (e.g., "core0", "hwthread1")
// We can use it directly as a selector component
selector = append(selector, *series.Id)
}
return selector
}

View File

@@ -34,6 +34,8 @@ type CronFrequency struct {
DurationWorker string `json:"duration-worker"`
// Metric-Footprint Update Worker [Defaults to '10m']
FootprintWorker string `json:"footprint-worker"`
// Metric Pull Worker [Defaults to '60s']
MetricPullWorker string `json:"metric-pull-worker"`
}
var (
@@ -114,6 +116,10 @@ func Start(cronCfg, archiveConfig json.RawMessage) {
RegisterLdapSyncService(lc.SyncInterval)
}
if config.Keys.UpstreamMetricRepository != nil {
RegisterMetricPullWorker()
}
RegisterFootprintWorker()
RegisterUpdateDurationWorker()
RegisterCommitJobService()
@@ -124,6 +130,8 @@ func Start(cronCfg, archiveConfig json.RawMessage) {
// Shutdown stops the task manager and its scheduler.
func Shutdown() {
if s != nil {
s.Shutdown()
if err := s.Shutdown(); err != nil {
cclog.Errorf("taskmanager Shutdown: error stopping scheduler: %v", err)
}
}
}

View File

@@ -10,7 +10,7 @@ import (
"math"
"time"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
@@ -58,12 +58,6 @@ func RegisterFootprintWorker() {
allMetrics = append(allMetrics, mc.Name)
}
repo, err := metricdata.GetMetricDataRepo(cluster.Name)
if err != nil {
cclog.Errorf("no metric data repository configured for '%s'", cluster.Name)
continue
}
pendingStatements := []sq.UpdateBuilder{}
for _, job := range jobs {
@@ -72,7 +66,7 @@ func RegisterFootprintWorker() {
sJob := time.Now()
jobStats, err := repo.LoadStats(job, allMetrics, context.Background())
jobStats, err := memorystore.LoadStats(job, allMetrics, context.Background())
if err != nil {
cclog.Errorf("error wile loading job data stats for footprint update: %v", err)
ce++

View File

@@ -262,7 +262,7 @@ func RenderTemplate(rw http.ResponseWriter, file string, page *Page) {
if page.Clusters == nil {
for _, c := range config.Clusters {
page.Clusters = append(page.Clusters, config.ClusterConfig{Name: c.Name, FilterRanges: c.FilterRanges, MetricDataRepository: nil})
page.Clusters = append(page.Clusters, config.ClusterConfig{Name: c.Name, FilterRanges: c.FilterRanges})
}
}