diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 9464ccf..0fbdd35 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -288,6 +288,11 @@ func initSubsystems() error { return fmt.Errorf("initializing metricdata repository: %w", err) } + // Initialize upstream metricdata repositories for pull worker + if err := metricdata.InitUpstreamRepos(); err != nil { + return fmt.Errorf("initializing upstream metricdata repositories: %w", err) + } + // Handle database re-initialization if flagReinitDB { if err := importer.InitDB(); err != nil { diff --git a/internal/config/config.go b/internal/config/config.go index b7b8ed0..1c302f0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -113,9 +113,10 @@ type FilterRanges struct { } type ClusterConfig struct { - Name string `json:"name"` - FilterRanges *FilterRanges `json:"filterRanges"` - MetricDataRepository json.RawMessage `json:"metricDataRepository"` + Name string `json:"name"` + FilterRanges *FilterRanges `json:"filterRanges"` + MetricDataRepository json.RawMessage `json:"metricDataRepository"` + UpstreamMetricRepository *json.RawMessage `json:"upstreamMetricRepository,omitempty"` } var Clusters []*ClusterConfig diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index 0748a8d..9a20001 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -39,6 +39,7 @@ type MetricDataRepository interface { } var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} +var upstreamMetricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} func Init() error { for _, cluster := range config.Clusters { @@ -86,3 +87,47 @@ func GetMetricDataRepo(cluster string) (MetricDataRepository, error) { return repo, err } + +// InitUpstreamRepos initializes upstream metric data repositories for the pull worker +func InitUpstreamRepos() error { + for _, cluster := range config.Clusters { + if cluster.UpstreamMetricRepository != nil { + var kind struct { + Kind string `json:"kind"` + } + if err := json.Unmarshal(*cluster.UpstreamMetricRepository, &kind); err != nil { + cclog.Warn("Error while unmarshaling raw json UpstreamMetricRepository") + return err + } + + var mdr MetricDataRepository + switch kind.Kind { + case "cc-metric-store": + mdr = &CCMetricStore{} + case "prometheus": + mdr = &PrometheusDataRepository{} + case "test": + mdr = &TestMetricDataRepository{} + default: + return fmt.Errorf("METRICDATA/METRICDATA > Unknown UpstreamMetricRepository %v for cluster %v", kind.Kind, cluster.Name) + } + + if err := mdr.Init(*cluster.UpstreamMetricRepository); err != nil { + cclog.Errorf("Error initializing UpstreamMetricRepository %v for cluster %v", kind.Kind, cluster.Name) + return err + } + upstreamMetricDataRepos[cluster.Name] = mdr + cclog.Infof("Initialized upstream metric repository '%s' for cluster '%s'", kind.Kind, cluster.Name) + } + } + return nil +} + +// GetUpstreamMetricDataRepo returns the upstream metric data repository for a given cluster +func GetUpstreamMetricDataRepo(cluster string) (MetricDataRepository, error) { + repo, ok := upstreamMetricDataRepos[cluster] + if !ok { + return nil, fmt.Errorf("METRICDATA/METRICDATA > no upstream metric data repository configured for '%s'", cluster) + } + return repo, nil +} diff --git a/internal/taskmanager/metricPullWorker.go b/internal/taskmanager/metricPullWorker.go new file mode 100644 index 0000000..8912314 --- /dev/null +++ b/internal/taskmanager/metricPullWorker.go @@ -0,0 +1,200 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package taskmanager + +import ( + "context" + "fmt" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/memorystore" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + cclog "github.com/ClusterCockpit/cc-lib/ccLogger" + "github.com/ClusterCockpit/cc-lib/schema" + "github.com/go-co-op/gocron/v2" +) + +// RegisterMetricPullWorker registers a background worker that pulls metric data +// from upstream backends and populates the internal memorystore +func RegisterMetricPullWorker() { + d, err := parseDuration(Keys.MetricPullWorker) + if err != nil { + cclog.Warnf("Could not parse duration for metric pull worker, using default 60s") + d = 60 * time.Second + } + + if d == 0 { + cclog.Info("Metric pull worker disabled (interval is zero)") + return + } + + // Register one worker per cluster + registered := 0 + for _, cluster := range config.Clusters { + cluster := cluster // capture for closure + + _, err := s.NewJob( + gocron.DurationJob(d), + gocron.NewTask(func() { + if err := pullMetricsForCluster(cluster.Name); err != nil { + cclog.Errorf("Metric pull failed for cluster %s: %s", cluster.Name, err) + } + }), + gocron.WithStartAt(gocron.WithStartImmediately()), + ) + if err != nil { + cclog.Errorf("Failed to register metric pull worker for cluster %s: %s", cluster.Name, err) + } else { + registered++ + } + } + + if registered > 0 { + cclog.Infof("Metric pull worker registered for %d clusters (interval: %s)", registered, d) + } +} + +// pullMetricsForCluster pulls metric data for all nodes in a cluster from the upstream backend +func pullMetricsForCluster(clusterName string) error { + startTime := time.Now() + + // 1. Get cluster configuration (includes all nodes) + cluster := archive.GetCluster(clusterName) + if cluster == nil { + return fmt.Errorf("cluster %s not found in configuration", clusterName) + } + + // 2. Use nil for nodes to query all nodes in the cluster + // The LoadNodeData implementation will default to all nodes when nil is passed + var nodes []string = nil + + // 3. Get all metrics for this cluster + metrics := []string{} + for _, mc := range cluster.MetricConfig { + metrics = append(metrics, mc.Name) + } + + // 4. Determine time range (last 60 minutes) + to := time.Now() + from := to.Add(-60 * time.Minute) + + // 5. Get upstream backend repository (from separate config) + upstreamRepo, err := metricdata.GetUpstreamMetricDataRepo(clusterName) + if err != nil { + cclog.Debugf("No upstream repository configured for cluster %s, skipping pull", clusterName) + return nil + } + + // 6. Query upstream backend for ALL scopes + scopes := []schema.MetricScope{ + schema.MetricScopeNode, + schema.MetricScopeCore, + schema.MetricScopeHWThread, + schema.MetricScopeAccelerator, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + nodeData, err := upstreamRepo.LoadNodeData( + clusterName, metrics, nodes, scopes, from, to, ctx, + ) + if err != nil { + cclog.Errorf("Failed to load node data for cluster %s: %s", clusterName, err) + return err + } + + // 7. Write data to memorystore + ms := memorystore.GetMemoryStore() + nodesWritten := 0 + metricsWritten := 0 + errorsEncountered := 0 + + for node, metricMap := range nodeData { + for metricName, jobMetrics := range metricMap { + for _, jm := range jobMetrics { + if err := writeMetricToMemoryStore(ms, clusterName, node, metricName, jm, from); err != nil { + cclog.Warnf("Failed to write metric %s for node %s: %s", metricName, node, err) + errorsEncountered++ + } else { + metricsWritten++ + } + } + } + nodesWritten++ + } + + duration := time.Since(startTime) + if errorsEncountered > 0 { + cclog.Infof("Pulled metrics for cluster %s: %d nodes, %d metrics, %d errors (took %s)", + clusterName, nodesWritten, metricsWritten, errorsEncountered, duration) + } else { + cclog.Debugf("Pulled metrics for cluster %s: %d nodes, %d metrics (took %s)", + clusterName, nodesWritten, metricsWritten, duration) + } + + return nil +} + +// writeMetricToMemoryStore converts a JobMetric from upstream backend +// into memorystore format and writes it +func writeMetricToMemoryStore( + ms *memorystore.MemoryStore, + cluster, hostname, metricName string, + jm *schema.JobMetric, + fromTime time.Time, +) error { + // For each series (node-level or finer scope) + for _, series := range jm.Series { + // Build selector based on scope + selector := buildSelector(cluster, hostname, series) + + // Convert series data to timestamped metric writes + timestep := int64(jm.Timestep) + baseTimestamp := fromTime.Unix() + + for i, value := range series.Data { + ts := baseTimestamp + (int64(i) * timestep) + + metric := memorystore.Metric{ + Name: metricName, + Value: value, + } + + if err := ms.Write(selector, ts, []memorystore.Metric{metric}); err != nil { + return fmt.Errorf("writing to memorystore: %w", err) + } + } + } + return nil +} + +// buildSelector constructs a selector path for the memorystore +// based on cluster, hostname, and series information +func buildSelector(cluster, hostname string, series schema.Series) []string { + selector := []string{cluster, hostname} + + // Add scope-specific components based on series metadata + // JobMetric.Series contains Id field that identifies the specific component + // Examples: + // - Node scope: series.Id might be nil or empty + // - Core scope: series.Id might be "0", "1", etc. + // - HWThread scope: series.Id might be "0", "1", etc. + // - Accelerator scope: series.Id might be "0", "1", etc. + + if series.Id != nil && *series.Id != "" { + // The selector format in memorystore is: [cluster, host, "type+id", "subtype+id"] + // For example: ["emmy", "node001", "core0", "hwthread0"] + // + // The series.Id from upstream includes the type prefix (e.g., "core0", "hwthread1") + // We can use it directly as a selector component + selector = append(selector, *series.Id) + } + + return selector +} diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index 57f2d88..1c8abf8 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -34,6 +34,8 @@ type CronFrequency struct { DurationWorker string `json:"duration-worker"` // Metric-Footprint Update Worker [Defaults to '10m'] FootprintWorker string `json:"footprint-worker"` + // Metric Pull Worker [Defaults to '60s'] + MetricPullWorker string `json:"metric-pull-worker"` } var ( @@ -117,6 +119,7 @@ func Start(cronCfg, archiveConfig json.RawMessage) { RegisterFootprintWorker() RegisterUpdateDurationWorker() RegisterCommitJobService() + RegisterMetricPullWorker() s.Start() }