diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index dddbddf..30f7798 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -284,9 +284,9 @@ func initSubsystems() error { } // Initialize metricdata - if err := metricdata.Init(); err != nil { - return fmt.Errorf("initializing metricdata repository: %w", err) - } + // if err := metricdata.Init(); err != nil { + // return fmt.Errorf("initializing metricdata repository: %w", err) + // } // Initialize upstream metricdata repositories for pull worker if err := metricdata.InitUpstreamRepos(); err != nil { @@ -335,7 +335,6 @@ func runServer(ctx context.Context) error { cclog.Debug("Metric store configuration not found, skipping memorystore initialization") } - // Start archiver and task manager archiver.Start(repository.GetJobRepository(), ctx) taskmanager.Start(ccconf.GetPackageConfig("cron"), ccconf.GetPackageConfig("archive")) diff --git a/configs/config-demo.json b/configs/config-demo.json index 58366fb..80a1f65 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -37,11 +37,6 @@ "clusters": [ { "name": "fritz", - "metricDataRepository": { - "kind": "cc-metric-store-internal", - "url": "http://localhost:8082", - "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.eyJ1c2VyIjoiYWRtaW4iLCJyb2xlcyI6WyJST0xFX0FETUlOIiwiUk9MRV9BTkFMWVNUIiwiUk9MRV9VU0VSIl19.d-3_3FZTsadPjDEdsWrrQ7nS0edMAR4zjl-eK7rJU3HziNBfI9PDHDIpJVHTNN5E5SlLGLFXctWyKAkwhXL-Dw" - }, "filterRanges": { "numNodes": { "from": 1, @@ -59,11 +54,6 @@ }, { "name": "alex", - "metricDataRepository": { - "kind": "cc-metric-store-internal", - "url": "http://localhost:8082", - "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.eyJ1c2VyIjoiYWRtaW4iLCJyb2xlcyI6WyJST0xFX0FETUlOIiwiUk9MRV9BTkFMWVNUIiwiUk9MRV9VU0VSIl19.d-3_3FZTsadPjDEdsWrrQ7nS0edMAR4zjl-eK7rJU3HziNBfI9PDHDIpJVHTNN5E5SlLGLFXctWyKAkwhXL-Dw" - }, "filterRanges": { "numNodes": { "from": 1, diff --git a/configs/config.json b/configs/config.json index 88a9e93..f32d48f 100644 --- a/configs/config.json +++ b/configs/config.json @@ -29,11 +29,6 @@ "clusters": [ { "name": "test", - "metricDataRepository": { - "kind": "cc-metric-store", - "url": "http://localhost:8082", - "token": "eyJhbGciOiJF-E-pQBQ" - }, "filterRanges": { "numNodes": { "from": 1, diff --git a/internal/config/config.go b/internal/config/config.go index 1c302f0..b594c6d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -78,6 +78,9 @@ type ProgramConfig struct { // If exists, will enable dynamic zoom in frontend metric plots using the configured values EnableResampling *ResampleConfig `json:"resampling"` + + // Global upstream metric repository configuration for metric pull workers + UpstreamMetricRepository *json.RawMessage `json:"upstreamMetricRepository,omitempty"` } type ResampleConfig struct { @@ -113,10 +116,8 @@ type FilterRanges struct { } type ClusterConfig struct { - Name string `json:"name"` - FilterRanges *FilterRanges `json:"filterRanges"` - MetricDataRepository json.RawMessage `json:"metricDataRepository"` - UpstreamMetricRepository *json.RawMessage `json:"upstreamMetricRepository,omitempty"` + Name string `json:"name"` + FilterRanges *FilterRanges `json:"filterRanges"` } var Clusters []*ClusterConfig diff --git a/internal/config/schema.go b/internal/config/schema.go index 6003a50..f57461d 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -119,6 +119,23 @@ var configSchema = ` } }, "required": ["trigger", "resolutions"] + }, + "upstreamMetricRepository": { + "description": "Global upstream metric repository configuration for metric pull workers", + "type": "object", + "properties": { + "kind": { + "type": "string", + "enum": ["influxdb", "prometheus", "cc-metric-store", "cc-metric-store-internal", "test"] + }, + "url": { + "type": "string" + }, + "token": { + "type": "string" + } + }, + "required": ["kind"] } }, "required": ["apiAllowedIPs"] diff --git a/internal/importer/importer_test.go b/internal/importer/importer_test.go index 470f760..5922e45 100644 --- a/internal/importer/importer_test.go +++ b/internal/importer/importer_test.go @@ -60,7 +60,6 @@ func setup(t *testing.T) *repository.JobRepository { "clusters": [ { "name": "testcluster", - "metricDataRepository": {"kind": "test", "url": "bla:8081"}, "filterRanges": { "numNodes": { "from": 1, "to": 64 }, "duration": { "from": 0, "to": 86400 }, @@ -69,7 +68,6 @@ func setup(t *testing.T) *repository.JobRepository { }, { "name": "fritz", - "metricDataRepository": {"kind": "test", "url": "bla:8081"}, "filterRanges": { "numNodes": { "from": 1, "to": 944 }, "duration": { "from": 0, "to": 86400 }, @@ -78,7 +76,6 @@ func setup(t *testing.T) *repository.JobRepository { }, { "name": "taurus", - "metricDataRepository": {"kind": "test", "url": "bla:8081"}, "filterRanges": { "numNodes": { "from": 1, "to": 4000 }, "duration": { "from": 0, "to": 604800 }, diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index df33ab4..32ef1cd 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -37,95 +37,91 @@ type MetricDataRepository interface { LoadNodeListData(cluster, subCluster string, nodes, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, ctx context.Context) (map[string]schema.JobData, error) } -var ( - metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} - upstreamMetricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} -) +var upstreamMetricDataRepo MetricDataRepository -func Init() error { - for _, cluster := range config.Clusters { - if cluster.MetricDataRepository != nil { - var kind struct { - Kind string `json:"kind"` - } - if err := json.Unmarshal(cluster.MetricDataRepository, &kind); err != nil { - cclog.Warn("Error while unmarshaling raw json MetricDataRepository") - return err - } +// func Init() error { +// for _, cluster := range config.Clusters { +// if cluster.MetricDataRepository != nil { +// var kind struct { +// Kind string `json:"kind"` +// } +// if err := json.Unmarshal(cluster.MetricDataRepository, &kind); err != nil { +// cclog.Warn("Error while unmarshaling raw json MetricDataRepository") +// return err +// } +// +// var mdr MetricDataRepository +// switch kind.Kind { +// case "cc-metric-store": +// mdr = &CCMetricStore{} +// case "prometheus": +// mdr = &PrometheusDataRepository{} +// case "test": +// mdr = &TestMetricDataRepository{} +// default: +// return fmt.Errorf("METRICDATA/METRICDATA > Unknown MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name) +// } +// +// if err := mdr.Init(cluster.MetricDataRepository); err != nil { +// cclog.Errorf("Error initializing MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name) +// return err +// } +// metricDataRepos[cluster.Name] = mdr +// } +// } +// return nil +// } - var mdr MetricDataRepository - switch kind.Kind { - case "cc-metric-store": - mdr = &CCMetricStore{} - case "prometheus": - mdr = &PrometheusDataRepository{} - case "test": - mdr = &TestMetricDataRepository{} - default: - return fmt.Errorf("METRICDATA/METRICDATA > Unknown MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name) - } +// func GetMetricDataRepo(cluster string) (MetricDataRepository, error) { +// var err error +// repo, ok := metricDataRepos[cluster] +// +// if !ok { +// err = fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) +// } +// +// return repo, err +// } - if err := mdr.Init(cluster.MetricDataRepository); err != nil { - cclog.Errorf("Error initializing MetricDataRepository %v for cluster %v", kind.Kind, cluster.Name) - return err - } - metricDataRepos[cluster.Name] = mdr - } - } - return nil -} - -func GetMetricDataRepo(cluster string) (MetricDataRepository, error) { - var err error - repo, ok := metricDataRepos[cluster] - - if !ok { - err = fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) - } - - return repo, err -} - -// InitUpstreamRepos initializes upstream metric data repositories for the pull worker +// InitUpstreamRepos initializes global upstream metric data repository for the pull worker func InitUpstreamRepos() error { - for _, cluster := range config.Clusters { - if cluster.UpstreamMetricRepository != nil { - var kind struct { - Kind string `json:"kind"` - } - if err := json.Unmarshal(*cluster.UpstreamMetricRepository, &kind); err != nil { - cclog.Warn("Error while unmarshaling raw json UpstreamMetricRepository") - return err - } - - var mdr MetricDataRepository - switch kind.Kind { - case "cc-metric-store": - mdr = &CCMetricStore{} - case "prometheus": - mdr = &PrometheusDataRepository{} - case "test": - mdr = &TestMetricDataRepository{} - default: - return fmt.Errorf("METRICDATA/METRICDATA > Unknown UpstreamMetricRepository %v for cluster %v", kind.Kind, cluster.Name) - } - - if err := mdr.Init(*cluster.UpstreamMetricRepository); err != nil { - cclog.Errorf("Error initializing UpstreamMetricRepository %v for cluster %v", kind.Kind, cluster.Name) - return err - } - upstreamMetricDataRepos[cluster.Name] = mdr - cclog.Infof("Initialized upstream metric repository '%s' for cluster '%s'", kind.Kind, cluster.Name) - } + if config.Keys.UpstreamMetricRepository == nil { + return nil } + + var kind struct { + Kind string `json:"kind"` + } + if err := json.Unmarshal(*config.Keys.UpstreamMetricRepository, &kind); err != nil { + cclog.Warn("Error while unmarshaling raw json UpstreamMetricRepository") + return err + } + + var mdr MetricDataRepository + switch kind.Kind { + case "cc-metric-store": + mdr = &CCMetricStore{} + case "prometheus": + mdr = &PrometheusDataRepository{} + case "test": + mdr = &TestMetricDataRepository{} + default: + return fmt.Errorf("METRICDATA/METRICDATA > Unknown UpstreamMetricRepository %v", kind.Kind) + } + + if err := mdr.Init(*config.Keys.UpstreamMetricRepository); err != nil { + cclog.Errorf("Error initializing UpstreamMetricRepository %v", kind.Kind) + return err + } + upstreamMetricDataRepo = mdr + cclog.Infof("Initialized global upstream metric repository '%s'", kind.Kind) return nil } -// GetUpstreamMetricDataRepo returns the upstream metric data repository for a given cluster -func GetUpstreamMetricDataRepo(cluster string) (MetricDataRepository, error) { - repo, ok := upstreamMetricDataRepos[cluster] - if !ok { - return nil, fmt.Errorf("METRICDATA/METRICDATA > no upstream metric data repository configured for '%s'", cluster) +// GetUpstreamMetricDataRepo returns the global upstream metric data repository +func GetUpstreamMetricDataRepo() (MetricDataRepository, error) { + if upstreamMetricDataRepo == nil { + return nil, fmt.Errorf("METRICDATA/METRICDATA > no upstream metric data repository configured") } - return repo, nil + return upstreamMetricDataRepo, nil } diff --git a/internal/metricdata/prometheus.go b/internal/metricdata/prometheus.go index 66c5bc1..288e045 100644 --- a/internal/metricdata/prometheus.go +++ b/internal/metricdata/prometheus.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package metricdata import ( diff --git a/internal/repository/node_test.go b/internal/repository/node_test.go index 466f51e..f5a2487 100644 --- a/internal/repository/node_test.go +++ b/internal/repository/node_test.go @@ -42,7 +42,6 @@ func nodeTestSetup(t *testing.T) { "clusters": [ { "name": "testcluster", - "metricDataRepository": {"kind": "test", "url": "bla:8081"}, "filterRanges": { "numNodes": { "from": 1, "to": 64 }, "duration": { "from": 0, "to": 86400 }, diff --git a/internal/repository/testdata/job.db b/internal/repository/testdata/job.db index 5c5a692..575e0f2 100644 Binary files a/internal/repository/testdata/job.db and b/internal/repository/testdata/job.db differ diff --git a/internal/repository/userConfig_test.go b/internal/repository/userConfig_test.go index b6f6843..50444f7 100644 --- a/internal/repository/userConfig_test.go +++ b/internal/repository/userConfig_test.go @@ -31,7 +31,6 @@ func setupUserTest(t *testing.T) *UserCfgRepo { "clusters": [ { "name": "testcluster", - "metricDataRepository": {"kind": "test", "url": "bla:8081"}, "filterRanges": { "numNodes": { "from": 1, "to": 64 }, "duration": { "from": 0, "to": 86400 }, diff --git a/internal/taskmanager/metricPullWorker.go b/internal/taskmanager/metricPullWorker.go index 8912314..1361e56 100644 --- a/internal/taskmanager/metricPullWorker.go +++ b/internal/taskmanager/metricPullWorker.go @@ -11,8 +11,8 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/memorystore" + "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" @@ -36,7 +36,6 @@ func RegisterMetricPullWorker() { // Register one worker per cluster registered := 0 for _, cluster := range config.Clusters { - cluster := cluster // capture for closure _, err := s.NewJob( gocron.DurationJob(d), @@ -83,10 +82,10 @@ func pullMetricsForCluster(clusterName string) error { to := time.Now() from := to.Add(-60 * time.Minute) - // 5. Get upstream backend repository (from separate config) - upstreamRepo, err := metricdata.GetUpstreamMetricDataRepo(clusterName) + // 5. Get upstream backend repository (from global config) + upstreamRepo, err := metricdata.GetUpstreamMetricDataRepo() if err != nil { - cclog.Debugf("No upstream repository configured for cluster %s, skipping pull", clusterName) + cclog.Debugf("No upstream repository configured, skipping pull for cluster %s", clusterName) return nil } diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index 1c8abf8..f58c676 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -116,10 +116,11 @@ func Start(cronCfg, archiveConfig json.RawMessage) { RegisterLdapSyncService(lc.SyncInterval) } + RegisterMetricPullWorker() + RegisterFootprintWorker() RegisterUpdateDurationWorker() RegisterCommitJobService() - RegisterMetricPullWorker() s.Start() } @@ -127,6 +128,8 @@ func Start(cronCfg, archiveConfig json.RawMessage) { // Shutdown stops the task manager and its scheduler. func Shutdown() { if s != nil { - s.Shutdown() + if err := s.Shutdown(); err != nil { + cclog.Errorf("taskmanager Shutdown: error stopping scheduler: %v", err) + } } } diff --git a/internal/taskmanager/updateFootprintService.go b/internal/taskmanager/updateFootprintService.go index ae9512c..d440aca 100644 --- a/internal/taskmanager/updateFootprintService.go +++ b/internal/taskmanager/updateFootprintService.go @@ -10,7 +10,7 @@ import ( "math" "time" - "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/internal/memorystore" "github.com/ClusterCockpit/cc-backend/pkg/archive" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" @@ -58,12 +58,6 @@ func RegisterFootprintWorker() { allMetrics = append(allMetrics, mc.Name) } - repo, err := metricdata.GetMetricDataRepo(cluster.Name) - if err != nil { - cclog.Errorf("no metric data repository configured for '%s'", cluster.Name) - continue - } - pendingStatements := []sq.UpdateBuilder{} for _, job := range jobs { @@ -72,7 +66,7 @@ func RegisterFootprintWorker() { sJob := time.Now() - jobStats, err := repo.LoadStats(job, allMetrics, context.Background()) + jobStats, err := memorystore.LoadStats(job, allMetrics, context.Background()) if err != nil { cclog.Errorf("error wile loading job data stats for footprint update: %v", err) ce++ diff --git a/web/web.go b/web/web.go index 31d7002..534f456 100644 --- a/web/web.go +++ b/web/web.go @@ -262,7 +262,7 @@ func RenderTemplate(rw http.ResponseWriter, file string, page *Page) { if page.Clusters == nil { for _, c := range config.Clusters { - page.Clusters = append(page.Clusters, config.ClusterConfig{Name: c.Name, FilterRanges: c.FilterRanges, MetricDataRepository: nil}) + page.Clusters = append(page.Clusters, config.ClusterConfig{Name: c.Name, FilterRanges: c.FilterRanges}) } }