Add configuration options and enable dynamic memory management through cc-backend callback

This commit is contained in:
2026-01-28 12:59:05 +01:00
parent 28f5ffe9c4
commit dd23f49364
7 changed files with 191 additions and 77 deletions

View File

@@ -0,0 +1,54 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-metric-store.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package api
import (
"encoding/json"
"fmt"
"net/http"
"time"
)
// BackendNodeProvider implements metricstore.NodeProvider by querying
// the cc-backend /api/jobs/used_nodes endpoint.
type BackendNodeProvider struct {
backendUrl string
client *http.Client
}
// NewBackendNodeProvider creates a new BackendNodeProvider that queries
// the given cc-backend URL for used nodes information.
func NewBackendNodeProvider(backendUrl string) *BackendNodeProvider {
return &BackendNodeProvider{
backendUrl: backendUrl,
client: &http.Client{
Timeout: 10 * time.Second,
},
}
}
// GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames
// that are currently in use by jobs that started before the given timestamp.
func (p *BackendNodeProvider) GetUsedNodes(ts int64) (map[string][]string, error) {
url := fmt.Sprintf("%s/api/jobs/used_nodes?ts=%d", p.backendUrl, ts)
resp, err := p.client.Get(url)
if err != nil {
return nil, fmt.Errorf("querying used nodes from backend: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("backend returned status %d", resp.StatusCode)
}
var result map[string][]string
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decoding used nodes response: %w", err)
}
return result, nil
}

View File

@@ -10,57 +10,20 @@ import (
"encoding/json"
"fmt"
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
)
// For aggregation over multiple values at different cpus/sockets/..., not time!
type AggregationStrategy int
const (
NoAggregation AggregationStrategy = iota
SumAggregation
AvgAggregation
)
func (as *AggregationStrategy) UnmarshalJSON(data []byte) error {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}
switch str {
case "":
*as = NoAggregation
case "sum":
*as = SumAggregation
case "avg":
*as = AvgAggregation
default:
return fmt.Errorf("invalid aggregation strategy: %#v", str)
}
return nil
}
type MetricConfig struct {
// Interval in seconds at which measurements will arive.
Frequency int64 `json:"frequency"`
// Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy.
Aggregation AggregationStrategy `json:"aggregation"`
// Private, used internally...
Offset int
}
var metrics map[string]MetricConfig
var metrics map[string]metricstore.MetricConfig
type Config struct {
Address string `json:"addr"`
CertFile string `json:"https-cert-file"`
KeyFile string `json:"https-key-file"`
User string `json:"user"`
Group string `json:"group"`
Debug struct {
Address string `json:"addr"`
CertFile string `json:"https-cert-file"`
KeyFile string `json:"https-key-file"`
User string `json:"user"`
Group string `json:"group"`
BackendURL string `json:"backend-url"`
Debug struct {
DumpToFile string `json:"dump-to-file"`
EnableGops bool `json:"gops"`
} `json:"debug"`
@@ -69,13 +32,32 @@ type Config struct {
var Keys Config
type metricConfigJSON struct {
Frequency int64 `json:"frequency"`
Aggregation string `json:"aggregation"`
}
func InitMetrics(metricConfig json.RawMessage) {
Validate(metricConfigSchema, metricConfig)
var tempMetrics map[string]metricConfigJSON
dec := json.NewDecoder(bytes.NewReader(metricConfig))
dec.DisallowUnknownFields()
if err := dec.Decode(&metrics); err != nil {
if err := dec.Decode(&tempMetrics); err != nil {
cclog.Abortf("Config Init: Could not decode config file '%s'.\nError: %s\n", metricConfig, err.Error())
}
metrics = make(map[string]metricstore.MetricConfig)
for name, cfg := range tempMetrics {
agg, err := metricstore.AssignAggregationStrategy(cfg.Aggregation)
if err != nil {
cclog.Warnf("Could not parse aggregation strategy for metric '%s': %s", name, err.Error())
}
metrics[name] = metricstore.MetricConfig{
Frequency: cfg.Frequency,
Aggregation: agg,
}
}
}
func Init(mainConfig json.RawMessage) {
@@ -93,3 +75,7 @@ func GetMetricFrequency(metricName string) (int64, error) {
}
return 0, fmt.Errorf("metric %s not found", metricName)
}
func GetMetrics() map[string]metricstore.MetricConfig {
return metrics
}

View File

@@ -29,6 +29,10 @@ var configSchema = `
"description": "Drop root permissions once the port was taken. Only applicable if using privileged port.",
"type": "string"
},
"backend-url": {
"description": "URL of cc-backend for querying job information (e.g., 'https://localhost:8080').",
"type": "string"
},
"debug": {
"description": "Debug options.",
"type": "object",