Remove metricData usage

Is replaced by builtin memorystore API
This commit is contained in:
2025-12-21 13:29:43 +01:00
parent d23f20f42a
commit 50df63a2d2
15 changed files with 116 additions and 126 deletions

View File

@@ -284,9 +284,9 @@ func initSubsystems() error {
} }
// Initialize metricdata // Initialize metricdata
if err := metricdata.Init(); err != nil { // if err := metricdata.Init(); err != nil {
return fmt.Errorf("initializing metricdata repository: %w", err) // return fmt.Errorf("initializing metricdata repository: %w", err)
} // }
// Initialize upstream metricdata repositories for pull worker // Initialize upstream metricdata repositories for pull worker
if err := metricdata.InitUpstreamRepos(); err != nil { if err := metricdata.InitUpstreamRepos(); err != nil {
@@ -335,7 +335,6 @@ func runServer(ctx context.Context) error {
cclog.Debug("Metric store configuration not found, skipping memorystore initialization") cclog.Debug("Metric store configuration not found, skipping memorystore initialization")
} }
// Start archiver and task manager // Start archiver and task manager
archiver.Start(repository.GetJobRepository(), ctx) archiver.Start(repository.GetJobRepository(), ctx)
taskmanager.Start(ccconf.GetPackageConfig("cron"), ccconf.GetPackageConfig("archive")) taskmanager.Start(ccconf.GetPackageConfig("cron"), ccconf.GetPackageConfig("archive"))

View File

@@ -37,11 +37,6 @@
"clusters": [ "clusters": [
{ {
"name": "fritz", "name": "fritz",
"metricDataRepository": {
"kind": "cc-metric-store-internal",
"url": "http://localhost:8082",
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.eyJ1c2VyIjoiYWRtaW4iLCJyb2xlcyI6WyJST0xFX0FETUlOIiwiUk9MRV9BTkFMWVNUIiwiUk9MRV9VU0VSIl19.d-3_3FZTsadPjDEdsWrrQ7nS0edMAR4zjl-eK7rJU3HziNBfI9PDHDIpJVHTNN5E5SlLGLFXctWyKAkwhXL-Dw"
},
"filterRanges": { "filterRanges": {
"numNodes": { "numNodes": {
"from": 1, "from": 1,
@@ -59,11 +54,6 @@
}, },
{ {
"name": "alex", "name": "alex",
"metricDataRepository": {
"kind": "cc-metric-store-internal",
"url": "http://localhost:8082",
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.eyJ1c2VyIjoiYWRtaW4iLCJyb2xlcyI6WyJST0xFX0FETUlOIiwiUk9MRV9BTkFMWVNUIiwiUk9MRV9VU0VSIl19.d-3_3FZTsadPjDEdsWrrQ7nS0edMAR4zjl-eK7rJU3HziNBfI9PDHDIpJVHTNN5E5SlLGLFXctWyKAkwhXL-Dw"
},
"filterRanges": { "filterRanges": {
"numNodes": { "numNodes": {
"from": 1, "from": 1,

View File

@@ -29,11 +29,6 @@
"clusters": [ "clusters": [
{ {
"name": "test", "name": "test",
"metricDataRepository": {
"kind": "cc-metric-store",
"url": "http://localhost:8082",
"token": "eyJhbGciOiJF-E-pQBQ"
},
"filterRanges": { "filterRanges": {
"numNodes": { "numNodes": {
"from": 1, "from": 1,

View File

@@ -78,6 +78,9 @@ type ProgramConfig struct {
// If exists, will enable dynamic zoom in frontend metric plots using the configured values // If exists, will enable dynamic zoom in frontend metric plots using the configured values
EnableResampling *ResampleConfig `json:"resampling"` EnableResampling *ResampleConfig `json:"resampling"`
// Global upstream metric repository configuration for metric pull workers
UpstreamMetricRepository *json.RawMessage `json:"upstreamMetricRepository,omitempty"`
} }
type ResampleConfig struct { type ResampleConfig struct {
@@ -115,8 +118,6 @@ type FilterRanges struct {
type ClusterConfig struct { type ClusterConfig struct {
Name string `json:"name"` Name string `json:"name"`
FilterRanges *FilterRanges `json:"filterRanges"` FilterRanges *FilterRanges `json:"filterRanges"`
MetricDataRepository json.RawMessage `json:"metricDataRepository"`
UpstreamMetricRepository *json.RawMessage `json:"upstreamMetricRepository,omitempty"`
} }
var Clusters []*ClusterConfig var Clusters []*ClusterConfig

View File

@@ -119,6 +119,23 @@ var configSchema = `
} }
}, },
"required": ["trigger", "resolutions"] "required": ["trigger", "resolutions"]
},
"upstreamMetricRepository": {
"description": "Global upstream metric repository configuration for metric pull workers",
"type": "object",
"properties": {
"kind": {
"type": "string",
"enum": ["influxdb", "prometheus", "cc-metric-store", "cc-metric-store-internal", "test"]
},
"url": {
"type": "string"
},
"token": {
"type": "string"
}
},
"required": ["kind"]
} }
}, },
"required": ["apiAllowedIPs"] "required": ["apiAllowedIPs"]

View File

@@ -60,7 +60,6 @@ func setup(t *testing.T) *repository.JobRepository {
"clusters": [ "clusters": [
{ {
"name": "testcluster", "name": "testcluster",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": { "filterRanges": {
"numNodes": { "from": 1, "to": 64 }, "numNodes": { "from": 1, "to": 64 },
"duration": { "from": 0, "to": 86400 }, "duration": { "from": 0, "to": 86400 },
@@ -69,7 +68,6 @@ func setup(t *testing.T) *repository.JobRepository {
}, },
{ {
"name": "fritz", "name": "fritz",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": { "filterRanges": {
"numNodes": { "from": 1, "to": 944 }, "numNodes": { "from": 1, "to": 944 },
"duration": { "from": 0, "to": 86400 }, "duration": { "from": 0, "to": 86400 },
@@ -78,7 +76,6 @@ func setup(t *testing.T) *repository.JobRepository {
}, },
{ {
"name": "taurus", "name": "taurus",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": { "filterRanges": {
"numNodes": { "from": 1, "to": 4000 }, "numNodes": { "from": 1, "to": 4000 },
"duration": { "from": 0, "to": 604800 }, "duration": { "from": 0, "to": 604800 },

View File

@@ -37,63 +37,62 @@ type MetricDataRepository interface {
LoadNodeListData(cluster, subCluster string, nodes, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, ctx context.Context) (map[string]schema.JobData, error) LoadNodeListData(cluster, subCluster string, nodes, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, ctx context.Context) (map[string]schema.JobData, error)
} }
var ( var upstreamMetricDataRepo MetricDataRepository
metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}
upstreamMetricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}
)
func Init() error { // func Init() error {
for _, cluster := range config.Clusters { // for _, cluster := range config.Clusters {
if cluster.MetricDataRepository != nil { // if cluster.MetricDataRepository != nil {
var kind struct { // var kind struct {
Kind string `json:"kind"` // Kind string `json:"kind"`
} // }
if err := json.Unmarshal(cluster.MetricDataRepository, &kind); err != nil { // if err := json.Unmarshal(cluster.MetricDataRepository, &kind); err != nil {
cclog.Warn("Error while unmarshaling raw json MetricDataRepository") // cclog.Warn("Error while unmarshaling raw json MetricDataRepository")
return err // return err
} // }
//
// var mdr MetricDataRepository
// switch kind.Kind {
// case "cc-metric-store":
// mdr = &CCMetricStore{}
// case "prometheus":
// mdr = &PrometheusDataRepository{}
// case "test":
// mdr = &TestMetricDataRepository{}
// default:
// return fmt.Errorf("METRICDATA/METRICDATA > Unknown MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name)
// }
//
// if err := mdr.Init(cluster.MetricDataRepository); err != nil {
// cclog.Errorf("Error initializing MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name)
// return err
// }
// metricDataRepos[cluster.Name] = mdr
// }
// }
// return nil
// }
var mdr MetricDataRepository // func GetMetricDataRepo(cluster string) (MetricDataRepository, error) {
switch kind.Kind { // var err error
case "cc-metric-store": // repo, ok := metricDataRepos[cluster]
mdr = &CCMetricStore{} //
case "prometheus": // if !ok {
mdr = &PrometheusDataRepository{} // err = fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
case "test": // }
mdr = &TestMetricDataRepository{} //
default: // return repo, err
return fmt.Errorf("METRICDATA/METRICDATA > Unknown MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name) // }
}
if err := mdr.Init(cluster.MetricDataRepository); err != nil { // InitUpstreamRepos initializes global upstream metric data repository for the pull worker
cclog.Errorf("Error initializing MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name) func InitUpstreamRepos() error {
return err if config.Keys.UpstreamMetricRepository == nil {
}
metricDataRepos[cluster.Name] = mdr
}
}
return nil return nil
} }
func GetMetricDataRepo(cluster string) (MetricDataRepository, error) {
var err error
repo, ok := metricDataRepos[cluster]
if !ok {
err = fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
}
return repo, err
}
// InitUpstreamRepos initializes upstream metric data repositories for the pull worker
func InitUpstreamRepos() error {
for _, cluster := range config.Clusters {
if cluster.UpstreamMetricRepository != nil {
var kind struct { var kind struct {
Kind string `json:"kind"` Kind string `json:"kind"`
} }
if err := json.Unmarshal(*cluster.UpstreamMetricRepository, &kind); err != nil { if err := json.Unmarshal(*config.Keys.UpstreamMetricRepository, &kind); err != nil {
cclog.Warn("Error while unmarshaling raw json UpstreamMetricRepository") cclog.Warn("Error while unmarshaling raw json UpstreamMetricRepository")
return err return err
} }
@@ -107,25 +106,22 @@ func InitUpstreamRepos() error {
case "test": case "test":
mdr = &TestMetricDataRepository{} mdr = &TestMetricDataRepository{}
default: default:
return fmt.Errorf("METRICDATA/METRICDATA > Unknown UpstreamMetricRepository %v for cluster %v", kind.Kind, cluster.Name) return fmt.Errorf("METRICDATA/METRICDATA > Unknown UpstreamMetricRepository %v", kind.Kind)
} }
if err := mdr.Init(*cluster.UpstreamMetricRepository); err != nil { if err := mdr.Init(*config.Keys.UpstreamMetricRepository); err != nil {
cclog.Errorf("Error initializing UpstreamMetricRepository %v for cluster %v", kind.Kind, cluster.Name) cclog.Errorf("Error initializing UpstreamMetricRepository %v", kind.Kind)
return err return err
} }
upstreamMetricDataRepos[cluster.Name] = mdr upstreamMetricDataRepo = mdr
cclog.Infof("Initialized upstream metric repository '%s' for cluster '%s'", kind.Kind, cluster.Name) cclog.Infof("Initialized global upstream metric repository '%s'", kind.Kind)
}
}
return nil return nil
} }
// GetUpstreamMetricDataRepo returns the upstream metric data repository for a given cluster // GetUpstreamMetricDataRepo returns the global upstream metric data repository
func GetUpstreamMetricDataRepo(cluster string) (MetricDataRepository, error) { func GetUpstreamMetricDataRepo() (MetricDataRepository, error) {
repo, ok := upstreamMetricDataRepos[cluster] if upstreamMetricDataRepo == nil {
if !ok { return nil, fmt.Errorf("METRICDATA/METRICDATA > no upstream metric data repository configured")
return nil, fmt.Errorf("METRICDATA/METRICDATA > no upstream metric data repository configured for '%s'", cluster)
} }
return repo, nil return upstreamMetricDataRepo, nil
} }

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend. // All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package metricdata package metricdata
import ( import (

View File

@@ -42,7 +42,6 @@ func nodeTestSetup(t *testing.T) {
"clusters": [ "clusters": [
{ {
"name": "testcluster", "name": "testcluster",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": { "filterRanges": {
"numNodes": { "from": 1, "to": 64 }, "numNodes": { "from": 1, "to": 64 },
"duration": { "from": 0, "to": 86400 }, "duration": { "from": 0, "to": 86400 },

Binary file not shown.

View File

@@ -31,7 +31,6 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
"clusters": [ "clusters": [
{ {
"name": "testcluster", "name": "testcluster",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": { "filterRanges": {
"numNodes": { "from": 1, "to": 64 }, "numNodes": { "from": 1, "to": 64 },
"duration": { "from": 0, "to": 86400 }, "duration": { "from": 0, "to": 86400 },

View File

@@ -11,8 +11,8 @@ import (
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/memorystore" "github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema" "github.com/ClusterCockpit/cc-lib/schema"
@@ -36,7 +36,6 @@ func RegisterMetricPullWorker() {
// Register one worker per cluster // Register one worker per cluster
registered := 0 registered := 0
for _, cluster := range config.Clusters { for _, cluster := range config.Clusters {
cluster := cluster // capture for closure
_, err := s.NewJob( _, err := s.NewJob(
gocron.DurationJob(d), gocron.DurationJob(d),
@@ -83,10 +82,10 @@ func pullMetricsForCluster(clusterName string) error {
to := time.Now() to := time.Now()
from := to.Add(-60 * time.Minute) from := to.Add(-60 * time.Minute)
// 5. Get upstream backend repository (from separate config) // 5. Get upstream backend repository (from global config)
upstreamRepo, err := metricdata.GetUpstreamMetricDataRepo(clusterName) upstreamRepo, err := metricdata.GetUpstreamMetricDataRepo()
if err != nil { if err != nil {
cclog.Debugf("No upstream repository configured for cluster %s, skipping pull", clusterName) cclog.Debugf("No upstream repository configured, skipping pull for cluster %s", clusterName)
return nil return nil
} }

View File

@@ -116,10 +116,11 @@ func Start(cronCfg, archiveConfig json.RawMessage) {
RegisterLdapSyncService(lc.SyncInterval) RegisterLdapSyncService(lc.SyncInterval)
} }
RegisterMetricPullWorker()
RegisterFootprintWorker() RegisterFootprintWorker()
RegisterUpdateDurationWorker() RegisterUpdateDurationWorker()
RegisterCommitJobService() RegisterCommitJobService()
RegisterMetricPullWorker()
s.Start() s.Start()
} }
@@ -127,6 +128,8 @@ func Start(cronCfg, archiveConfig json.RawMessage) {
// Shutdown stops the task manager and its scheduler. // Shutdown stops the task manager and its scheduler.
func Shutdown() { func Shutdown() {
if s != nil { if s != nil {
s.Shutdown() if err := s.Shutdown(); err != nil {
cclog.Errorf("taskmanager Shutdown: error stopping scheduler: %v", err)
}
} }
} }

View File

@@ -10,7 +10,7 @@ import (
"math" "math"
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema" "github.com/ClusterCockpit/cc-lib/schema"
@@ -58,12 +58,6 @@ func RegisterFootprintWorker() {
allMetrics = append(allMetrics, mc.Name) allMetrics = append(allMetrics, mc.Name)
} }
repo, err := metricdata.GetMetricDataRepo(cluster.Name)
if err != nil {
cclog.Errorf("no metric data repository configured for '%s'", cluster.Name)
continue
}
pendingStatements := []sq.UpdateBuilder{} pendingStatements := []sq.UpdateBuilder{}
for _, job := range jobs { for _, job := range jobs {
@@ -72,7 +66,7 @@ func RegisterFootprintWorker() {
sJob := time.Now() sJob := time.Now()
jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) jobStats, err := memorystore.LoadStats(job, allMetrics, context.Background())
if err != nil { if err != nil {
cclog.Errorf("error wile loading job data stats for footprint update: %v", err) cclog.Errorf("error wile loading job data stats for footprint update: %v", err)
ce++ ce++

View File

@@ -262,7 +262,7 @@ func RenderTemplate(rw http.ResponseWriter, file string, page *Page) {
if page.Clusters == nil { if page.Clusters == nil {
for _, c := range config.Clusters { for _, c := range config.Clusters {
page.Clusters = append(page.Clusters, config.ClusterConfig{Name: c.Name, FilterRanges: c.FilterRanges, MetricDataRepository: nil}) page.Clusters = append(page.Clusters, config.ClusterConfig{Name: c.Name, FilterRanges: c.FilterRanges})
} }
} }