This commit is contained in:
Christoph Kluge
2026-01-13 16:59:57 +01:00
88 changed files with 1670 additions and 3127 deletions

View File

@@ -23,8 +23,8 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
"github.com/ClusterCockpit/cc-backend/internal/metricstore"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig"
@@ -157,11 +157,7 @@ func setup(t *testing.T) *api.RestAPI {
// Load and check main configuration
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
config.Init(cfg, clustercfg)
} else {
cclog.Abort("Cluster configuration must be present")
}
config.Init(cfg)
} else {
cclog.Abort("Main configuration must be present")
}
@@ -173,9 +169,7 @@ func setup(t *testing.T) *api.RestAPI {
t.Fatal(err)
}
if err := metricdata.Init(); err != nil {
t.Fatal(err)
}
// metricstore initialization removed - it's initialized via callback in tests
archiver.Start(repository.GetJobRepository(), context.Background())
@@ -221,7 +215,7 @@ func TestRestApi(t *testing.T) {
},
}
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
return testData, nil
}
@@ -366,7 +360,7 @@ func TestRestApi(t *testing.T) {
}
t.Run("CheckArchive", func(t *testing.T) {
data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
data, err := metricdispatch.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
if err != nil {
t.Fatal(err)
}

View File

@@ -22,7 +22,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/importer"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
@@ -293,7 +293,7 @@ func (api *RestAPI) getCompleteJobByID(rw http.ResponseWriter, r *http.Request)
}
if r.URL.Query().Get("all-metrics") == "true" {
data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution)
data, err = metricdispatch.LoadData(job, nil, scopes, r.Context(), resolution)
if err != nil {
cclog.Warnf("REST: error while loading all-metrics job data for JobID %d on %s", job.JobID, job.Cluster)
return
@@ -389,7 +389,7 @@ func (api *RestAPI) getJobByID(rw http.ResponseWriter, r *http.Request) {
resolution = max(resolution, mc.Timestep)
}
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution)
data, err := metricdispatch.LoadData(job, metrics, scopes, r.Context(), resolution)
if err != nil {
cclog.Warnf("REST: error while loading job data for JobID %d on %s", job.JobID, job.Cluster)
return

View File

@@ -15,7 +15,7 @@ import (
"strconv"
"strings"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/internal/metricstore"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/influxdata/line-protocol/v2/lineprotocol"
@@ -58,7 +58,7 @@ func freeMetrics(rw http.ResponseWriter, r *http.Request) {
return
}
ms := memorystore.GetMemoryStore()
ms := metricstore.GetMemoryStore()
n := 0
for _, sel := range selectors {
bn, err := ms.Free(sel, to)
@@ -97,9 +97,9 @@ func writeMetrics(rw http.ResponseWriter, r *http.Request) {
return
}
ms := memorystore.GetMemoryStore()
ms := metricstore.GetMemoryStore()
dec := lineprotocol.NewDecoderWithBytes(bytes)
if err := memorystore.DecodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil {
if err := metricstore.DecodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil {
cclog.Errorf("/api/write error: %s", err.Error())
handleError(err, http.StatusBadRequest, rw)
return
@@ -129,7 +129,7 @@ func debugMetrics(rw http.ResponseWriter, r *http.Request) {
selector = strings.Split(raw, ":")
}
ms := memorystore.GetMemoryStore()
ms := metricstore.GetMemoryStore()
if err := ms.DebugDump(bufio.NewWriter(rw), selector); err != nil {
handleError(err, http.StatusBadRequest, rw)
return
@@ -162,7 +162,7 @@ func metricsHealth(rw http.ResponseWriter, r *http.Request) {
selector := []string{rawCluster, rawNode}
ms := memorystore.GetMemoryStore()
ms := metricstore.GetMemoryStore()
if err := ms.HealthCheck(bufio.NewWriter(rw), selector); err != nil {
handleError(err, http.StatusBadRequest, rw)
return

View File

@@ -18,7 +18,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/metricstore"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig"
@@ -151,11 +151,7 @@ func setupNatsTest(t *testing.T) *NatsAPI {
// Load and check main configuration
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
config.Init(cfg, clustercfg)
} else {
cclog.Abort("Cluster configuration must be present")
}
config.Init(cfg)
} else {
cclog.Abort("Main configuration must be present")
}
@@ -167,9 +163,7 @@ func setupNatsTest(t *testing.T) *NatsAPI {
t.Fatal(err)
}
if err := metricdata.Init(); err != nil {
t.Fatal(err)
}
// metricstore initialization removed - it's initialized via callback in tests
archiver.Start(repository.GetJobRepository(), context.Background())
@@ -564,7 +558,7 @@ func TestNatsHandleStopJob(t *testing.T) {
},
}
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
metricstore.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
return testData, nil
}

View File

@@ -106,7 +106,7 @@ Data is archived at the highest available resolution (typically 60s intervals).
```go
// In archiver.go ArchiveJob() function
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 300)
jobData, err := metricdispatch.LoadData(job, allMetrics, scopes, ctx, 300)
// 0 = highest resolution
// 300 = 5-minute resolution
```
@@ -185,6 +185,6 @@ Internal state is protected by:
## Dependencies
- `internal/repository`: Database operations for job metadata
- `internal/metricDataDispatcher`: Loading metric data from various backends
- `internal/metricdispatch`: Loading metric data from various backends
- `pkg/archive`: Archive backend abstraction (filesystem, S3, SQLite)
- `cc-lib/schema`: Job and metric data structures

View File

@@ -10,7 +10,7 @@ import (
"math"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -60,7 +60,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.Job, error) {
scopes = append(scopes, schema.MetricScopeAccelerator)
}
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resulotion-Value retrieves highest res (60s)
jobData, err := metricdispatch.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resulotion-Value retrieves highest res (60s)
if err != nil {
cclog.Error("Error wile loading job data for archiving")
return nil, err

View File

@@ -40,7 +40,7 @@ type Authenticator interface {
// authenticator should attempt the login. This method should not perform
// expensive operations or actual authentication.
CanLogin(user *schema.User, username string, rw http.ResponseWriter, r *http.Request) (*schema.User, bool)
// Login performs the actual authentication for the user.
// It returns the authenticated user or an error if authentication fails.
// The user parameter may be nil if the user doesn't exist in the database yet.
@@ -65,13 +65,13 @@ var ipUserLimiters sync.Map
func getIPUserLimiter(ip, username string) *rate.Limiter {
key := ip + ":" + username
now := time.Now()
if entry, ok := ipUserLimiters.Load(key); ok {
rle := entry.(*rateLimiterEntry)
rle.lastUsed = now
return rle.limiter
}
// More aggressive rate limiting: 5 attempts per 15 minutes
newLimiter := rate.NewLimiter(rate.Every(15*time.Minute/5), 5)
ipUserLimiters.Store(key, &rateLimiterEntry{
@@ -176,7 +176,7 @@ func (auth *Authentication) AuthViaSession(
func Init(authCfg *json.RawMessage) {
initOnce.Do(func() {
authInstance = &Authentication{}
// Start background cleanup of rate limiters
startRateLimiterCleanup()
@@ -272,7 +272,7 @@ func handleUserSync(user *schema.User, syncUserOnLogin, updateUserOnLogin bool)
cclog.Errorf("Error while loading user '%s': %v", user.Username, err)
return
}
if err == sql.ErrNoRows && syncUserOnLogin { // Add new user
if err := r.AddUser(user); err != nil {
cclog.Errorf("Error while adding user '%s' to DB: %v", user.Username, err)

View File

@@ -15,25 +15,25 @@ import (
func TestGetIPUserLimiter(t *testing.T) {
ip := "192.168.1.1"
username := "testuser"
// Get limiter for the first time
limiter1 := getIPUserLimiter(ip, username)
if limiter1 == nil {
t.Fatal("Expected limiter to be created")
}
// Get the same limiter again
limiter2 := getIPUserLimiter(ip, username)
if limiter1 != limiter2 {
t.Error("Expected to get the same limiter instance")
}
// Get a different limiter for different user
limiter3 := getIPUserLimiter(ip, "otheruser")
if limiter1 == limiter3 {
t.Error("Expected different limiter for different user")
}
// Get a different limiter for different IP
limiter4 := getIPUserLimiter("192.168.1.2", username)
if limiter1 == limiter4 {
@@ -45,16 +45,16 @@ func TestGetIPUserLimiter(t *testing.T) {
func TestRateLimiterBehavior(t *testing.T) {
ip := "10.0.0.1"
username := "ratelimituser"
limiter := getIPUserLimiter(ip, username)
// Should allow first 5 attempts
for i := 0; i < 5; i++ {
if !limiter.Allow() {
t.Errorf("Request %d should be allowed within rate limit", i+1)
}
}
// 6th attempt should be blocked
if limiter.Allow() {
t.Error("Request 6 should be blocked by rate limiter")
@@ -65,19 +65,19 @@ func TestRateLimiterBehavior(t *testing.T) {
func TestCleanupOldRateLimiters(t *testing.T) {
// Clear all existing limiters first to avoid interference from other tests
cleanupOldRateLimiters(time.Now().Add(24 * time.Hour))
// Create some new rate limiters
limiter1 := getIPUserLimiter("1.1.1.1", "user1")
limiter2 := getIPUserLimiter("2.2.2.2", "user2")
if limiter1 == nil || limiter2 == nil {
t.Fatal("Failed to create test limiters")
}
// Cleanup limiters older than 1 second from now (should keep both)
time.Sleep(10 * time.Millisecond) // Small delay to ensure timestamp difference
cleanupOldRateLimiters(time.Now().Add(-1 * time.Second))
// Verify they still exist (should get same instance)
if getIPUserLimiter("1.1.1.1", "user1") != limiter1 {
t.Error("Limiter 1 was incorrectly cleaned up")
@@ -85,10 +85,10 @@ func TestCleanupOldRateLimiters(t *testing.T) {
if getIPUserLimiter("2.2.2.2", "user2") != limiter2 {
t.Error("Limiter 2 was incorrectly cleaned up")
}
// Cleanup limiters older than 1 hour from now (should remove both)
cleanupOldRateLimiters(time.Now().Add(2 * time.Hour))
// Getting them again should create new instances
newLimiter1 := getIPUserLimiter("1.1.1.1", "user1")
if newLimiter1 == limiter1 {
@@ -107,14 +107,14 @@ func TestIPv4Extraction(t *testing.T) {
{"IPv4 without port", "192.168.1.1", "192.168.1.1"},
{"Localhost with port", "127.0.0.1:3000", "127.0.0.1"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.input
if host, _, err := net.SplitHostPort(result); err == nil {
result = host
}
if result != tt.expected {
t.Errorf("Expected %s, got %s", tt.expected, result)
}
@@ -122,7 +122,7 @@ func TestIPv4Extraction(t *testing.T) {
}
}
// TestIPv6Extraction tests extracting IPv6 addresses
// TestIPv6Extraction tests extracting IPv6 addresses
func TestIPv6Extraction(t *testing.T) {
tests := []struct {
name string
@@ -134,14 +134,14 @@ func TestIPv6Extraction(t *testing.T) {
{"IPv6 without port", "2001:db8::1", "2001:db8::1"},
{"IPv6 localhost", "::1", "::1"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.input
if host, _, err := net.SplitHostPort(result); err == nil {
result = host
}
if result != tt.expected {
t.Errorf("Expected %s, got %s", tt.expected, result)
}
@@ -160,14 +160,14 @@ func TestIPExtractionEdgeCases(t *testing.T) {
{"Empty string", "", ""},
{"Just port", ":8080", ""},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.input
if host, _, err := net.SplitHostPort(result); err == nil {
result = host
}
if result != tt.expected {
t.Errorf("Expected %s, got %s", tt.expected, result)
}

View File

@@ -101,20 +101,20 @@ func (ja *JWTAuthenticator) AuthViaJWT(
// Token is valid, extract payload
claims := token.Claims.(jwt.MapClaims)
// Use shared helper to get user from JWT claims
var user *schema.User
user, err = getUserFromJWT(claims, Keys.JwtConfig.ValidateUser, schema.AuthToken, -1)
if err != nil {
return nil, err
}
// If not validating user, we only get roles from JWT (no projects for this auth method)
if !Keys.JwtConfig.ValidateUser {
user.Roles = extractRolesFromClaims(claims, false)
user.Projects = nil // Standard JWT auth doesn't include projects
}
return user, nil
}

View File

@@ -146,13 +146,13 @@ func (ja *JWTCookieSessionAuthenticator) Login(
}
claims := token.Claims.(jwt.MapClaims)
// Use shared helper to get user from JWT claims
user, err = getUserFromJWT(claims, jc.ValidateUser, schema.AuthSession, schema.AuthViaToken)
if err != nil {
return nil, err
}
// Sync or update user if configured
if !jc.ValidateUser && (jc.SyncUserOnLogin || jc.UpdateUserOnLogin) {
handleTokenUser(user)

View File

@@ -28,7 +28,7 @@ func extractStringFromClaims(claims jwt.MapClaims, key string) string {
// If validateRoles is true, only valid roles are returned
func extractRolesFromClaims(claims jwt.MapClaims, validateRoles bool) []string {
var roles []string
if rawroles, ok := claims["roles"].([]any); ok {
for _, rr := range rawroles {
if r, ok := rr.(string); ok {
@@ -42,14 +42,14 @@ func extractRolesFromClaims(claims jwt.MapClaims, validateRoles bool) []string {
}
}
}
return roles
}
// extractProjectsFromClaims extracts projects from JWT claims
func extractProjectsFromClaims(claims jwt.MapClaims) []string {
projects := make([]string, 0)
if rawprojs, ok := claims["projects"].([]any); ok {
for _, pp := range rawprojs {
if p, ok := pp.(string); ok {
@@ -61,7 +61,7 @@ func extractProjectsFromClaims(claims jwt.MapClaims) []string {
projects = append(projects, projSlice...)
}
}
return projects
}
@@ -72,14 +72,14 @@ func extractNameFromClaims(claims jwt.MapClaims) string {
if name, ok := claims["name"].(string); ok {
return name
}
// Try nested structure: {name: {values: [...]}}
if wrap, ok := claims["name"].(map[string]any); ok {
if vals, ok := wrap["values"].([]any); ok {
if len(vals) == 0 {
return ""
}
name := fmt.Sprintf("%v", vals[0])
for i := 1; i < len(vals); i++ {
name += fmt.Sprintf(" %v", vals[i])
@@ -87,7 +87,7 @@ func extractNameFromClaims(claims jwt.MapClaims) string {
return name
}
}
return ""
}
@@ -100,7 +100,7 @@ func getUserFromJWT(claims jwt.MapClaims, validateUser bool, authType schema.Aut
if sub == "" {
return nil, errors.New("missing 'sub' claim in JWT")
}
if validateUser {
// Validate user against database
ur := repository.GetUserRepository()
@@ -109,22 +109,22 @@ func getUserFromJWT(claims jwt.MapClaims, validateUser bool, authType schema.Aut
cclog.Errorf("Error while loading user '%v': %v", sub, err)
return nil, fmt.Errorf("database error: %w", err)
}
// Deny any logins for unknown usernames
if user == nil || err == sql.ErrNoRows {
cclog.Warn("Could not find user from JWT in internal database.")
return nil, errors.New("unknown user")
}
// Return database user (with database roles)
return user, nil
}
// Create user from JWT claims
name := extractNameFromClaims(claims)
roles := extractRolesFromClaims(claims, true) // Validate roles
projects := extractProjectsFromClaims(claims)
return &schema.User{
Username: sub,
Name: name,

View File

@@ -19,7 +19,7 @@ func TestExtractStringFromClaims(t *testing.T) {
"email": "test@example.com",
"age": 25, // not a string
}
tests := []struct {
name string
key string
@@ -30,7 +30,7 @@ func TestExtractStringFromClaims(t *testing.T) {
{"Non-existent key", "missing", ""},
{"Non-string value", "age", ""},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := extractStringFromClaims(claims, tt.key)
@@ -88,16 +88,16 @@ func TestExtractRolesFromClaims(t *testing.T) {
expected: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := extractRolesFromClaims(tt.claims, tt.validateRoles)
if len(result) != len(tt.expected) {
t.Errorf("Expected %d roles, got %d", len(tt.expected), len(result))
return
}
for i, role := range result {
if i >= len(tt.expected) || role != tt.expected[i] {
t.Errorf("Expected role %s at position %d, got %s", tt.expected[i], i, role)
@@ -141,16 +141,16 @@ func TestExtractProjectsFromClaims(t *testing.T) {
expected: []string{"project1", "project2"}, // Should skip non-strings
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := extractProjectsFromClaims(tt.claims)
if len(result) != len(tt.expected) {
t.Errorf("Expected %d projects, got %d", len(tt.expected), len(result))
return
}
for i, project := range result {
if i >= len(tt.expected) || project != tt.expected[i] {
t.Errorf("Expected project %s at position %d, got %s", tt.expected[i], i, project)
@@ -216,7 +216,7 @@ func TestExtractNameFromClaims(t *testing.T) {
expected: "123 Smith", // Should convert to string
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := extractNameFromClaims(tt.claims)
@@ -235,29 +235,28 @@ func TestGetUserFromJWT_NoValidation(t *testing.T) {
"roles": []any{"user", "admin"},
"projects": []any{"project1", "project2"},
}
user, err := getUserFromJWT(claims, false, schema.AuthToken, -1)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if user.Username != "testuser" {
t.Errorf("Expected username 'testuser', got '%s'", user.Username)
}
if user.Name != "Test User" {
t.Errorf("Expected name 'Test User', got '%s'", user.Name)
}
if len(user.Roles) != 2 {
t.Errorf("Expected 2 roles, got %d", len(user.Roles))
}
if len(user.Projects) != 2 {
t.Errorf("Expected 2 projects, got %d", len(user.Projects))
}
if user.AuthType != schema.AuthToken {
t.Errorf("Expected AuthType %v, got %v", schema.AuthToken, user.AuthType)
}
@@ -268,13 +267,13 @@ func TestGetUserFromJWT_MissingSub(t *testing.T) {
claims := jwt.MapClaims{
"name": "Test User",
}
_, err := getUserFromJWT(claims, false, schema.AuthToken, -1)
if err == nil {
t.Error("Expected error for missing sub claim")
}
if err.Error() != "missing 'sub' claim in JWT" {
t.Errorf("Expected specific error message, got: %v", err)
}

View File

@@ -75,13 +75,13 @@ func (ja *JWTSessionAuthenticator) Login(
}
claims := token.Claims.(jwt.MapClaims)
// Use shared helper to get user from JWT claims
user, err = getUserFromJWT(claims, Keys.JwtConfig.ValidateUser, schema.AuthSession, schema.AuthViaToken)
if err != nil {
return nil, err
}
// Sync or update user if configured
if !Keys.JwtConfig.ValidateUser && (Keys.JwtConfig.SyncUserOnLogin || Keys.JwtConfig.UpdateUserOnLogin) {
handleTokenUser(user)

View File

@@ -59,7 +59,7 @@ func NewOIDC(a *Authentication) *OIDC {
// Use context with timeout for provider initialization
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
provider, err := oidc.NewProvider(ctx, Keys.OpenIDConfig.Provider)
if err != nil {
cclog.Fatal(err)
@@ -119,7 +119,7 @@ func (oa *OIDC) OAuth2Callback(rw http.ResponseWriter, r *http.Request) {
// Exchange authorization code for token with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
token, err := oa.client.Exchange(ctx, code, oauth2.VerifierOption(codeVerifier))
if err != nil {
http.Error(rw, "Failed to exchange token: "+err.Error(), http.StatusInternalServerError)

View File

@@ -111,14 +111,6 @@ type FilterRanges struct {
StartTime *TimeRange `json:"startTime"`
}
// ClusterConfig is one element of the cluster configuration array that Init
// decodes into Clusters.
type ClusterConfig struct {
// Name is the name of the cluster (JSON key "name").
Name string `json:"name"`
// FilterRanges holds the UI slider ranges for numNodes, duration and
// startTime (JSON key "filterRanges").
FilterRanges *FilterRanges `json:"filterRanges"`
// MetricDataRepository is kept as raw, undecoded JSON (JSON key
// "metricDataRepository"); it is not interpreted by this struct.
MetricDataRepository json.RawMessage `json:"metricDataRepository"`
}
// Clusters is the list of configured clusters. Init fills it from the
// cluster configuration and aborts if it ends up empty.
var Clusters []*ClusterConfig
var Keys ProgramConfig = ProgramConfig{
Addr: "localhost:8080",
DisableAuthentication: false,
@@ -132,7 +124,7 @@ var Keys ProgramConfig = ProgramConfig{
ShortRunningJobsDuration: 5 * 60,
}
func Init(mainConfig json.RawMessage, clusterConfig json.RawMessage) {
func Init(mainConfig json.RawMessage) {
Validate(configSchema, mainConfig)
dec := json.NewDecoder(bytes.NewReader(mainConfig))
dec.DisallowUnknownFields()
@@ -140,17 +132,6 @@ func Init(mainConfig json.RawMessage, clusterConfig json.RawMessage) {
cclog.Abortf("Config Init: Could not decode config file '%s'.\nError: %s\n", mainConfig, err.Error())
}
Validate(clustersSchema, clusterConfig)
dec = json.NewDecoder(bytes.NewReader(clusterConfig))
dec.DisallowUnknownFields()
if err := dec.Decode(&Clusters); err != nil {
cclog.Abortf("Config Init: Could not decode config file '%s'.\nError: %s\n", mainConfig, err.Error())
}
if len(Clusters) < 1 {
cclog.Abort("Config Init: At least one cluster required in config. Exited with error.")
}
if Keys.EnableResampling != nil && Keys.EnableResampling.MinimumPoints > 0 {
resampler.SetMinimumRequiredPoints(Keys.EnableResampling.MinimumPoints)
}

View File

@@ -16,11 +16,7 @@ func TestInit(t *testing.T) {
fp := "../../configs/config.json"
ccconf.Init(fp)
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
Init(cfg, clustercfg)
} else {
cclog.Abort("Cluster configuration must be present")
}
Init(cfg)
} else {
cclog.Abort("Main configuration must be present")
}
@@ -34,11 +30,7 @@ func TestInitMinimal(t *testing.T) {
fp := "../../configs/config-demo.json"
ccconf.Init(fp)
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
Init(cfg, clustercfg)
} else {
cclog.Abort("Cluster configuration must be present")
}
Init(cfg)
} else {
cclog.Abort("Main configuration must be present")
}

View File

@@ -138,83 +138,3 @@ var configSchema = `
},
"required": ["apiAllowedIPs"]
}`
// clustersSchema is the JSON schema that Init passes to Validate to check the
// cluster configuration: a non-empty array of objects, each of which must
// provide "name", "metricDataRepository" and "filterRanges".
//
// "minItems" is declared on the array schema itself. Nested inside "items" it
// would be attached to the element (object) schema, where the keyword has no
// effect, so an empty cluster list would pass schema validation.
var clustersSchema = `
{
  "type": "array",
  "items": {
    "type": "object",
    "properties": {
      "name": {
        "description": "The name of the cluster.",
        "type": "string"
      },
      "metricDataRepository": {
        "description": "Type of the metric data repository for this cluster",
        "type": "object",
        "properties": {
          "kind": {
            "type": "string",
            "enum": ["influxdb", "prometheus", "cc-metric-store", "cc-metric-store-internal", "test"]
          },
          "url": {
            "type": "string"
          },
          "token": {
            "type": "string"
          }
        },
        "required": ["kind"]
      },
      "filterRanges": {
        "description": "This option controls the slider ranges for the UI controls of numNodes, duration, and startTime.",
        "type": "object",
        "properties": {
          "numNodes": {
            "description": "UI slider range for number of nodes",
            "type": "object",
            "properties": {
              "from": {
                "type": "integer"
              },
              "to": {
                "type": "integer"
              }
            },
            "required": ["from", "to"]
          },
          "duration": {
            "description": "UI slider range for duration",
            "type": "object",
            "properties": {
              "from": {
                "type": "integer"
              },
              "to": {
                "type": "integer"
              }
            },
            "required": ["from", "to"]
          },
          "startTime": {
            "description": "UI slider range for start time",
            "type": "object",
            "properties": {
              "from": {
                "type": "string",
                "format": "date-time"
              },
              "to": {
                "type": "null"
              }
            },
            "required": ["from", "to"]
          }
        },
        "required": ["numNodes", "duration", "startTime"]
      }
    },
    "required": ["name", "metricDataRepository", "filterRanges"]
  },
  "minItems": 1
}`

View File

@@ -19,7 +19,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
@@ -484,7 +484,7 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
return nil, err
}
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx, *resolution)
data, err := metricdispatch.LoadData(job, metrics, scopes, ctx, *resolution)
if err != nil {
cclog.Warn("Error while loading job data")
return nil, err
@@ -512,7 +512,7 @@ func (r *queryResolver) JobStats(ctx context.Context, id string, metrics []strin
return nil, err
}
data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx)
data, err := metricdispatch.LoadJobStats(job, metrics, ctx)
if err != nil {
cclog.Warnf("Error while loading jobStats data for job id %s", id)
return nil, err
@@ -537,7 +537,7 @@ func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics [
return nil, err
}
data, err := metricDataDispatcher.LoadScopedJobStats(job, metrics, scopes, ctx)
data, err := metricdispatch.LoadScopedJobStats(job, metrics, scopes, ctx)
if err != nil {
cclog.Warnf("Error while loading scopedJobStats data for job id %s", id)
return nil, err
@@ -702,7 +702,7 @@ func (r *queryResolver) JobsMetricStats(ctx context.Context, filter []*model.Job
res := []*model.JobStats{}
for _, job := range jobs {
data, err := metricDataDispatcher.LoadJobStats(job, metrics, ctx)
data, err := metricdispatch.LoadJobStats(job, metrics, ctx)
if err != nil {
cclog.Warnf("Error while loading comparison jobStats data for job id %d", job.JobID)
continue
@@ -759,7 +759,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, nodes [
}
}
data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
data, err := metricdispatch.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
cclog.Warn("error while loading node data")
return nil, err
@@ -825,7 +825,7 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
}
}
data, err := metricDataDispatcher.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx)
data, err := metricdispatch.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx)
if err != nil {
cclog.Warn("error while loading node data (Resolver.NodeMetricsList")
return nil, err
@@ -880,7 +880,7 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr
// 'nodes' == nil -> Defaults to all nodes of cluster for existing query workflow
scopes := []schema.MetricScope{"node"}
data, err := metricDataDispatcher.LoadNodeData(cluster, metrics, nil, scopes, from, to, ctx)
data, err := metricdispatch.LoadNodeData(cluster, metrics, nil, scopes, from, to, ctx)
if err != nil {
cclog.Warn("error while loading node data")
return nil, err

View File

@@ -13,7 +13,7 @@ import (
"github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
@@ -55,7 +55,7 @@ func (r *queryResolver) rooflineHeatmap(
// resolution = max(resolution, mc.Timestep)
// }
jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
jobdata, err := metricdispatch.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
if err != nil {
cclog.Errorf("Error while loading roofline metrics for job %d", job.ID)
return nil, err
@@ -128,7 +128,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
continue
}
if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
if err := metricdispatch.LoadAverages(job, metrics, avgs, ctx); err != nil {
cclog.Error("Error while loading averages for footprint")
return nil, err
}

View File

@@ -121,11 +121,7 @@ func setup(t *testing.T) *repository.JobRepository {
// Load and check main configuration
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
config.Init(cfg, clustercfg)
} else {
t.Fatal("Cluster configuration must be present")
}
config.Init(cfg)
} else {
t.Fatal("Main configuration must be present")
}

View File

@@ -1,381 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricDataDispatcher
import (
"context"
"fmt"
"math"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/lrucache"
"github.com/ClusterCockpit/cc-lib/v2/resampler"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
// cache memoizes the job data produced by LoadData, keyed by cacheKey.
// NOTE(review): 128 * 1024 * 1024 is presumably the cache capacity in bytes
// (128 MiB) — confirm against lrucache.New.
var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
// cacheKey builds the string under which a job's metric data is cached.
// The key is made of the job's ID and state plus the requested metrics,
// scopes and resolution.
func cacheKey(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	resolution int,
) string {
	// StartTime and Duration are intentionally not part of the key: job.ID is
	// already the more unique value, and the TTL of the cache entry keeps
	// stale data from living forever.
	const layout = "%d(%s):[%v],[%v]-%d"

	key := fmt.Sprintf(layout, job.ID, job.State, metrics, scopes, resolution)
	return key
}
// LoadData fetches the metric data for a job through the package-level cache.
//
// The closure handed to cache.Get produces the value for the key built by
// cacheKey (presumably it is only invoked on a cache miss - confirm against
// lrucache). For running jobs, jobs whose MonitoringStatus is
// MonitoringStatusRunningOrArchiving, or when config.Keys.DisableArchive is
// set, the data comes from the cluster's metric data repository. Otherwise it
// is loaded from the job archive, deep-copied, resampled to the requested
// resolution and filtered to the requested metrics and scopes.
//
// Failures inside the closure are returned as the value itself (an error with
// ttl 0 and size 0) and unwrapped after cache.Get returns.
func LoadData(job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
) (schema.JobData, error) {
data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ any, ttl time.Duration, size int) {
var jd schema.JobData
var err error
if job.State == schema.JobStateRunning ||
job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving ||
config.Keys.DisableArchive {
// Live path: query the metric data repository configured for the job's cluster.
// Note that this := declares a new err that shadows the outer one for the rest of this branch.
repo, err := metricdata.GetMetricDataRepo(job.Cluster)
if err != nil {
return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster), 0, 0
}
// Default to node scope when the caller did not request any scope.
if scopes == nil {
scopes = append(scopes, schema.MetricScopeNode)
}
// Default to every metric configured for the cluster when none was requested.
if metrics == nil {
cluster := archive.GetCluster(job.Cluster)
for _, mc := range cluster.MetricConfig {
metrics = append(metrics, mc.Name)
}
}
jd, err = repo.LoadData(job, metrics, scopes, ctx, resolution)
if err != nil {
// An error accompanied by data is treated as a partial result: it is logged and the data is kept.
if len(jd) != 0 {
cclog.Warnf("partial error: %s", err.Error())
// return err, 0, 0 // Reactivating will block archiving on one partial error
} else {
cclog.Error("Error while loading job data from metric repository")
return err, 0, 0
}
}
size = jd.Size()
} else {
// Archive path: load the stored job data.
var jd_temp schema.JobData
jd_temp, err = archive.GetHandle().LoadJobData(job)
if err != nil {
cclog.Error("Error while loading job data from archive")
return err, 0, 0
}
// Deep copy the archive data so the resampling and filtering below do not modify jd_temp.
jd = metricdata.DeepCopy(jd_temp)
// Resampling for archived data.
// Pass the resolution from frontend here.
for _, v := range jd {
for _, v_ := range v {
timestep := int64(0)
for i := 0; i < len(v_.Series); i += 1 {
v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, int64(v_.Timestep), int64(resolution))
if err != nil {
return err, 0, 0
}
}
// The timestep reported by the last resampled series becomes the metric's timestep
// (0 when the metric has no series).
v_.Timestep = int(timestep)
}
}
// Avoid sending unrequested data to the client:
if metrics != nil || scopes != nil {
// Only scopes were requested: keep every archived metric.
if metrics == nil {
metrics = make([]string, 0, len(jd))
for k := range jd {
metrics = append(metrics, k)
}
}
res := schema.JobData{}
for _, metric := range metrics {
if perscope, ok := jd[metric]; ok {
// Narrow to the requested scopes only when the metric has more than one scope
// and at least one requested scope is present; otherwise keep all scopes.
if len(perscope) > 1 {
subset := make(map[schema.MetricScope]*schema.JobMetric)
for _, scope := range scopes {
if jm, ok := perscope[scope]; ok {
subset[scope] = jm
}
}
if len(subset) > 0 {
perscope = subset
}
}
res[metric] = perscope
}
}
jd = res
}
size = jd.Size()
}
// Running jobs get a short TTL, everything else a long one.
ttl = 5 * time.Hour
if job.State == schema.JobStateRunning {
ttl = 2 * time.Minute
}
// FIXME: Review: Is this really necessary or correct.
// Note: The post-processing below was formerly known as prepareJobData(jobData, scopes).
// For /monitoring/job/<job> and some other places, flops_any and mem_bw need
// to be available at the scope 'node'. If a job has a lot of nodes,
// statisticsSeries should be available so that a min/median/max Graph can be
// used instead of a lot of single lines.
// NOTE: New StatsSeries will always be calculated as 'min/median/max'
// Existing (archived) StatsSeries can be 'min/mean/max'!
const maxSeriesSize int = 15
// The loop variable named scopes shadows the scopes parameter inside this loop only.
for _, scopes := range jd {
for _, jm := range scopes {
if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize {
continue
}
jm.AddStatisticsSeries()
}
}
// Here scopes is the parameter again (possibly defaulted to node scope in the live path).
nodeScopeRequested := false
for _, scope := range scopes {
if scope == schema.MetricScopeNode {
nodeScopeRequested = true
}
}
if nodeScopeRequested {
jd.AddNodeScope("flops_any")
jd.AddNodeScope("mem_bw")
}
// Round Resulting Stat Values
// NOTE(review): size was computed before the post-processing above; confirm that is intended.
jd.RoundMetricStats()
return jd, ttl, size
})
// The closure reports failures by returning the error as the value.
if err, ok := data.(error); ok {
cclog.Error("Error in returned dataset")
return nil, err
}
return data.(schema.JobData), nil
}
// LoadAverages appends one value per requested metric to data; data[i]
// receives the value for metrics[i]. It is used for the jobsFootprint
// GraphQL query. TODO: Rename/Generalize.
//
// Jobs that are not running are served from the archive unless
// config.Keys.DisableArchive is set. Otherwise the statistics come from the
// cluster's metric data repository, and the appended value is the sum of the
// per-node averages, or schema.NaN when the repository has no entry for the
// metric.
func LoadAverages(
	job *schema.Job,
	metrics []string,
	data [][]schema.Float,
	ctx context.Context,
) error {
	liveOnly := job.State == schema.JobStateRunning || config.Keys.DisableArchive
	if !liveOnly {
		return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
	}

	repo, repoErr := metricdata.GetMetricDataRepo(job.Cluster)
	if repoErr != nil {
		return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster)
	}

	// #166 how to handle stats for acc normalization?
	stats, loadErr := repo.LoadStats(job, metrics, ctx)
	if loadErr != nil {
		cclog.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project)
		return loadErr
	}

	for idx, name := range metrics {
		var value schema.Float = schema.NaN
		if perNode, found := stats[name]; found {
			total := 0.0
			for _, nodeStats := range perNode {
				total += nodeStats.Avg
			}
			value = schema.Float(total)
		}
		data[idx] = append(data[idx], value)
	}

	return nil
}
// LoadScopedJobStats returns scoped statistics by metric; it backs the
// statsTable in the frontend.
//
// Jobs that are not running are answered from the archive unless
// config.Keys.DisableArchive is set; all other requests go to the metric data
// repository configured for the job's cluster.
func LoadScopedJobStats(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context,
) (schema.ScopedJobStats, error) {
	notRunning := job.State != schema.JobStateRunning
	if notRunning && !config.Keys.DisableArchive {
		return archive.LoadScopedStatsFromArchive(job, metrics, scopes)
	}

	repo, repoErr := metricdata.GetMetricDataRepo(job.Cluster)
	if repoErr != nil {
		return nil, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster)
	}

	result, loadErr := repo.LoadScopedStats(job, metrics, scopes, ctx)
	if loadErr != nil {
		cclog.Errorf("error while loading scoped statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project)
		return nil, loadErr
	}

	return result, nil
}
// LoadJobStats aggregates the per-node statistics of a job into a single
// min/avg/max triple per metric; it backs the polar plots in the frontend.
//
// Jobs that are not running are answered from the archive unless
// config.Keys.DisableArchive is set. Otherwise the statistics are loaded from
// the cluster's metric data repository and combined as follows:
//   - Min is the smallest node minimum, Max the largest node maximum,
//   - Avg is the sum of the node averages divided by job.NumNodes,
//   - all three are rounded to two decimals.
//
// A metric the repository returned nothing for is reported as all zeros. On
// error the (possibly empty) result map is returned together with the error.
func LoadJobStats(
	job *schema.Job,
	metrics []string,
	ctx context.Context,
) (map[string]schema.MetricStatistics, error) {
	if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
		return archive.LoadStatsFromArchive(job, metrics)
	}

	data := make(map[string]schema.MetricStatistics, len(metrics))

	repo, err := metricdata.GetMetricDataRepo(job.Cluster)
	if err != nil {
		return data, fmt.Errorf("job %d: no metric data repository configured for '%s'", job.JobID, job.Cluster)
	}

	stats, err := repo.LoadStats(job, metrics, ctx)
	if err != nil {
		cclog.Errorf("error while loading statistics for job %d (User %s, Project %s)", job.JobID, job.User, job.Project)
		return data, err
	}

	for _, m := range metrics {
		nodes, ok := stats[m]
		if !ok {
			data[m] = schema.MetricStatistics{Min: 0.0, Avg: 0.0, Max: 0.0}
			continue
		}

		// Seed min/max from the first node instead of 0.0: starting at zero
		// pins the minimum to <= 0 and the maximum to >= 0 regardless of the
		// actual node values. With no nodes at all the values stay at zero.
		sum, minVal, maxVal := 0.0, 0.0, 0.0
		seeded := false
		for _, node := range nodes {
			sum += node.Avg
			if !seeded {
				minVal, maxVal = node.Min, node.Max
				seeded = true
				continue
			}
			minVal = math.Min(minVal, node.Min)
			maxVal = math.Max(maxVal, node.Max)
		}

		data[m] = schema.MetricStatistics{
			Avg: (math.Round((sum/float64(job.NumNodes))*100) / 100),
			Min: (math.Round(minVal*100) / 100),
			Max: (math.Round(maxVal*100) / 100),
		}
	}

	return data, nil
}
// LoadNodeData returns a map of nodes to a map of metrics for the given time
// range; it backs the classic node/system view.
//
// When metrics is nil, every metric configured for the cluster is requested.
// A repository error that still comes with data is logged as a partial error
// and the data is returned; an error without data is returned to the caller.
// A nil result without error is reported as an unsupported query.
func LoadNodeData(
	cluster string,
	metrics, nodes []string,
	scopes []schema.MetricScope,
	from, to time.Time,
	ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
	repo, repoErr := metricdata.GetMetricDataRepo(cluster)
	if repoErr != nil {
		return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
	}

	if metrics == nil {
		clusterConfig := archive.GetCluster(cluster)
		for _, mc := range clusterConfig.MetricConfig {
			metrics = append(metrics, mc.Name)
		}
	}

	data, loadErr := repo.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
	switch {
	case loadErr == nil:
		// Nothing to report.
	case len(data) != 0:
		cclog.Warnf("partial error: %s", loadErr.Error())
	default:
		cclog.Error("Error while loading node data from metric repository")
		return nil, loadErr
	}

	if data == nil {
		return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster)
	}

	return data, nil
}
// LoadNodeListData loads per-node job-style metric data for a list of nodes
// in the given time range from the cluster's metric data repository.
//
// When metrics is nil, every metric configured for the cluster is requested.
// A repository error that still comes with data is logged as a partial error;
// an error without data is returned. Metrics with at least maxSeriesSize
// series and no statistics series yet get one added. A nil result without
// error is reported as an unsupported query.
func LoadNodeListData(
	cluster, subCluster string,
	nodes []string,
	metrics []string,
	scopes []schema.MetricScope,
	resolution int,
	from, to time.Time,
	ctx context.Context,
) (map[string]schema.JobData, error) {
	// NOTE: New StatsSeries will always be calculated as 'min/median/max'
	const maxSeriesSize int = 8

	repo, repoErr := metricdata.GetMetricDataRepo(cluster)
	if repoErr != nil {
		return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
	}

	if metrics == nil {
		clusterConfig := archive.GetCluster(cluster)
		for _, mc := range clusterConfig.MetricConfig {
			metrics = append(metrics, mc.Name)
		}
	}

	data, loadErr := repo.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
	switch {
	case loadErr == nil:
		// Nothing to report.
	case len(data) != 0:
		cclog.Warnf("partial error: %s", loadErr.Error())
	default:
		cclog.Error("Error while loading node data from metric repository")
		return nil, loadErr
	}

	// Ranging over a nil map is a no-op, so testing for nil before the
	// statistics pass is equivalent to testing afterwards.
	if data == nil {
		return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster)
	}

	for _, hostData := range data {
		for _, perScope := range hostData {
			for _, jm := range perScope {
				needsStats := jm.StatisticsSeries == nil && len(jm.Series) >= maxSeriesSize
				if needsStats {
					jm.AddStatisticsSeries()
				}
			}
		}
	}

	return data, nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,88 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdata
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
// MetricDataRepository is the interface every metric data backend implements.
// Init (in this package) creates one instance per configured cluster and
// registers it under the cluster's name.
type MetricDataRepository interface {
// Init initializes this MetricDataRepository from its raw JSON configuration.
// One instance of this interface will only ever be responsible for one cluster.
Init(rawConfig json.RawMessage) error
// LoadData returns the JobData for the given job, only with the requested metrics.
LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)
// LoadStats returns a map of metrics to a map of nodes to the metric statistics of the job. Node scope only.
LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
// LoadScopedStats returns a map of metrics to a map of scopes to the scoped metric statistics of the job.
LoadScopedStats(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.ScopedJobStats, error)
// LoadNodeData returns a map of hosts to a map of metrics at the requested scopes (currently only node) for that node.
LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error)
// LoadNodeListData returns a map of hosts to a map of metrics to a map of scopes for multiple nodes.
LoadNodeListData(cluster, subCluster string, nodes, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, ctx context.Context) (map[string]schema.JobData, error)
}
// metricDataRepos maps a cluster name to the repository instance that Init
// created for it; GetMetricDataRepo reads from it.
var metricDataRepos = make(map[string]MetricDataRepository)
// Init creates and initializes one MetricDataRepository for every cluster in
// config.Clusters that has a MetricDataRepository configuration, and registers
// it under the cluster's name.
//
// The backend is chosen by the "kind" field of the cluster's raw
// configuration. An unknown kind, an unparsable configuration, or a failing
// backend Init aborts with an error.
func Init() error {
	for _, cluster := range config.Clusters {
		rawConfig := cluster.MetricDataRepository
		if rawConfig == nil {
			continue
		}

		// Only the discriminator is decoded here; the backend parses the rest.
		var header struct {
			Kind string `json:"kind"`
		}
		if err := json.Unmarshal(rawConfig, &header); err != nil {
			cclog.Warn("Error while unmarshaling raw json MetricDataRepository")
			return err
		}

		var repo MetricDataRepository
		switch header.Kind {
		case "cc-metric-store":
			repo = &CCMetricStore{}
		case "cc-metric-store-internal":
			repo = &CCMetricStoreInternal{}
			memorystore.InternalCCMSFlag = true
		case "prometheus":
			repo = &PrometheusDataRepository{}
		case "test":
			repo = &TestMetricDataRepository{}
		default:
			return fmt.Errorf("METRICDATA/METRICDATA > Unknown MetricDataRepository %v for cluster %v", header.Kind, cluster.Name)
		}

		if err := repo.Init(rawConfig); err != nil {
			cclog.Errorf("Error initializing MetricDataRepository %v for cluster %v", header.Kind, cluster.Name)
			return err
		}

		metricDataRepos[cluster.Name] = repo
	}

	return nil
}
// GetMetricDataRepo returns the repository registered for the given cluster
// name, or a nil repository and an error when none is registered.
func GetMetricDataRepo(cluster string) (MetricDataRepository, error) {
	if repo, found := metricDataRepos[cluster]; found {
		return repo, nil
	}

	return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
}

View File

@@ -1,587 +0,0 @@
// Copyright (C) 2022 DKRZ
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdata
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math"
"net/http"
"os"
"regexp"
"sort"
"strings"
"sync"
"text/template"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
promapi "github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
promcfg "github.com/prometheus/common/config"
promm "github.com/prometheus/common/model"
)
// PrometheusDataRepositoryConfig is the JSON configuration decoded by
// PrometheusDataRepository.Init.
type PrometheusDataRepositoryConfig struct {
// Url is used as the address of the Prometheus API client.
Url string `json:"url"`
// Username enables basic authentication; the password is read from the
// PROMETHEUS_PASSWORD environment variable.
Username string `json:"username,omitempty"`
// Suffix is appended to the node regex in queries and trimmed from the
// "exported_instance" label to obtain hostnames.
Suffix string `json:"suffix,omitempty"`
// Templates maps a metric name to its PromQL query template (text/template syntax).
Templates map[string]string `json:"query-templates"`
}
// PrometheusDataRepository is the MetricDataRepository backend that answers
// queries via the Prometheus HTTP API. All fields are set by Init.
type PrometheusDataRepository struct {
// client is the underlying Prometheus API client.
client promapi.Client
// queryClient is the v1 query API built on top of client.
queryClient promv1.API
// suffix is the configured instance suffix (see PrometheusDataRepositoryConfig.Suffix).
suffix string
// templates holds the parsed PromQL templates keyed by metric name.
templates map[string]*template.Template
}
// PromQLArgs is the data passed to a PromQL query template when it is executed.
type PromQLArgs struct {
// Nodes is a regular expression selecting the queried instances.
Nodes string
}
// Trie is a recursive rune-keyed map, i.e. a prefix tree in which every key is
// one rune and every value is the subtree below it.
type Trie map[rune]Trie
// logOnce guards the "scope requested, but not yet supported" info message.
// It is shared by every method in this file that logs it, so the message is
// emitted at most once per process, not once per method.
var logOnce sync.Once
// contains reports whether the scope str occurs in s.
func contains(s []schema.MetricScope, str schema.MetricScope) bool {
	for i := 0; i < len(s); i++ {
		if s[i] == str {
			return true
		}
	}

	return false
}
// MinMaxMean returns the minimum, maximum and arithmetic mean (in that order)
// of all non-NaN values in data.
//
// When data is empty or contains only NaN values, all three results are 0.
// This avoids the 0/0 division and the leaked +/-math.MaxFloat64 sentinels
// that an all-NaN series would otherwise produce.
func MinMaxMean(data []schema.Float) (float64, float64, float64) {
	minVal := math.MaxFloat64
	maxVal := -math.MaxFloat64
	var sum float64
	var n int

	for _, val := range data {
		if val.IsNaN() {
			continue
		}

		v := float64(val)
		sum += v
		n++
		if v > maxVal {
			maxVal = v
		}
		if v < minVal {
			minVal = v
		}
	}

	// No valid sample at all: report zeros, like for an empty slice.
	if n == 0 {
		return 0.0, 0.0, 0.0
	}

	return minVal, maxVal, sum / float64(n)
}
// nodeRegex builds a compact regular expression that matches the given node
// names by merging their common prefixes in a rune trie. It returns "" for an
// empty list.
//
// Rewritten from
// https://github.com/ermanh/trieregex/blob/master/trieregex/trieregex.py
//
// The output is deterministic: alternatives are ordered longest first and
// lexicographically among equal lengths.
func nodeRegex(nodes []string) string {
	// trieNode maps one rune to the subtree of names continuing with it.
	type trieNode map[rune]trieNode

	// endOfName marks "a node name ends here". -1 can never be produced by
	// ranging over a string, so it cannot collide with a real character (a
	// printable marker such as '*' would swallow a literal '*' in a name).
	const endOfName rune = -1

	// Add the runes of each compute node to the trie.
	root := trieNode{}
	for _, node := range nodes {
		cur := root
		for _, c := range node {
			next, ok := cur[c]
			if !ok {
				next = trieNode{}
				cur[c] = next
			}
			cur = next
		}
		cur[endOfName] = trieNode{}
	}

	// Recursively build the regex from the rune trie.
	var build func(t trieNode) string
	build = func(t trieNode) string {
		if len(t) == 0 {
			return ""
		}

		// Exactly one edge: either the end marker or a plain continuation.
		if len(t) == 1 {
			for key, child := range t {
				if key == endOfName {
					return ""
				}
				return regexp.QuoteMeta(string(key)) + build(child)
			}
		}

		sequences := make([]string, 0, len(t))
		for key, child := range t {
			if key != endOfName {
				sequences = append(sequences, regexp.QuoteMeta(string(key))+build(child))
			}
		}

		// Longest first, ties broken lexicographically. This must be a strict
		// ordering; otherwise the result depends on map iteration order.
		sort.Slice(sequences, func(i, j int) bool {
			if len(sequences[i]) != len(sequences[j]) {
				return len(sequences[i]) > len(sequences[j])
			}
			return sequences[i] < sequences[j]
		})

		var result string
		if len(sequences) == 1 {
			// Single edge from this tree node (next to the end marker); group
			// it so the trailing "?" applies to the whole sequence.
			result = sequences[0]
			if len(result) > 1 {
				result = "(?:" + result + ")"
			}
		} else if s := strings.Join(sequences, ""); len(s) == len(sequences) {
			// Multiple edges, each of length 1.
			if len(s)-1 == int(s[len(s)-1])-int(s[0]) {
				// Contiguous char or numeric range.
				result = fmt.Sprintf("[%c-%c]", s[0], s[len(s)-1])
			} else {
				// Char or numeric set.
				result = "[" + s + "]"
			}
		} else {
			// Multiple edges of different lengths.
			result = "(?:" + strings.Join(sequences, "|") + ")"
		}

		// A name may also end at this node, so the continuation is optional.
		if _, ok := t[endOfName]; ok {
			result += "?"
		}

		return result
	}

	return build(root)
}
// Init configures the repository from its raw JSON configuration: it creates
// the Prometheus API client (with basic authentication when a username is
// configured), stores the instance suffix and parses the PromQL query
// templates.
//
// It returns an error when the configuration cannot be decoded, when a
// username is configured but PROMETHEUS_PASSWORD is not set, or when the
// client cannot be created. A template that fails to parse is logged and
// skipped, so that the metric is later reported as not configured instead of
// leaving a nil template in the map.
func (pdb *PrometheusDataRepository) Init(rawConfig json.RawMessage) error {
	var cfg PrometheusDataRepositoryConfig
	// parse config
	if err := json.Unmarshal(rawConfig, &cfg); err != nil {
		cclog.Warn("Error while unmarshaling raw json config")
		return err
	}

	// support basic authentication; a nil round tripper is passed through to
	// the client configuration unchanged when no username is configured
	var rt http.RoundTripper
	if cfg.Username != "" {
		password := os.Getenv("PROMETHEUS_PASSWORD")
		if password == "" {
			return errors.New("METRICDATA/PROMETHEUS > Prometheus username provided, but PROMETHEUS_PASSWORD not set")
		}
		rt = promcfg.NewBasicAuthRoundTripper(
			promcfg.NewInlineSecret(cfg.Username),
			promcfg.NewInlineSecret(password),
			promapi.DefaultRoundTripper,
		)
	}

	// init client
	client, err := promapi.NewClient(promapi.Config{
		Address:      cfg.Url,
		RoundTripper: rt,
	})
	if err != nil {
		cclog.Error("Error while initializing new prometheus client")
		return err
	}

	// init query client
	pdb.client = client
	pdb.queryClient = promv1.NewAPI(pdb.client)

	// site config
	pdb.suffix = cfg.Suffix

	// init query templates
	pdb.templates = make(map[string]*template.Template, len(cfg.Templates))
	for metric, templ := range cfg.Templates {
		parsed, err := template.New(metric).Parse(templ)
		if err != nil {
			// Do not store the nil result: FormatQuery would find the entry
			// and execute a nil template.
			cclog.Warnf("Failed to parse PromQL template %s for metric %s: %v", templ, metric, err)
			continue
		}
		pdb.templates[metric] = parsed
		cclog.Debugf("Added PromQL template for %s: %s", metric, templ)
	}

	return nil
}
// FormatQuery renders the PromQL query for the given metric from its
// configured template.
//
// The template receives PromQLArgs whose Nodes field is a regular expression:
// the trie regex of the given nodes followed by the configured suffix, or
// ".*" plus suffix when no nodes are given. The scope and cluster arguments
// are currently unused.
//
// An error is returned when no usable template exists for the metric or when
// executing the template fails; the latter wraps the underlying error.
//
// TODO: respect scope argument
func (pdb *PrometheusDataRepository) FormatQuery(
	metric string,
	scope schema.MetricScope,
	nodes []string,
	cluster string,
) (string, error) {
	templ, ok := pdb.templates[metric]
	// Treat a nil entry like a missing one instead of executing a nil template.
	if !ok || templ == nil {
		return "", fmt.Errorf("METRICDATA/PROMETHEUS > No PromQL for metric %s configured.", metric)
	}

	args := PromQLArgs{}
	if len(nodes) > 0 {
		args.Nodes = fmt.Sprintf("(%s)%s", nodeRegex(nodes), pdb.suffix)
	} else {
		args.Nodes = fmt.Sprintf(".*%s", pdb.suffix)
	}

	buf := &bytes.Buffer{}
	if err := templ.Execute(buf, args); err != nil {
		return "", fmt.Errorf("METRICDATA/PROMETHEUS > Error compiling template for metric %s: %w", metric, err)
	}

	query := buf.String()
	cclog.Debugf("PromQL: %s", query)
	return query, nil
}
// RowToSeries converts one Prometheus sample stream into a schema.Series.
//
// The result has steps+1 slots, all initialized to NaN; each sample is stored
// in slot (sampleTime - from) / step. The hostname is the row's
// "exported_instance" label with the configured suffix trimmed, and the
// statistics are computed with MinMaxMean over the resulting values.
//
// Samples that fall outside the slot range are ignored, and a non-positive
// step or negative steps yields an all-NaN series, instead of panicking.
func (pdb *PrometheusDataRepository) RowToSeries(
	from time.Time,
	step int64,
	steps int64,
	row *promm.SampleStream,
) schema.Series {
	ts := from.Unix()
	hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix)

	// init array of expected length with NaN
	if steps < 0 {
		steps = 0
	}
	values := make([]schema.Float, steps+1)
	for i := range values {
		values[i] = schema.NaN
	}

	// copy recorded values from prom sample pair
	if step > 0 {
		for _, v := range row.Values {
			offset := v.Timestamp.Unix() - ts
			if offset < 0 {
				// Sample before the requested range; integer division would
				// otherwise round it into slot 0.
				continue
			}
			idx := offset / step
			if idx >= int64(len(values)) {
				continue
			}
			values[idx] = schema.Float(v.Value)
		}
	}

	minVal, maxVal, mean := MinMaxMean(values)

	// output struct
	return schema.Series{
		Hostname: hostname,
		Data:     values,
		Statistics: schema.MetricStatistics{
			Avg: mean,
			Min: minVal,
			Max: maxVal,
		},
	}
}
// LoadData runs one ranged Prometheus query per requested metric over all
// hosts of the job and returns the result as schema.JobData at node scope.
//
// Node scope is added to scopes when it is missing; every other scope is
// skipped with a one-time info message. The resolution argument is not used.
// A metric without configuration, a query that cannot be formatted, or a
// failing query aborts the whole load with an error.
func (pdb *PrometheusDataRepository) LoadData(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
) (schema.JobData, error) {
// TODO respect requested scope
if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
scopes = append(scopes, schema.MetricScopeNode)
}
jobData := make(schema.JobData)
// parse job specs
nodes := make([]string, len(job.Resources))
for i, resource := range job.Resources {
nodes[i] = resource.Hostname
}
// The queried range is [StartTime, StartTime + Duration].
from := time.Unix(job.StartTime, 0)
to := time.Unix(job.StartTime+int64(job.Duration), 0)
for _, scope := range scopes {
if scope != schema.MetricScopeNode {
logOnce.Do(func() {
cclog.Infof("Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
})
continue
}
for _, metric := range metrics {
metricConfig := archive.GetMetricConfig(job.Cluster, metric)
if metricConfig == nil {
cclog.Warnf("Error in LoadData: Metric %s for cluster %s not configured", metric, job.Cluster)
return nil, errors.New("Prometheus config error")
}
query, err := pdb.FormatQuery(metric, scope, nodes, job.Cluster)
if err != nil {
cclog.Warn("Error while formatting prometheus query")
return nil, err
}
// ranged query over all job nodes
r := promv1.Range{
Start: from,
End: to,
// The step is the metric's timestep multiplied by 1e9 (presumably seconds to nanoseconds - confirm).
Step: time.Duration(metricConfig.Timestep * 1e9),
}
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
if err != nil {
cclog.Errorf("Prometheus query error in LoadData: %v\nQuery: %s", err, query)
return nil, errors.New("Prometheus query error")
}
if len(warnings) > 0 {
cclog.Warnf("Warnings: %v\n", warnings)
}
// init data structures
// NOTE(review): the per-metric map is created even when no host returns data and
// then stays empty; consumers indexing it by node scope get a nil *JobMetric.
if _, ok := jobData[metric]; !ok {
jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric)
}
jobMetric, ok := jobData[metric][scope]
if !ok {
jobMetric = &schema.JobMetric{
Unit: metricConfig.Unit,
Timestep: metricConfig.Timestep,
Series: make([]schema.Series, 0),
}
}
// NOTE(review): a Timestep of 0 makes the division below panic.
step := int64(metricConfig.Timestep)
steps := int64(to.Sub(from).Seconds()) / step
// iter rows of host, metric, values
// The unchecked type assertion panics if the result is not a promm.Matrix.
for _, row := range result.(promm.Matrix) {
jobMetric.Series = append(jobMetric.Series,
pdb.RowToSeries(from, step, steps, row))
}
// only add metric if at least one host returned data
if !ok && len(jobMetric.Series) > 0 {
jobData[metric][scope] = jobMetric
}
// sort by hostname to get uniform coloring
sort.Slice(jobMetric.Series, func(i, j int) bool {
return (jobMetric.Series[i].Hostname < jobMetric.Series[j].Hostname)
})
}
}
return jobData, nil
}
// LoadStats returns, per metric, a map of hostname to the statistics of that
// host's node-scope series. The statistics are taken from a full LoadData call
// (resolution 0).
//
// Metrics for which LoadData produced no node-scope data are left out of the
// result instead of dereferencing a missing entry.
//
// TODO change implementation to precomputed/cached stats
func (pdb *PrometheusDataRepository) LoadStats(
	job *schema.Job,
	metrics []string,
	ctx context.Context,
) (map[string]map[string]schema.MetricStatistics, error) {
	// map of metrics of nodes of stats
	stats := map[string]map[string]schema.MetricStatistics{}

	data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
	if err != nil {
		cclog.Warn("Error while loading job for stats")
		return nil, err
	}

	for metric, metricData := range data {
		// LoadData can leave an empty per-metric map behind when no host
		// returned data; the lookup then yields a nil *JobMetric.
		nodeMetric, ok := metricData[schema.MetricScopeNode]
		if !ok || nodeMetric == nil {
			continue
		}

		perHost := make(map[string]schema.MetricStatistics, len(nodeMetric.Series))
		for _, series := range nodeMetric.Series {
			perHost[series.Hostname] = series.Statistics
		}
		stats[metric] = perHost
	}

	return stats, nil
}
// LoadNodeData runs one ranged Prometheus query per requested metric for the
// given nodes and time range, and returns a map of hostname to metric name to
// the resulting JobMetric entries (one single-series JobMetric per returned row).
//
// Node scope is added to scopes when it is missing; every other scope is
// skipped with a one-time info message. A metric without configuration, a
// query that cannot be formatted, or a failing query aborts with an error.
func (pdb *PrometheusDataRepository) LoadNodeData(
cluster string,
metrics, nodes []string,
scopes []schema.MetricScope,
from, to time.Time,
ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
// t0 is only used to log how long the load took.
t0 := time.Now()
// Map of hosts of metrics of value slices
data := make(map[string]map[string][]*schema.JobMetric)
// query db for each metric
// TODO: scopes seems to be always empty
if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
scopes = append(scopes, schema.MetricScopeNode)
}
for _, scope := range scopes {
if scope != schema.MetricScopeNode {
logOnce.Do(func() {
cclog.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
})
continue
}
for _, metric := range metrics {
metricConfig := archive.GetMetricConfig(cluster, metric)
if metricConfig == nil {
cclog.Warnf("Error in LoadNodeData: Metric %s for cluster %s not configured", metric, cluster)
return nil, errors.New("Prometheus config error")
}
query, err := pdb.FormatQuery(metric, scope, nodes, cluster)
if err != nil {
cclog.Warn("Error while formatting prometheus query")
return nil, err
}
// ranged query over all nodes
r := promv1.Range{
Start: from,
End: to,
// The step is the metric's timestep multiplied by 1e9 (presumably seconds to nanoseconds - confirm).
Step: time.Duration(metricConfig.Timestep * 1e9),
}
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
if err != nil {
cclog.Errorf("Prometheus query error in LoadNodeData: %v\n", err)
return nil, errors.New("Prometheus query error")
}
if len(warnings) > 0 {
cclog.Warnf("Warnings: %v\n", warnings)
}
// NOTE(review): a Timestep of 0 makes the division below panic.
step := int64(metricConfig.Timestep)
steps := int64(to.Sub(from).Seconds()) / step
// iter rows of host, metric, values
// The unchecked type assertion panics if the result is not a promm.Matrix.
for _, row := range result.(promm.Matrix) {
hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix)
hostdata, ok := data[hostname]
if !ok {
hostdata = make(map[string][]*schema.JobMetric)
data[hostname] = hostdata
}
// output per host and metric
hostdata[metric] = append(hostdata[metric], &schema.JobMetric{
Unit: metricConfig.Unit,
Timestep: metricConfig.Timestep,
Series: []schema.Series{pdb.RowToSeries(from, step, steps, row)},
},
)
}
}
}
t1 := time.Since(t0)
cclog.Debugf("LoadNodeData of %v nodes took %s", len(data), t1)
return data, nil
}
// LoadScopedStats returns, per metric and scope, one ScopedStats entry per
// series of the job. It is used by the Job-View StatsTable (implemented by
// NHR@FAU).
//
// Only node scope is supported: the data is loaded at node scope via LoadData
// (resolution 0), and any other requested scope is skipped with a one-time
// info message. Metrics without node-scope data are left out of the result
// instead of dereferencing a missing entry.
func (pdb *PrometheusDataRepository) LoadScopedStats(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context,
) (schema.ScopedJobStats, error) {
	// Assumption: pdb.loadData() only returns series node-scope - use node scope for statsTable
	scopedJobStats := make(schema.ScopedJobStats)

	data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
	if err != nil {
		cclog.Warn("Error while loading job for scopedJobStats")
		return nil, err
	}

	for metric, metricData := range data {
		for _, scope := range scopes {
			if scope != schema.MetricScopeNode {
				logOnce.Do(func() {
					cclog.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
				})
				continue
			}

			// LoadData can leave an empty per-metric map behind when no host
			// returned data; the lookup then yields a nil *JobMetric.
			jobMetric, ok := metricData[scope]
			if !ok || jobMetric == nil {
				continue
			}

			if _, ok := scopedJobStats[metric]; !ok {
				scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats)
			}
			if _, ok := scopedJobStats[metric][scope]; !ok {
				scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0, len(jobMetric.Series))
			}

			for _, series := range jobMetric.Series {
				// Take the address of a per-iteration copy so every entry owns
				// its statistics regardless of the Go version's loop-variable
				// semantics.
				seriesStats := series.Statistics
				scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
					Hostname: series.Hostname,
					Data:     &seriesStats,
				})
			}
		}
	}

	return scopedJobStats, nil
}
// LoadNodeListData runs one ranged Prometheus query per requested metric for
// the given nodes and time range, and returns a map of hostname to
// schema.JobData. It is used by the NodeList-View (implemented by NHR@FAU) and
// is based on LoadNodeData.
//
// Node scope is added to scopes when it is missing; every other scope is
// skipped with a one-time info message. The subCluster and resolution
// arguments are not used. For each host, metric and scope only the first
// returned row is stored. A metric without configuration, a query that cannot
// be formatted, a failing query, or a non-matrix result aborts with an error.
func (pdb *PrometheusDataRepository) LoadNodeListData(
	cluster, subCluster string,
	nodes []string,
	metrics []string,
	scopes []schema.MetricScope,
	resolution int,
	from, to time.Time,
	ctx context.Context,
) (map[string]schema.JobData, error) {
	// Assumption: pdb.loadData() only returns series node-scope - use node scope for NodeList

	// Fetch Data, based on pdb.LoadNodeData()
	t0 := time.Now()
	// Map of hosts of jobData
	data := make(map[string]schema.JobData)

	// query db for each metric
	// TODO: scopes seems to be always empty
	if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
		scopes = append(scopes, schema.MetricScopeNode)
	}

	for _, scope := range scopes {
		if scope != schema.MetricScopeNode {
			logOnce.Do(func() {
				cclog.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
			})
			continue
		}

		for _, metric := range metrics {
			metricConfig := archive.GetMetricConfig(cluster, metric)
			if metricConfig == nil {
				cclog.Warnf("Error in LoadNodeListData: Metric %s for cluster %s not configured", metric, cluster)
				return nil, errors.New("Prometheus config error")
			}

			query, err := pdb.FormatQuery(metric, scope, nodes, cluster)
			if err != nil {
				cclog.Warn("Error while formatting prometheus query")
				return nil, err
			}

			// ranged query over all nodes
			r := promv1.Range{
				Start: from,
				End:   to,
				Step:  time.Duration(metricConfig.Timestep * 1e9),
			}
			result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
			if err != nil {
				// Name this function in the log, not LoadNodeData.
				cclog.Errorf("Prometheus query error in LoadNodeListData: %v\n", err)
				return nil, errors.New("Prometheus query error")
			}
			if len(warnings) > 0 {
				cclog.Warnf("Warnings: %v\n", warnings)
			}

			// Fail with an error instead of panicking on an unexpected result type.
			matrix, ok := result.(promm.Matrix)
			if !ok {
				cclog.Errorf("Prometheus query in LoadNodeListData returned unexpected result type %T", result)
				return nil, errors.New("Prometheus query error")
			}

			step := int64(metricConfig.Timestep)
			steps := int64(to.Sub(from).Seconds()) / step

			// iter rows of host, metric, values
			for _, row := range matrix {
				hostname := strings.TrimSuffix(string(row.Metric["exported_instance"]), pdb.suffix)

				hostData, ok := data[hostname]
				if !ok {
					hostData = make(schema.JobData)
					data[hostname] = hostData
				}

				perScope, ok := hostData[metric]
				if !ok {
					perScope = make(map[schema.MetricScope]*schema.JobMetric)
					hostData[metric] = perScope
				}

				// output per host, metric and scope; an existing entry is kept
				if _, ok := perScope[scope]; !ok {
					perScope[scope] = &schema.JobMetric{
						Unit:     metricConfig.Unit,
						Timestep: metricConfig.Timestep,
						Series:   []schema.Series{pdb.RowToSeries(from, step, steps, row)},
					}
				}
			}
		}
	}

	t1 := time.Since(t0)
	cclog.Debugf("LoadNodeListData of %v nodes took %s", len(data), t1)
	return data, nil
}

View File

@@ -1,118 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdata
import (
"context"
"encoding/json"
"time"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
// TestLoadDataCallback is the function TestMetricDataRepository.LoadData
// delegates to. The default implementation panics, so a test has to assign its
// own callback before job data is loaded through the mock.
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
panic("TODO")
}
// TestMetricDataRepository is only a mock for unit-testing. Its LoadData
// forwards to TestLoadDataCallback; Init does nothing; all other methods panic.
type TestMetricDataRepository struct{}
// Init is a no-op for the mock: the raw configuration is ignored and nil is returned.
func (tmdr *TestMetricDataRepository) Init(_ json.RawMessage) error {
return nil
}
// LoadData forwards all arguments to TestLoadDataCallback and returns its result.
func (tmdr *TestMetricDataRepository) LoadData(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
) (schema.JobData, error) {
return TestLoadDataCallback(job, metrics, scopes, ctx, resolution)
}
// LoadStats is not implemented by the mock and always panics.
func (tmdr *TestMetricDataRepository) LoadStats(
job *schema.Job,
metrics []string,
ctx context.Context,
) (map[string]map[string]schema.MetricStatistics, error) {
panic("TODO")
}
// LoadScopedStats is not implemented by the mock and always panics.
func (tmdr *TestMetricDataRepository) LoadScopedStats(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
) (schema.ScopedJobStats, error) {
panic("TODO")
}
// LoadNodeData is not implemented by the mock and always panics.
func (tmdr *TestMetricDataRepository) LoadNodeData(
cluster string,
metrics, nodes []string,
scopes []schema.MetricScope,
from, to time.Time,
ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
panic("TODO")
}
// LoadNodeListData is not implemented by the mock and always panics.
func (tmdr *TestMetricDataRepository) LoadNodeListData(
cluster, subCluster string,
nodes []string,
metrics []string,
scopes []schema.MetricScope,
resolution int,
from, to time.Time,
ctx context.Context,
) (map[string]schema.JobData, error) {
panic("TODO")
}
// DeepCopy returns a copy of jdTemp whose maps, JobMetric values, series data
// slices and statistics series do not share memory with the original, so the
// copy can be modified (e.g. resampled) without touching the source.
//
// Copied per JobMetric: Timestep, Unit.Base, Unit.Prefix, all Series (Data,
// Hostname, Id, Statistics Avg/Min/Max) and, when present, the
// StatisticsSeries including its Percentiles. Series.Id is copied by plain
// assignment.
func DeepCopy(jdTemp schema.JobData) schema.JobData {
	jd := make(schema.JobData, len(jdTemp))

	for metric, perScope := range jdTemp {
		scopesCopy := make(map[schema.MetricScope]*schema.JobMetric, len(perScope))
		jd[metric] = scopesCopy

		for scope, src := range perScope {
			dst := new(schema.JobMetric)
			scopesCopy[scope] = dst

			dst.Timestep = src.Timestep
			dst.Unit.Base = src.Unit.Base
			dst.Unit.Prefix = src.Unit.Prefix

			dst.Series = make([]schema.Series, len(src.Series))
			for i := range src.Series {
				in, out := &src.Series[i], &dst.Series[i]
				out.Data = make([]schema.Float, len(in.Data))
				copy(out.Data, in.Data)
				out.Hostname = in.Hostname
				out.Id = in.Id
				out.Statistics.Avg = in.Statistics.Avg
				out.Statistics.Min = in.Statistics.Min
				out.Statistics.Max = in.Statistics.Max
			}

			// Without a source statistics series the copy keeps its nil default.
			if src.StatisticsSeries == nil {
				continue
			}

			srcStats := src.StatisticsSeries
			dstStats := new(schema.StatsSeries)
			dst.StatisticsSeries = dstStats

			dstStats.Max = make([]schema.Float, len(srcStats.Max))
			dstStats.Min = make([]schema.Float, len(srcStats.Min))
			dstStats.Median = make([]schema.Float, len(srcStats.Median))
			dstStats.Mean = make([]schema.Float, len(srcStats.Mean))
			copy(dstStats.Max, srcStats.Max)
			copy(dstStats.Min, srcStats.Min)
			copy(dstStats.Median, srcStats.Median)
			copy(dstStats.Mean, srcStats.Mean)

			// The Percentiles map of a freshly allocated StatsSeries is nil and
			// writing into it panics, so allocate it before copying entries.
			if srcStats.Percentiles != nil {
				dstStats.Percentiles = emptyMapLike(srcStats.Percentiles)
				for key, values := range srcStats.Percentiles {
					dstStats.Percentiles[key] = make([]schema.Float, len(values))
					copy(dstStats.Percentiles[key], values)
				}
			}
		}
	}

	return jd
}

// emptyMapLike returns a new, empty map of the same type as m, sized for len(m) entries.
func emptyMapLike[M ~map[K]V, K comparable, V any](m M) M {
	return make(M, len(m))
}

View File

@@ -0,0 +1,490 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// Package metricdispatch provides a unified interface for loading and caching job metric data.
//
// This package serves as a central dispatcher that routes metric data requests to the appropriate
// backend based on job state. For running jobs, data is fetched from the metric store (e.g., cc-metric-store).
// For completed jobs, data is retrieved from the file-based job archive.
//
// # Key Features
//
// - Automatic backend selection based on job state (running vs. archived)
// - LRU cache for performance optimization (128 MB default cache size)
// - Data resampling using Largest Triangle Three Bucket algorithm for archived data
// - Automatic statistics series generation for jobs with many nodes
// - Support for scoped metrics (node, socket, accelerator, core)
//
// # Cache Behavior
//
// Cached data has different TTL (time-to-live) values depending on job state:
// - Running jobs: 2 minutes (data changes frequently)
// - Completed jobs: 5 hours (data is static)
//
// The cache key is based on job ID, state, requested metrics, scopes, and resolution.
//
// # Usage
//
// The primary entry point is LoadData, which automatically handles both running and archived jobs:
//
// jobData, err := metricdispatch.LoadData(job, metrics, scopes, ctx, resolution)
// if err != nil {
// // Handle error
// }
//
// For statistics only, use LoadJobStats, LoadScopedJobStats, or LoadAverages depending on the required format.
package metricdispatch
import (
"context"
"fmt"
"math"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/metricstore"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/lrucache"
"github.com/ClusterCockpit/cc-lib/v2/resampler"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
// dataCacheCapacity is the capacity passed to the LRU cache (128 MB).
const dataCacheCapacity = 128 * 1024 * 1024

// cache keeps loaded job metric data so that repeated requests for the same
// job do not hit the metric store or the archive backend again.
var cache *lrucache.Cache = lrucache.New(dataCacheCapacity)
// cacheKey builds the string under which a job's metric data is cached.
// The key is formatted from job.ID, job.State, the requested metrics, the
// requested scopes and the resolution. Duration and StartTime are deliberately
// left out: the job ID already identifies the job and the cache TTL keeps
// entries from living forever.
func cacheKey(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	resolution int,
) string {
	const layout = "%d(%s):[%v],[%v]-%d"
	return fmt.Sprintf(layout, job.ID, job.State, metrics, scopes, resolution)
}
// LoadData retrieves metric data for a job from the appropriate backend (metric store for running
// jobs, archive for completed jobs) and applies caching, resampling, and statistics generation.
//
// The metric store is used when the job is running, when its monitoring status is
// MonitoringStatusRunningOrArchiving, or when the archive is disabled in the configuration.
// Otherwise the data is loaded from the job archive, deep-copied, resampled and filtered.
//
// Parameters:
//   - job: The job for which to load metric data
//   - metrics: List of metric names to load (nil loads all metrics: the cluster's configured
//     metrics on the store path, everything found in the archive on the archive path)
//   - scopes: Metric scopes to include (nil defaults to node scope on the store path and
//     disables scope filtering on the archive path)
//   - ctx: Context handed to the metric store
//   - resolution: Target resolution; forwarded to the metric store for running jobs and to the
//     Largest Triangle Three Bucket resampler for archived data
//
// The cache key is computed from the arguments as passed in, before any defaults are applied.
// Cached entries live for 5 hours, or 2 minutes if the job is in state running.
//
// Returns the loaded job data and any error encountered. For partial errors from the metric store
// (an error together with non-empty data), the data is returned and a warning is logged.
func LoadData(job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context,
	resolution int,
) (schema.JobData, error) {
	data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ any, ttl time.Duration, size int) {
		var jd schema.JobData
		var err error

		if job.State == schema.JobStateRunning ||
			job.MonitoringStatus == schema.MonitoringStatusRunningOrArchiving ||
			config.Keys.DisableArchive {

			if scopes == nil {
				scopes = append(scopes, schema.MetricScopeNode)
			}

			if metrics == nil {
				cluster := archive.GetCluster(job.Cluster)
				for _, mc := range cluster.MetricConfig {
					metrics = append(metrics, mc.Name)
				}
			}

			jd, err = metricstore.LoadData(job, metrics, scopes, ctx, resolution)
			if err != nil {
				if len(jd) != 0 {
					cclog.Warnf("partial error loading metrics from store for job %d (user: %s, project: %s): %s",
						job.JobID, job.User, job.Project, err.Error())
				} else {
					cclog.Errorf("failed to load job data from metric store for job %d (user: %s, project: %s): %s",
						job.JobID, job.User, job.Project, err.Error())
					// The error is handed back as the cache value and unwrapped by the caller below.
					return err, 0, 0
				}
			}
			size = jd.Size()
		} else {
			var jdTemp schema.JobData
			jdTemp, err = archive.GetHandle().LoadJobData(job)
			if err != nil {
				cclog.Errorf("failed to load job data from archive for job %d (user: %s, project: %s): %s",
					job.JobID, job.User, job.Project, err.Error())
				return err, 0, 0
			}

			// Work on a private copy so the in-place resampling below cannot modify
			// the data returned by the archive handle.
			jd = deepCopy(jdTemp)

			// Resample archived data using Largest Triangle Three Bucket algorithm to reduce data points
			// to the requested resolution, improving transfer performance and client-side rendering.
			for _, perScope := range jd {
				for _, jm := range perScope {
					// Without any series nothing is resampled and no new timestep is
					// produced; keep the archived timestep instead of overwriting it with 0.
					if len(jm.Series) == 0 {
						continue
					}

					timestep := int64(0)
					for i := range jm.Series {
						jm.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(jm.Series[i].Data, int64(jm.Timestep), int64(resolution))
						if err != nil {
							return err, 0, 0
						}
					}
					jm.Timestep = int(timestep)
				}
			}

			// Filter job data to only include requested metrics and scopes, avoiding unnecessary data transfer.
			if metrics != nil || scopes != nil {
				if metrics == nil {
					metrics = make([]string, 0, len(jd))
					for k := range jd {
						metrics = append(metrics, k)
					}
				}

				res := schema.JobData{}
				for _, metric := range metrics {
					if perscope, ok := jd[metric]; ok {
						// Only narrow down when more than one scope is available; if none of the
						// requested scopes is present, the metric keeps all archived scopes.
						if len(perscope) > 1 {
							subset := make(map[schema.MetricScope]*schema.JobMetric)
							for _, scope := range scopes {
								if jm, ok := perscope[scope]; ok {
									subset[scope] = jm
								}
							}

							if len(subset) > 0 {
								perscope = subset
							}
						}

						res[metric] = perscope
					}
				}
				jd = res
			}
			size = jd.Size()
		}

		ttl = 5 * time.Hour
		if job.State == schema.JobStateRunning {
			ttl = 2 * time.Minute
		}

		// Generate statistics series for jobs with many nodes to enable min/median/max graphs
		// instead of overwhelming the UI with individual node lines. Note that newly calculated
		// statistics use min/median/max, while archived statistics may use min/mean/max.
		const maxSeriesSize int = 15
		for _, perScope := range jd {
			for _, jm := range perScope {
				if jm.StatisticsSeries != nil || len(jm.Series) <= maxSeriesSize {
					continue
				}

				jm.AddStatisticsSeries()
			}
		}

		nodeScopeRequested := false
		for _, scope := range scopes {
			if scope == schema.MetricScopeNode {
				nodeScopeRequested = true
			}
		}

		if nodeScopeRequested {
			jd.AddNodeScope("flops_any")
			jd.AddNodeScope("mem_bw")
		}

		// Round Resulting Stat Values
		jd.RoundMetricStats()

		return jd, ttl, size
	})

	// The compute callback reports failures by returning the error as the cached value.
	if err, ok := data.(error); ok {
		cclog.Errorf("error in cached dataset for job %d: %s", job.JobID, err.Error())
		return nil, err
	}

	return data.(schema.JobData), nil
}
// LoadAverages appends one value per requested metric to the matching row of data.
//
// For jobs that are not running (and with the archive enabled) the work is delegated to
// archive.LoadAveragesFromArchive. Otherwise statistics are loaded from the metric store and,
// for each metric, the sum of the per-node Avg values is appended to data[i], where i is the
// metric's index in metrics. Metrics missing from the store result get schema.NaN appended.
//
// data must have at least len(metrics) rows; rows are indexed by metric position.
func LoadAverages(
	job *schema.Job,
	metrics []string,
	data [][]schema.Float,
	ctx context.Context,
) error {
	fromStore := job.State == schema.JobStateRunning || config.Keys.DisableArchive
	if !fromStore {
		return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
	}

	stats, loadErr := metricstore.LoadStats(job, metrics, ctx)
	if loadErr != nil {
		cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s",
			job.JobID, job.User, job.Project, loadErr.Error())
		return loadErr
	}

	for idx, metric := range metrics {
		value := schema.NaN
		if perNode, found := stats[metric]; found {
			total := 0.0
			for _, nodeStats := range perNode {
				total += nodeStats.Avg
			}
			value = schema.Float(total)
		}
		data[idx] = append(data[idx], value)
	}

	return nil
}
// LoadScopedJobStats returns job statistics grouped by metric scope.
//
// Jobs that are not running are served from the job archive via
// archive.LoadScopedStatsFromArchive, unless the archive is disabled. In all other
// cases the statistics are requested from the metric store; a failure there is
// logged and returned with a nil result.
func LoadScopedJobStats(
	job *schema.Job,
	metrics []string,
	scopes []schema.MetricScope,
	ctx context.Context,
) (schema.ScopedJobStats, error) {
	useStore := job.State == schema.JobStateRunning || config.Keys.DisableArchive
	if !useStore {
		return archive.LoadScopedStatsFromArchive(job, metrics, scopes)
	}

	result, loadErr := metricstore.LoadScopedStats(job, metrics, scopes, ctx)
	if loadErr == nil {
		return result, nil
	}

	cclog.Errorf("failed to load scoped statistics from metric store for job %d (user: %s, project: %s): %s",
		job.JobID, job.User, job.Project, loadErr.Error())
	return nil, loadErr
}
// LoadJobStats retrieves aggregated statistics (min/avg/max) for each requested metric across all job nodes.
//
// Jobs that are not running are served from the job archive via archive.LoadStatsFromArchive,
// unless the archive is disabled. Otherwise per-node statistics are loaded from the metric store
// and combined per metric:
//   - Min is the smallest per-node Min
//   - Max is the largest per-node Max
//   - Avg is the sum of the per-node Avg values divided by job.NumNodes
//
// All three values are rounded to two decimal places. Metrics for which the store returned no
// node statistics are reported with all-zero statistics.
//
// On a metric store error the (empty) result map is returned together with the error.
func LoadJobStats(
	job *schema.Job,
	metrics []string,
	ctx context.Context,
) (map[string]schema.MetricStatistics, error) {
	if job.State != schema.JobStateRunning && !config.Keys.DisableArchive {
		return archive.LoadStatsFromArchive(job, metrics)
	}

	data := make(map[string]schema.MetricStatistics, len(metrics))

	stats, err := metricstore.LoadStats(job, metrics, ctx)
	if err != nil {
		cclog.Errorf("failed to load statistics from metric store for job %d (user: %s, project: %s): %s",
			job.JobID, job.User, job.Project, err.Error())
		return data, err
	}

	for _, m := range metrics {
		nodes, ok := stats[m]
		if !ok || len(nodes) == 0 {
			// Nothing to aggregate: report zeros rather than the +/-Inf seeds used below.
			data[m] = schema.MetricStatistics{Min: 0.0, Avg: 0.0, Max: 0.0}
			continue
		}

		// Seed the extrema with +/-Inf so the first node always replaces them. Seeding with 0
		// would pin Min to 0 for non-negative metrics and Max to 0 for all-negative metrics.
		sum, lowest, highest := 0.0, math.Inf(1), math.Inf(-1)
		for _, node := range nodes {
			sum += node.Avg
			lowest = math.Min(lowest, node.Min)
			highest = math.Max(highest, node.Max)
		}

		data[m] = schema.MetricStatistics{
			Avg: (math.Round((sum/float64(job.NumNodes))*100) / 100),
			Min: (math.Round(lowest*100) / 100),
			Max: (math.Round(highest*100) / 100),
		}
	}

	return data, nil
}
// LoadNodeData fetches metric data for the given nodes of a cluster between from and to.
// The data always comes from the metric store, never from the job archive.
//
// If metrics is nil, all metrics configured for the cluster are requested. When the store
// reports an error but still delivers data, the error is logged as a warning and the partial
// data is returned; an error without data is returned to the caller. A nil result without an
// error is reported as "node data queries not supported".
//
// Returns a nested map structure: node -> metric -> scoped data.
func LoadNodeData(
	cluster string,
	metrics, nodes []string,
	scopes []schema.MetricScope,
	from, to time.Time,
	ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
	if metrics == nil {
		for _, mc := range archive.GetCluster(cluster).MetricConfig {
			metrics = append(metrics, mc.Name)
		}
	}

	nodeData, loadErr := metricstore.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
	switch {
	case loadErr != nil && len(nodeData) == 0:
		cclog.Errorf("failed to load node data from metric store for cluster %s: %s", cluster, loadErr.Error())
		return nil, loadErr
	case loadErr != nil:
		cclog.Warnf("partial error loading node data from metric store for cluster %s: %s", cluster, loadErr.Error())
	}

	if nodeData == nil {
		return nil, fmt.Errorf("metric store for cluster '%s' does not support node data queries", cluster)
	}

	return nodeData, nil
}
// LoadNodeListData fetches time-series metric data for a list of nodes between from and to,
// forwarding the requested resolution to the metric store.
//
// If metrics is nil, all metrics configured for the cluster are requested. A store error that
// still comes with data is logged as a warning and the partial data is used; an error without
// data is returned. For every metric that has no statistics series yet and at least 8 series,
// a statistics series is generated via AddStatisticsSeries.
//
// Returns a map of node names to their job-like metric data structures.
func LoadNodeListData(
	cluster, subCluster string,
	nodes []string,
	metrics []string,
	scopes []schema.MetricScope,
	resolution int,
	from, to time.Time,
	ctx context.Context,
) (map[string]schema.JobData, error) {
	if metrics == nil {
		for _, mc := range archive.GetCluster(cluster).MetricConfig {
			metrics = append(metrics, mc.Name)
		}
	}

	nodeData, loadErr := metricstore.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
	switch {
	case loadErr != nil && len(nodeData) == 0:
		cclog.Errorf("failed to load node list data from metric store for cluster %s, subcluster %s: %s",
			cluster, subCluster, loadErr.Error())
		return nil, loadErr
	case loadErr != nil:
		cclog.Warnf("partial error loading node list data from metric store for cluster %s, subcluster %s: %s",
			cluster, subCluster, loadErr.Error())
	}

	// A nil map has nothing to post-process, so it can be rejected up front.
	if nodeData == nil {
		return nil, fmt.Errorf("metric store for cluster '%s' does not support node list queries", cluster)
	}

	// Datasets with many series get a statistics series to keep visualization fast.
	const statsThreshold int = 8
	for _, jobData := range nodeData {
		for _, perScope := range jobData {
			for _, jm := range perScope {
				if jm.StatisticsSeries == nil && len(jm.Series) >= statsThreshold {
					jm.AddStatisticsSeries()
				}
			}
		}
	}

	return nodeData, nil
}
// deepCopy returns a copy of source in which both map levels and every JobMetric
// are freshly allocated. Callers can therefore transform the result in place
// (e.g. resample it) without touching the data they were handed.
func deepCopy(source schema.JobData) schema.JobData {
	cloned := make(schema.JobData, len(source))
	for name, byScope := range source {
		scopeCopy := make(map[schema.MetricScope]*schema.JobMetric, len(byScope))
		for scope, metric := range byScope {
			scopeCopy[scope] = copyJobMetric(metric)
		}
		cloned[name] = scopeCopy
	}
	return cloned
}
// copyJobMetric clones a single JobMetric: Timestep and Unit are copied by value,
// every series is duplicated via copySeries, and the statistics series is
// duplicated only when the source has one. src must not be nil.
func copyJobMetric(src *schema.JobMetric) *schema.JobMetric {
	series := make([]schema.Series, len(src.Series))
	for idx := range series {
		series[idx] = copySeries(&src.Series[idx])
	}

	clone := &schema.JobMetric{
		Timestep: src.Timestep,
		Unit:     src.Unit,
		Series:   series,
	}
	if stats := src.StatisticsSeries; stats != nil {
		clone.StatisticsSeries = copyStatisticsSeries(stats)
	}
	return clone
}
// copySeries clones one series. The Data slice gets its own backing array;
// Hostname and Statistics are copied by value. The Id pointer is carried over
// as is, so source and copy refer to the same Id value.
func copySeries(src *schema.Series) schema.Series {
	values := make([]schema.Float, len(src.Data))
	copy(values, src.Data)

	return schema.Series{
		Hostname:   src.Hostname,
		Id:         src.Id,
		Statistics: src.Statistics,
		Data:       values,
	}
}
// copyStatisticsSeries clones a statistics series. Min, Mean, Median and Max each
// get a newly allocated slice of the same length. The Percentiles map is only
// created when the source holds at least one entry; otherwise it stays nil.
func copyStatisticsSeries(src *schema.StatsSeries) *schema.StatsSeries {
	// clone always allocates, so a nil input yields an empty, non-nil slice.
	clone := func(values []schema.Float) []schema.Float {
		out := make([]schema.Float, len(values))
		copy(out, values)
		return out
	}

	dst := &schema.StatsSeries{
		Min:    clone(src.Min),
		Mean:   clone(src.Mean),
		Median: clone(src.Median),
		Max:    clone(src.Max),
	}

	if len(src.Percentiles) == 0 {
		return dst
	}

	dst.Percentiles = make(map[int][]schema.Float, len(src.Percentiles))
	for percentile, values := range src.Percentiles {
		dst.Percentiles[percentile] = clone(values)
	}
	return dst
}

View File

@@ -0,0 +1,125 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdispatch
import (
"testing"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
// TestDeepCopy verifies that deepCopy detaches series data, statistics series and
// percentiles from the source, and that scalar fields are carried over unchanged.
func TestDeepCopy(t *testing.T) {
	hostID := "0"
	nodeMetric := &schema.JobMetric{
		Timestep: 60,
		Unit:     schema.Unit{Base: "load", Prefix: ""},
		Series: []schema.Series{
			{
				Hostname: "node001",
				Id:       &hostID,
				Data:     []schema.Float{1.0, 2.0, 3.0},
				Statistics: schema.MetricStatistics{
					Min: 1.0,
					Avg: 2.0,
					Max: 3.0,
				},
			},
		},
		StatisticsSeries: &schema.StatsSeries{
			Min:    []schema.Float{1.0, 1.5, 2.0},
			Mean:   []schema.Float{2.0, 2.5, 3.0},
			Median: []schema.Float{2.0, 2.5, 3.0},
			Max:    []schema.Float{3.0, 3.5, 4.0},
			Percentiles: map[int][]schema.Float{
				25: {1.5, 2.0, 2.5},
				75: {2.5, 3.0, 3.5},
			},
		},
	}
	original := schema.JobData{
		"cpu_load": {
			schema.MetricScopeNode: nodeMetric,
		},
	}

	copied := deepCopy(original)

	// Mutate the source after copying; none of this may show up in the copy.
	original["cpu_load"][schema.MetricScopeNode].Series[0].Data[0] = 999.0
	original["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Min[0] = 888.0
	original["cpu_load"][schema.MetricScopeNode].StatisticsSeries.Percentiles[25][0] = 777.0

	got := copied["cpu_load"][schema.MetricScopeNode]

	if v := got.Series[0].Data[0]; v != 1.0 {
		t.Errorf("Series data was not deeply copied: got %v, want 1.0", v)
	}
	if v := got.StatisticsSeries.Min[0]; v != 1.0 {
		t.Errorf("StatisticsSeries was not deeply copied: got %v, want 1.0", v)
	}
	if v := got.StatisticsSeries.Percentiles[25][0]; v != 1.5 {
		t.Errorf("Percentiles was not deeply copied: got %v, want 1.5", v)
	}
	if v := got.Timestep; v != 60 {
		t.Errorf("Timestep not copied correctly: got %v, want 60", v)
	}
	if v := got.Series[0].Hostname; v != "node001" {
		t.Errorf("Hostname not copied correctly: got %v, want node001", v)
	}
}
// TestDeepCopyNilStatisticsSeries checks that a metric without a statistics
// series still has none after being copied.
func TestDeepCopyNilStatisticsSeries(t *testing.T) {
	const metricName = "mem_used"

	source := schema.JobData{
		metricName: {
			schema.MetricScopeNode: &schema.JobMetric{
				Timestep: 60,
				Series: []schema.Series{
					{
						Hostname: "node001",
						Data:     []schema.Float{1.0, 2.0},
					},
				},
				StatisticsSeries: nil,
			},
		},
	}

	clone := deepCopy(source)

	if stats := clone[metricName][schema.MetricScopeNode].StatisticsSeries; stats != nil {
		t.Errorf("StatisticsSeries should be nil, got %v", stats)
	}
}
// TestDeepCopyEmptyPercentiles checks that a statistics series without
// percentiles is copied with a nil Percentiles map.
func TestDeepCopyEmptyPercentiles(t *testing.T) {
	const metricName = "cpu_load"

	stats := &schema.StatsSeries{
		Min:         []schema.Float{1.0},
		Mean:        []schema.Float{2.0},
		Median:      []schema.Float{2.0},
		Max:         []schema.Float{3.0},
		Percentiles: nil,
	}
	source := schema.JobData{
		metricName: {
			schema.MetricScopeNode: &schema.JobMetric{
				Timestep:         60,
				Series:           []schema.Series{},
				StatisticsSeries: stats,
			},
		},
	}

	clone := deepCopy(source)

	if percentiles := clone[metricName][schema.MetricScopeNode].StatisticsSeries.Percentiles; percentiles != nil {
		t.Errorf("Percentiles should be nil when source is nil/empty")
	}
}

View File

@@ -3,10 +3,11 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"errors"
"fmt"
"math"
"github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -124,6 +125,9 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) {
req.WithData = true
ms := GetMemoryStore()
if ms == nil {
return nil, fmt.Errorf("memorystore not initialized")
}
response := APIQueryResponse{
Results: make([][]APIMetricData, 0, len(req.Queries)),

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"archive/zip"

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"bufio"
@@ -24,8 +24,10 @@ import (
"github.com/linkedin/goavro/v2"
)
var NumAvroWorkers int = DefaultAvroWorkers
var startUp bool = true
var (
NumAvroWorkers int = DefaultAvroWorkers
startUp bool = true
)
func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) {
levels := make([]*AvroLevel, 0)

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"context"

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"sync"

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"errors"

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"bufio"

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"fmt"
@@ -19,8 +19,6 @@ const (
DefaultAvroCheckpointInterval = time.Minute
)
var InternalCCMSFlag bool = false
type MetricStoreConfig struct {
// Number of concurrent workers for checkpoint and archive operations.
// If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10)
@@ -35,8 +33,8 @@ type MetricStoreConfig struct {
DumpToFile string `json:"dump-to-file"`
EnableGops bool `json:"gops"`
} `json:"debug"`
RetentionInMemory string `json:"retention-in-memory"`
Archive struct {
RetentionInMemory string `json:"retention-in-memory"`
Archive struct {
Interval string `json:"interval"`
RootDir string `json:"directory"`
DeleteInstead bool `json:"delete-instead"`

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
const configSchema = `{
"type": "object",

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"bufio"

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"bufio"

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"sync"

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"context"

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// Package memorystore provides an efficient in-memory time-series metric storage system
// Package metricstore provides an efficient in-memory time-series metric storage system
// with support for hierarchical data organization, checkpointing, and archiving.
//
// The package organizes metrics in a tree structure (cluster → host → component) and
@@ -17,7 +17,7 @@
// - Concurrent checkpoint/archive workers
// - Support for sum and average aggregation
// - NATS integration for metric ingestion
package memorystore
package metricstore
import (
"bytes"
@@ -208,15 +208,6 @@ func Shutdown() {
cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files)
}
func getName(m *MemoryStore, i int) string {
for key, val := range m.Metrics {
if val.offset == i {
return key
}
}
return ""
}
func Retention(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore()
@@ -245,7 +236,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
t := time.Now().Add(-d)
cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
freed, err := ms.Free(nil, t.Unix())
if err != nil {
if err != nil {
cclog.Errorf("[METRICSTORE]> freeing up buffers failed: %s\n", err.Error())
} else {
cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freed)

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"testing"
@@ -131,7 +131,7 @@ func TestBufferWrite(t *testing.T) {
func TestBufferRead(t *testing.T) {
b := newBuffer(100, 10)
// Write some test data
b.write(100, schema.Float(1.0))
b.write(110, schema.Float(2.0))

View File

@@ -3,56 +3,41 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdata
package metricstore
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
// Bloat Code
type CCMetricStoreConfigInternal struct {
Kind string `json:"kind"`
Url string `json:"url"`
Token string `json:"token"`
// TestLoadDataCallback allows tests to override LoadData behavior
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)
// If metrics are known to this MetricDataRepository under a different
// name than in the `metricConfig` section of the 'cluster.json',
// provide this optional mapping of local to remote name for this metric.
Renamings map[string]string `json:"metricRenamings"`
}
// Bloat Code
type CCMetricStoreInternal struct{}
// Bloat Code
func (ccms *CCMetricStoreInternal) Init(rawConfig json.RawMessage) error {
return nil
}
func (ccms *CCMetricStoreInternal) LoadData(
func LoadData(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
) (schema.JobData, error) {
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, int64(resolution))
if TestLoadDataCallback != nil {
return TestLoadDataCallback(job, metrics, scopes, ctx, resolution)
}
queries, assignedScope, err := buildQueries(job, metrics, scopes, int64(resolution))
if err != nil {
cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error())
return nil, err
}
req := memorystore.APIQueryRequest{
req := APIQueryRequest{
Cluster: job.Cluster,
From: job.StartTime,
To: job.StartTime + int64(job.Duration),
@@ -61,7 +46,7 @@ func (ccms *CCMetricStoreInternal) LoadData(
WithData: true,
}
resBody, err := memorystore.FetchData(req)
resBody, err := FetchData(req)
if err != nil {
cclog.Errorf("Error while fetching data : %s", err.Error())
return nil, err
@@ -149,13 +134,13 @@ var (
acceleratorString = string(schema.MetricScopeAccelerator)
)
func (ccms *CCMetricStoreInternal) buildQueries(
func buildQueries(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
resolution int64,
) ([]memorystore.APIQuery, []schema.MetricScope, error) {
queries := make([]memorystore.APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
) ([]APIQuery, []schema.MetricScope, error) {
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
assignedScope := []schema.MetricScope{}
subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster)
@@ -217,7 +202,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
continue
}
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: false,
@@ -235,7 +220,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
continue
}
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -249,7 +234,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// HWThread -> HWThead
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: false,
@@ -265,7 +250,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
for _, core := range cores {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -282,7 +267,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
for _, socket := range sockets {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -297,7 +282,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// HWThread -> Node
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -312,7 +297,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// Core -> Core
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: false,
@@ -328,7 +313,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromCores(hwthreads)
for _, socket := range sockets {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -344,7 +329,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// Core -> Node
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
cores, _ := topology.GetCoresFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -359,7 +344,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// MemoryDomain -> MemoryDomain
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: false,
@@ -374,7 +359,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// MemoryDoman -> Node
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -389,7 +374,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// Socket -> Socket
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: false,
@@ -404,7 +389,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// Socket -> Node
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Aggregate: true,
@@ -418,7 +403,7 @@ func (ccms *CCMetricStoreInternal) buildQueries(
// Node -> Node
if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: host.Hostname,
Resolution: resolution,
@@ -435,18 +420,18 @@ func (ccms *CCMetricStoreInternal) buildQueries(
return queries, assignedScope, nil
}
func (ccms *CCMetricStoreInternal) LoadStats(
func LoadStats(
job *schema.Job,
metrics []string,
ctx context.Context,
) (map[string]map[string]schema.MetricStatistics, error) {
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization?
queries, _, err := buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization?
if err != nil {
cclog.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error())
return nil, err
}
req := memorystore.APIQueryRequest{
req := APIQueryRequest{
Cluster: job.Cluster,
From: job.StartTime,
To: job.StartTime + int64(job.Duration),
@@ -455,7 +440,7 @@ func (ccms *CCMetricStoreInternal) LoadStats(
WithData: false,
}
resBody, err := memorystore.FetchData(req)
resBody, err := FetchData(req)
if err != nil {
cclog.Errorf("Error while fetching data : %s", err.Error())
return nil, err
@@ -492,20 +477,19 @@ func (ccms *CCMetricStoreInternal) LoadStats(
return stats, nil
}
// Used for Job-View Statistics Table
func (ccms *CCMetricStoreInternal) LoadScopedStats(
func LoadScopedStats(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
) (schema.ScopedJobStats, error) {
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, 0)
queries, assignedScope, err := buildQueries(job, metrics, scopes, 0)
if err != nil {
cclog.Errorf("Error while building queries for jobId %d, Metrics %v, Scopes %v: %s", job.JobID, metrics, scopes, err.Error())
return nil, err
}
req := memorystore.APIQueryRequest{
req := APIQueryRequest{
Cluster: job.Cluster,
From: job.StartTime,
To: job.StartTime + int64(job.Duration),
@@ -514,7 +498,7 @@ func (ccms *CCMetricStoreInternal) LoadScopedStats(
WithData: false,
}
resBody, err := memorystore.FetchData(req)
resBody, err := FetchData(req)
if err != nil {
cclog.Errorf("Error while fetching data : %s", err.Error())
return nil, err
@@ -583,15 +567,14 @@ func (ccms *CCMetricStoreInternal) LoadScopedStats(
return scopedJobStats, nil
}
// Used for Systems-View Node-Overview
func (ccms *CCMetricStoreInternal) LoadNodeData(
func LoadNodeData(
cluster string,
metrics, nodes []string,
scopes []schema.MetricScope,
from, to time.Time,
ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
req := memorystore.APIQueryRequest{
req := APIQueryRequest{
Cluster: cluster,
From: from.Unix(),
To: to.Unix(),
@@ -604,7 +587,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData(
} else {
for _, node := range nodes {
for _, metric := range metrics {
req.Queries = append(req.Queries, memorystore.APIQuery{
req.Queries = append(req.Queries, APIQuery{
Hostname: node,
Metric: metric,
Resolution: 0, // Default for Node Queries: Will return metric $Timestep Resolution
@@ -613,7 +596,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData(
}
}
resBody, err := memorystore.FetchData(req)
resBody, err := FetchData(req)
if err != nil {
cclog.Errorf("Error while fetching data : %s", err.Error())
return nil, err
@@ -622,7 +605,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData(
var errors []string
data := make(map[string]map[string][]*schema.JobMetric)
for i, res := range resBody.Results {
var query memorystore.APIQuery
var query APIQuery
if resBody.Queries != nil {
query = resBody.Queries[i]
} else {
@@ -673,8 +656,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeData(
return data, nil
}
// Used for Systems-View Node-List
func (ccms *CCMetricStoreInternal) LoadNodeListData(
func LoadNodeListData(
cluster, subCluster string,
nodes []string,
metrics []string,
@@ -683,15 +665,14 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData(
from, to time.Time,
ctx context.Context,
) (map[string]schema.JobData, error) {
// Note: Order of node data is not guaranteed after this point
queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution))
queries, assignedScope, err := buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution))
if err != nil {
cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error())
return nil, err
}
req := memorystore.APIQueryRequest{
req := APIQueryRequest{
Cluster: cluster,
Queries: queries,
From: from.Unix(),
@@ -700,7 +681,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData(
WithData: true,
}
resBody, err := memorystore.FetchData(req)
resBody, err := FetchData(req)
if err != nil {
cclog.Errorf("Error while fetching data : %s", err.Error())
return nil, err
@@ -709,7 +690,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData(
var errors []string
data := make(map[string]schema.JobData)
for i, row := range resBody.Results {
var query memorystore.APIQuery
var query APIQuery
if resBody.Queries != nil {
query = resBody.Queries[i]
} else {
@@ -789,15 +770,15 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData(
return data, nil
}
func (ccms *CCMetricStoreInternal) buildNodeQueries(
func buildNodeQueries(
cluster string,
subCluster string,
nodes []string,
metrics []string,
scopes []schema.MetricScope,
resolution int64,
) ([]memorystore.APIQuery, []schema.MetricScope, error) {
queries := make([]memorystore.APIQuery, 0, len(metrics)*len(scopes)*len(nodes))
) ([]APIQuery, []schema.MetricScope, error) {
queries := make([]APIQuery, 0, len(metrics)*len(scopes)*len(nodes))
assignedScope := []schema.MetricScope{}
// Get the topology before the loop if a subCluster is given
@@ -812,7 +793,6 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
}
for _, metric := range metrics {
metric := metric
mc := archive.GetMetricConfig(cluster, metric)
if mc == nil {
// return nil, fmt.Errorf("METRICDATA/CCMS > metric '%s' is not specified for cluster '%s'", metric, cluster)
@@ -880,7 +860,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
continue
}
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
@@ -898,7 +878,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
continue
}
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -912,7 +892,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// HWThread -> HWThread
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
@@ -928,7 +908,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(topology.Node)
for _, core := range cores {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -945,7 +925,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(topology.Node)
for _, socket := range sockets {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -960,7 +940,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// HWThread -> Node
if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -975,7 +955,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// Core -> Core
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore {
cores, _ := topology.GetCoresFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
@@ -991,7 +971,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromCores(topology.Node)
for _, socket := range sockets {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -1007,7 +987,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// Core -> Node
if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode {
cores, _ := topology.GetCoresFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -1022,7 +1002,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// MemoryDomain -> MemoryDomain
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
@@ -1037,7 +1017,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// MemoryDomain -> Node
if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode {
sockets, _ := topology.GetMemoryDomainsFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -1052,7 +1032,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// Socket -> Socket
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
sockets, _ := topology.GetSocketsFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: false,
@@ -1067,7 +1047,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// Socket -> Node
if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
sockets, _ := topology.GetSocketsFromHWThreads(topology.Node)
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Aggregate: true,
@@ -1081,7 +1061,7 @@ func (ccms *CCMetricStoreInternal) buildNodeQueries(
// Node -> Node
if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
queries = append(queries, memorystore.APIQuery{
queries = append(queries, APIQuery{
Metric: metric,
Hostname: hostname,
Resolution: resolution,

View File

@@ -3,7 +3,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
package metricstore
import (
"errors"

View File

@@ -561,7 +561,6 @@ func (r *NodeRepository) GetNodesForList(
nodeFilter string,
page *model.PageRequest,
) ([]string, map[string]string, int, bool, error) {
// Init Return Vars
nodes := make([]string, 0)
stateMap := make(map[string]string)

View File

@@ -144,11 +144,7 @@ func nodeTestSetup(t *testing.T) {
// Load and check main configuration
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
config.Init(cfg, clustercfg)
} else {
cclog.Abort("Cluster configuration must be present")
}
config.Init(cfg)
} else {
cclog.Abort("Main configuration must be present")
}

View File

@@ -12,7 +12,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -766,7 +766,7 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
continue
}
if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
if err := metricdispatch.LoadAverages(job, metrics, avgs, ctx); err != nil {
cclog.Errorf("Error while loading averages for histogram: %s", err)
return nil
}

View File

@@ -58,11 +58,7 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
// Load and check main configuration
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
config.Init(cfg, clustercfg)
} else {
t.Fatal("Cluster configuration must be present")
}
config.Init(cfg)
} else {
t.Fatal("Main configuration must be present")
}

View File

@@ -1 +0,0 @@
alf

View File

@@ -1,7 +0,0 @@
calc_rate
qmdffgen
dynamic
evbopt
explore
black_box
poly_qmdff

View File

@@ -1,3 +0,0 @@
chroma
qdp
qmp

View File

@@ -1 +0,0 @@
cp2k

View File

@@ -1 +0,0 @@
cpmd

View File

@@ -1 +0,0 @@
flame

View File

@@ -1,3 +0,0 @@
gromacs
gmx
mdrun

View File

@@ -1 +0,0 @@
julia

View File

@@ -1 +0,0 @@
lmp

View File

@@ -1 +0,0 @@
matlab

View File

@@ -1 +0,0 @@
openfoam

View File

@@ -1 +0,0 @@
orca

View File

@@ -1,4 +0,0 @@
python
pip
anaconda
conda

View File

@@ -1,2 +0,0 @@
starccm+
-podkey

View File

@@ -1,10 +0,0 @@
dscf
grad
ridft
rdgrad
ricc2
statpt
aoforce
escf
egrad
odft

View File

@@ -1,2 +0,0 @@
vasp
VASP

View File

@@ -2,15 +2,16 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package tagger
import (
"bytes"
"embed"
"encoding/json"
"fmt"
"maps"
"os"
"path/filepath"
"strings"
"text/template"
@@ -23,8 +24,16 @@ import (
"github.com/expr-lang/expr/vm"
)
//go:embed jobclasses/*
var jobClassFiles embed.FS
const (
// defaultJobClassConfigPath is the default path for job classification configuration
defaultJobClassConfigPath = "./var/tagger/jobclasses"
// tagTypeJobClass is the tag type identifier for job classification tags
tagTypeJobClass = "jobClass"
// jobClassConfigDirMatch is the directory name used for matching filesystem events
jobClassConfigDirMatch = "jobclasses"
// parametersFileName is the name of the parameters configuration file
parametersFileName = "parameters.json"
)
// Variable defines a named expression that can be computed and reused in rules.
// Variables are evaluated before the main rule and their results are added to the environment.
@@ -45,21 +54,21 @@ type ruleVariable struct {
// and the final rule expression that determines if the job matches the classification.
type RuleFormat struct {
// Name is a human-readable description of the rule
Name string `json:"name"`
Name string `json:"name"`
// Tag is the classification tag to apply if the rule matches
Tag string `json:"tag"`
Tag string `json:"tag"`
// Parameters are shared values referenced in the rule (e.g., thresholds)
Parameters []string `json:"parameters"`
Parameters []string `json:"parameters"`
// Metrics are the job metrics required for this rule (e.g., "cpu_load", "mem_used")
Metrics []string `json:"metrics"`
Metrics []string `json:"metrics"`
// Requirements are boolean expressions that must be true for the rule to apply
Requirements []string `json:"requirements"`
Requirements []string `json:"requirements"`
// Variables are computed values used in the rule expression
Variables []Variable `json:"variables"`
Variables []Variable `json:"variables"`
// Rule is the boolean expression that determines if the job matches
Rule string `json:"rule"`
Rule string `json:"rule"`
// Hint is a template string that generates a message when the rule matches
Hint string `json:"hint"`
Hint string `json:"hint"`
}
type ruleInfo struct {
@@ -75,29 +84,29 @@ type ruleInfo struct {
// This interface allows for easier testing and decoupling from the concrete repository implementation.
type JobRepository interface {
// HasTag checks if a job already has a specific tag
HasTag(jobId int64, tagType string, tagName string) bool
HasTag(jobID int64, tagType string, tagName string) bool
// AddTagOrCreateDirect adds a tag to a job or creates it if it doesn't exist
AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error)
AddTagOrCreateDirect(jobID int64, tagType string, tagName string) (tagID int64, err error)
// UpdateMetadata updates job metadata with a key-value pair
UpdateMetadata(job *schema.Job, key, val string) (err error)
}
// JobClassTagger classifies jobs based on configurable rules that evaluate job metrics and properties.
// Rules are loaded from embedded JSON files and can be dynamically reloaded from a watched directory.
// Rules are loaded from an external configuration directory and can be dynamically reloaded when files change.
// When a job matches a rule, it is tagged with the corresponding classification and an optional hint message.
type JobClassTagger struct {
// rules maps classification tags to their compiled rule information
rules map[string]ruleInfo
rules map[string]ruleInfo
// parameters are shared values (e.g., thresholds) used across multiple rules
parameters map[string]any
parameters map[string]any
// tagType is the type of tag ("jobClass")
tagType string
tagType string
// cfgPath is the path to watch for configuration changes
cfgPath string
cfgPath string
// repo provides access to job database operations
repo JobRepository
repo JobRepository
// getStatistics retrieves job statistics for analysis
getStatistics func(job *schema.Job) (map[string]schema.JobStatistics, error)
getStatistics func(job *schema.Job) (map[string]schema.JobStatistics, error)
// getMetricConfig retrieves metric configuration (limits) for a cluster
getMetricConfig func(cluster, subCluster string) map[string]*schema.Metric
}
@@ -169,7 +178,7 @@ func (t *JobClassTagger) prepareRule(b []byte, fns string) {
// EventMatch checks if a filesystem event should trigger configuration reload.
// It returns true if the event path contains "jobclasses".
func (t *JobClassTagger) EventMatch(s string) bool {
return strings.Contains(s, "jobclasses")
return strings.Contains(s, jobClassConfigDirMatch)
}
// EventCallback is triggered when the configuration directory changes.
@@ -181,9 +190,10 @@ func (t *JobClassTagger) EventCallback() {
cclog.Fatal(err)
}
if util.CheckFileExists(t.cfgPath + "/parameters.json") {
parametersFile := filepath.Join(t.cfgPath, parametersFileName)
if util.CheckFileExists(parametersFile) {
cclog.Info("Merge parameters")
b, err := os.ReadFile(t.cfgPath + "/parameters.json")
b, err := os.ReadFile(parametersFile)
if err != nil {
cclog.Warnf("prepareRule() > open file error: %v", err)
}
@@ -198,13 +208,13 @@ func (t *JobClassTagger) EventCallback() {
for _, fn := range files {
fns := fn.Name()
if fns != "parameters.json" {
if fns != parametersFileName {
cclog.Debugf("Process: %s", fns)
filename := fmt.Sprintf("%s/%s", t.cfgPath, fns)
filename := filepath.Join(t.cfgPath, fns)
b, err := os.ReadFile(filename)
if err != nil {
cclog.Warnf("prepareRule() > open file error: %v", err)
return
continue
}
t.prepareRule(b, fns)
}
@@ -213,7 +223,8 @@ func (t *JobClassTagger) EventCallback() {
func (t *JobClassTagger) initParameters() error {
cclog.Info("Initialize parameters")
b, err := jobClassFiles.ReadFile("jobclasses/parameters.json")
parametersFile := filepath.Join(t.cfgPath, parametersFileName)
b, err := os.ReadFile(parametersFile)
if err != nil {
cclog.Warnf("prepareRule() > open file error: %v", err)
return err
@@ -227,13 +238,20 @@ func (t *JobClassTagger) initParameters() error {
return nil
}
// Register initializes the JobClassTagger by loading parameters and classification rules.
// It loads embedded configuration files and sets up a file watch on ./var/tagger/jobclasses
// if it exists, allowing for dynamic configuration updates without restarting the application.
// Returns an error if the embedded configuration files cannot be read or parsed.
// Register initializes the JobClassTagger by loading parameters and classification rules from an
// external configuration directory (./var/tagger/jobclasses unless cfgPath is already set).
// It then sets up a file watch on that directory, allowing for
// dynamic configuration updates without restarting the application.
// Returns an error if the configuration path does not exist or cannot be read.
func (t *JobClassTagger) Register() error {
t.cfgPath = "./var/tagger/jobclasses"
t.tagType = "jobClass"
if t.cfgPath == "" {
t.cfgPath = defaultJobClassConfigPath
}
t.tagType = tagTypeJobClass
t.rules = make(map[string]ruleInfo)
if !util.CheckFileExists(t.cfgPath) {
return fmt.Errorf("configuration path does not exist: %s", t.cfgPath)
}
err := t.initParameters()
if err != nil {
@@ -241,31 +259,28 @@ func (t *JobClassTagger) Register() error {
return err
}
files, err := jobClassFiles.ReadDir("jobclasses")
files, err := os.ReadDir(t.cfgPath)
if err != nil {
return fmt.Errorf("error reading app folder: %#v", err)
return fmt.Errorf("error reading jobclasses folder: %#v", err)
}
t.rules = make(map[string]ruleInfo)
for _, fn := range files {
fns := fn.Name()
if fns != "parameters.json" {
filename := fmt.Sprintf("jobclasses/%s", fns)
if fns != parametersFileName {
cclog.Infof("Process: %s", fns)
filename := filepath.Join(t.cfgPath, fns)
b, err := jobClassFiles.ReadFile(filename)
b, err := os.ReadFile(filename)
if err != nil {
cclog.Warnf("prepareRule() > open file error: %v", err)
return err
continue
}
t.prepareRule(b, fns)
}
}
if util.CheckFileExists(t.cfgPath) {
t.EventCallback()
cclog.Infof("Setup file watch for %s", t.cfgPath)
util.AddListener(t.cfgPath, t)
}
cclog.Infof("Setup file watch for %s", t.cfgPath)
util.AddListener(t.cfgPath, t)
t.repo = repository.GetJobRepository()
t.getStatistics = archive.GetStatistics

View File

@@ -13,13 +13,13 @@ type MockJobRepository struct {
mock.Mock
}
func (m *MockJobRepository) HasTag(jobId int64, tagType string, tagName string) bool {
args := m.Called(jobId, tagType, tagName)
func (m *MockJobRepository) HasTag(jobID int64, tagType string, tagName string) bool {
args := m.Called(jobID, tagType, tagName)
return args.Bool(0)
}
func (m *MockJobRepository) AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error) {
args := m.Called(jobId, tagType, tagName)
func (m *MockJobRepository) AddTagOrCreateDirect(jobID int64, tagType string, tagName string) (tagID int64, err error) {
args := m.Called(jobID, tagType, tagName)
return args.Get(0).(int64), args.Error(1)
}

View File

@@ -7,9 +7,7 @@ package tagger
import (
"bufio"
"embed"
"fmt"
"io/fs"
"os"
"path/filepath"
"regexp"
@@ -21,8 +19,14 @@ import (
"github.com/ClusterCockpit/cc-lib/v2/util"
)
//go:embed apps/*
var appFiles embed.FS
const (
// defaultConfigPath is the default path for application tagging configuration
defaultConfigPath = "./var/tagger/apps"
// tagTypeApp is the tag type identifier for application tags
tagTypeApp = "app"
// configDirMatch is the directory name used for matching filesystem events
configDirMatch = "apps"
)
type appInfo struct {
tag string
@@ -30,19 +34,19 @@ type appInfo struct {
}
// AppTagger detects applications by matching patterns in job scripts.
// It loads application patterns from embedded files and can dynamically reload
// configuration from a watched directory. When a job script matches a pattern,
// It loads application patterns from an external configuration directory and can dynamically reload
// configuration when files change. When a job script matches a pattern,
// the corresponding application tag is automatically applied.
type AppTagger struct {
// apps maps application tags to their matching patterns
apps map[string]appInfo
apps map[string]appInfo
// tagType is the type of tag ("app")
tagType string
// cfgPath is the path to watch for configuration changes
cfgPath string
}
func (t *AppTagger) scanApp(f fs.File, fns string) {
func (t *AppTagger) scanApp(f *os.File, fns string) {
scanner := bufio.NewScanner(f)
ai := appInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), strings: make([]string, 0)}
@@ -56,7 +60,7 @@ func (t *AppTagger) scanApp(f fs.File, fns string) {
// EventMatch checks if a filesystem event should trigger configuration reload.
// It returns true if the event path contains "apps".
func (t *AppTagger) EventMatch(s string) bool {
return strings.Contains(s, "apps")
return strings.Contains(s, configDirMatch)
}
// EventCallback is triggered when the configuration directory changes.
@@ -71,43 +75,50 @@ func (t *AppTagger) EventCallback() {
for _, fn := range files {
fns := fn.Name()
cclog.Debugf("Process: %s", fns)
f, err := os.Open(fmt.Sprintf("%s/%s", t.cfgPath, fns))
f, err := os.Open(filepath.Join(t.cfgPath, fns))
if err != nil {
cclog.Errorf("error opening app file %s: %#v", fns, err)
continue
}
t.scanApp(f, fns)
f.Close()
}
}
// Register initializes the AppTagger by loading application patterns from embedded files.
// It also sets up a file watch on ./var/tagger/apps if it exists, allowing for
// Register initializes the AppTagger by loading application patterns from an external
// configuration directory (./var/tagger/apps unless cfgPath is already set).
// It then sets up a file watch on that directory, allowing for
// dynamic configuration updates without restarting the application.
// Returns an error if the embedded application files cannot be read.
// Returns an error if the configuration path does not exist or cannot be read.
func (t *AppTagger) Register() error {
t.cfgPath = "./var/tagger/apps"
t.tagType = "app"
if t.cfgPath == "" {
t.cfgPath = defaultConfigPath
}
t.tagType = tagTypeApp
t.apps = make(map[string]appInfo, 0)
files, err := appFiles.ReadDir("apps")
if !util.CheckFileExists(t.cfgPath) {
return fmt.Errorf("configuration path does not exist: %s", t.cfgPath)
}
files, err := os.ReadDir(t.cfgPath)
if err != nil {
return fmt.Errorf("error reading app folder: %#v", err)
}
t.apps = make(map[string]appInfo, 0)
for _, fn := range files {
fns := fn.Name()
cclog.Debugf("Process: %s", fns)
f, err := appFiles.Open(fmt.Sprintf("apps/%s", fns))
f, err := os.Open(filepath.Join(t.cfgPath, fns))
if err != nil {
return fmt.Errorf("error opening app file %s: %#v", fns, err)
cclog.Errorf("error opening app file %s: %#v", fns, err)
continue
}
defer f.Close()
t.scanApp(f, fns)
f.Close()
}
if util.CheckFileExists(t.cfgPath) {
t.EventCallback()
cclog.Infof("Setup file watch for %s", t.cfgPath)
util.AddListener(t.cfgPath, t)
}
cclog.Infof("Setup file watch for %s", t.cfgPath)
util.AddListener(t.cfgPath, t)
return nil
}

View File

@@ -5,6 +5,8 @@
package tagger
import (
"os"
"path/filepath"
"testing"
"github.com/ClusterCockpit/cc-backend/internal/repository"
@@ -29,28 +31,88 @@ func noErr(tb testing.TB, err error) {
}
}
func TestRegister(t *testing.T) {
var tagger AppTagger
func setupAppTaggerTestDir(t *testing.T) string {
t.Helper()
err := tagger.Register()
testDir := t.TempDir()
appsDir := filepath.Join(testDir, "apps")
err := os.MkdirAll(appsDir, 0o755)
noErr(t, err)
srcDir := "../../configs/tagger/apps"
files, err := os.ReadDir(srcDir)
noErr(t, err)
for _, file := range files {
if file.IsDir() {
continue
}
srcPath := filepath.Join(srcDir, file.Name())
dstPath := filepath.Join(appsDir, file.Name())
data, err := os.ReadFile(srcPath)
noErr(t, err)
err = os.WriteFile(dstPath, data, 0o644)
noErr(t, err)
}
return appsDir
}
func TestRegister(t *testing.T) {
appsDir := setupAppTaggerTestDir(t)
var tagger AppTagger
tagger.cfgPath = appsDir
tagger.tagType = tagTypeApp
tagger.apps = make(map[string]appInfo, 0)
files, err := os.ReadDir(appsDir)
noErr(t, err)
for _, fn := range files {
if fn.IsDir() {
continue
}
fns := fn.Name()
f, err := os.Open(filepath.Join(appsDir, fns))
noErr(t, err)
tagger.scanApp(f, fns)
f.Close()
}
if len(tagger.apps) != 16 {
t.Errorf("wrong summary for diagnostic \ngot: %d \nwant: 16", len(tagger.apps))
}
}
func TestMatch(t *testing.T) {
appsDir := setupAppTaggerTestDir(t)
r := setup(t)
job, err := r.FindByIDDirect(317)
noErr(t, err)
var tagger AppTagger
tagger.cfgPath = appsDir
tagger.tagType = tagTypeApp
tagger.apps = make(map[string]appInfo, 0)
err = tagger.Register()
files, err := os.ReadDir(appsDir)
noErr(t, err)
for _, fn := range files {
if fn.IsDir() {
continue
}
fns := fn.Name()
f, err := os.Open(filepath.Join(appsDir, fns))
noErr(t, err)
tagger.scanApp(f, fns)
f.Close()
}
tagger.Match(job)
if !r.HasTag(317, "app", "vasp") {

View File

@@ -1,26 +0,0 @@
{
"name": "Excessive CPU load",
"tag": "excessiveload",
"parameters": [
"excessivecpuload_threshold_factor",
"job_min_duration_seconds",
"sampling_interval_seconds"
],
"metrics": ["cpu_load"],
"requirements": [
"job.shared == \"none\"",
"job.duration > job_min_duration_seconds"
],
"variables": [
{
"name": "load_threshold",
"expr": "cpu_load.limits.peak * excessivecpuload_threshold_factor"
},
{
"name": "load_perc",
"expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)"
}
],
"rule": "cpu_load.avg > load_threshold",
"hint": "This job was detected as excessiveload because the average cpu load {{.cpu_load.avg}} falls above the threshold {{.load_threshold}}."
}

View File

@@ -1,22 +0,0 @@
{
"name": "Low ressource utilization",
"tag": "lowutilization",
"parameters": ["job_min_duration_seconds"],
"metrics": ["flops_any", "mem_bw"],
"requirements": [
"job.shared == \"none\"",
"job.duration > job_min_duration_seconds"
],
"variables": [
{
"name": "mem_bw_perc",
"expr": "1.0 - (mem_bw.avg / mem_bw.limits.peak)"
},
{
"name": "flops_any_perc",
"expr": "1.0 - (flops_any.avg / flops_any.limits.peak)"
}
],
"rule": "flops_any.avg < flops_any.limits.alert && mem_bw.avg < mem_bw.limits.alert",
"hint": "This job was detected as low utilization because the average flop rate {{.flops_any.avg}} falls below the threshold {{.flops_any.limits.alert}}."
}

View File

@@ -1,26 +0,0 @@
{
"name": "Low CPU load",
"tag": "lowload",
"parameters": [
"lowcpuload_threshold_factor",
"job_min_duration_seconds",
"sampling_interval_seconds"
],
"metrics": ["cpu_load"],
"requirements": [
"job.shared == \"none\"",
"job.duration > job_min_duration_seconds"
],
"variables": [
{
"name": "load_threshold",
"expr": "job.numCores * lowcpuload_threshold_factor"
},
{
"name": "load_perc",
"expr": "1.0 - (cpu_load.avg / cpu_load.limits.peak)"
}
],
"rule": "cpu_load.avg < cpu_load.limits.caution",
"hint": "This job was detected as lowload because the average cpu load {{.cpu_load}} falls below the threshold {{.cpu_load.limits.caution}}."
}

View File

@@ -1,14 +0,0 @@
{
"lowcpuload_threshold_factor": 0.9,
"excessivecpuload_threshold_factor": 1.1,
"highmemoryusage_threshold_factor": 0.9,
"node_load_imbalance_threshold_factor": 0.1,
"core_load_imbalance_threshold_factor": 0.1,
"high_memory_load_threshold_factor": 0.9,
"lowgpuload_threshold_factor": 0.7,
"memory_leak_slope_threshold": 0.1,
"job_min_duration_seconds": 600.0,
"sampling_interval_seconds": 30.0,
"cpu_load_pre_cutoff_samples": 11.0,
"cpu_load_core_pre_cutoff_samples": 6.0
}

View File

@@ -10,7 +10,7 @@ import (
"math"
"time"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/metricstore"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -58,12 +58,6 @@ func RegisterFootprintWorker() {
allMetrics = append(allMetrics, mc.Name)
}
repo, err := metricdata.GetMetricDataRepo(cluster.Name)
if err != nil {
cclog.Errorf("no metric data repository configured for '%s'", cluster.Name)
continue
}
pendingStatements := []sq.UpdateBuilder{}
for _, job := range jobs {
@@ -72,7 +66,7 @@ func RegisterFootprintWorker() {
sJob := time.Now()
jobStats, err := repo.LoadStats(job, allMetrics, context.Background())
jobStats, err := metricstore.LoadStats(job, allMetrics, context.Background())
if err != nil {
cclog.Errorf("error wile loading job data stats for footprint update: %v", err)
ce++