diff --git a/CLAUDE.md b/CLAUDE.md index a8d56571..ef030cd5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -151,9 +151,11 @@ applied automatically on startup. Version tracking in `version` table. ## Configuration - **config.json**: Main configuration (clusters, metric repositories, archive settings) - - `main.apiSubjects`: NATS subject configuration (optional) - - `subjectJobEvent`: Subject for job start/stop events (e.g., "cc.job.event") - - `subjectNodeState`: Subject for node state updates (e.g., "cc.node.state") + - `main.api-subjects`: NATS subject configuration (optional) + - `subject-job-event`: Subject for job start/stop events (e.g., "cc.job.event") + - `subject-node-state`: Subject for node state updates (e.g., "cc.node.state") + - `job-concurrency`: Worker goroutines for job events (default: 8) + - `node-concurrency`: Worker goroutines for node state events (default: 2) - `nats`: NATS client connection configuration (optional) - `address`: NATS server address (e.g., "nats://localhost:4222") - `username`: Authentication username (optional) @@ -241,13 +243,19 @@ The backend supports a NATS-based API as an alternative to the REST API for job ```json { "main": { - "apiSubjects": { - "subjectJobEvent": "cc.job.event", - "subjectNodeState": "cc.node.state" + "api-subjects": { + "subject-job-event": "cc.job.event", + "subject-node-state": "cc.node.state", + "job-concurrency": 8, + "node-concurrency": 2 } } } ``` + - `subject-job-event` (required): NATS subject for job start/stop events + - `subject-node-state` (required): NATS subject for node state updates + - `job-concurrency` (optional, default: 8): Number of concurrent worker goroutines for job events + - `node-concurrency` (optional, default: 2): Number of concurrent worker goroutines for node state events ### Message Format @@ -292,9 +300,10 @@ job,function=stop_job event="{\"jobId\":123,\"cluster\":\"test\",\"startTime\":1 ### Implementation Notes - NATS API mirrors REST API functionality but uses messaging 
-- Job start/stop events are processed asynchronously +- Job start/stop events are processed asynchronously via configurable worker pools - Duplicate job detection is handled (same as REST API) - All validation rules from REST API apply +- Node state updates include health checks against the metric store (identical to REST handler): nodes are grouped by subcluster, metric configurations are fetched, and `HealthCheck()` is called per subcluster. Nodes default to `MonitoringStateFailed` if no health data is available. - Messages are logged; no responses are sent back to publishers - If NATS client is unavailable, API subscriptions are skipped (logged as warning) diff --git a/internal/api/nats.go b/internal/api/nats.go index 1834ff28..db229a04 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -8,6 +8,7 @@ package api import ( "database/sql" "encoding/json" + "maps" "strings" "sync" "time" @@ -15,7 +16,10 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/archiver" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/importer" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/metricstore" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" "github.com/ClusterCockpit/cc-lib/v2/nats" @@ -24,6 +28,12 @@ import ( influx "github.com/ClusterCockpit/cc-line-protocol/v2/lineprotocol" ) +// natsMessage wraps a raw NATS message with its subject for channel-based processing. +type natsMessage struct { + subject string + data []byte +} + // NatsAPI provides NATS subscription-based handlers for Job and Node operations. // It mirrors the functionality of the REST API but uses NATS messaging with // InfluxDB line protocol as the message format. 
@@ -64,23 +74,60 @@ type NatsAPI struct { // RepositoryMutex protects job creation operations from race conditions // when checking for duplicate jobs during startJob calls. RepositoryMutex sync.Mutex - // jobSem limits concurrent NATS job event processing to prevent goroutine accumulation - // when the database is under contention. - jobSem chan struct{} - // nodeSem limits concurrent NATS node state processing. - nodeSem chan struct{} + // jobCh receives job event messages for processing by worker goroutines. + jobCh chan natsMessage + // nodeCh receives node state messages for processing by worker goroutines. + nodeCh chan natsMessage } -// NewNatsAPI creates a new NatsAPI instance with default dependencies. +// NewNatsAPI creates a new NatsAPI instance with channel-based worker pools. +// Concurrency is configured via NATSConfig (defaults: JobConcurrency=8, NodeConcurrency=2). func NewNatsAPI() *NatsAPI { - return &NatsAPI{ + jobConc := 8 + nodeConc := 2 + + if s := config.Keys.APISubjects; s != nil { + if s.JobConcurrency > 0 { + jobConc = s.JobConcurrency + } + if s.NodeConcurrency > 0 { + nodeConc = s.NodeConcurrency + } + } + + api := &NatsAPI{ JobRepository: repository.GetJobRepository(), - jobSem: make(chan struct{}, 8), - nodeSem: make(chan struct{}, 2), + jobCh: make(chan natsMessage, jobConc), + nodeCh: make(chan natsMessage, nodeConc), + } + + // Start worker goroutines + for range jobConc { + go api.jobWorker() + } + for range nodeConc { + go api.nodeWorker() + } + + return api +} + +// jobWorker processes job event messages from the job channel. +func (api *NatsAPI) jobWorker() { + for msg := range api.jobCh { + api.handleJobEvent(msg.subject, msg.data) + } +} + +// nodeWorker processes node state messages from the node channel. +func (api *NatsAPI) nodeWorker() { + for msg := range api.nodeCh { + api.handleNodeState(msg.subject, msg.data) } } // StartSubscriptions registers all NATS subscriptions for Job and Node APIs. 
+// Messages are delivered to buffered channels and processed by worker goroutines. // Returns an error if the NATS client is not available or subscription fails. func (api *NatsAPI) StartSubscriptions() error { client := nats.GetClient() @@ -90,14 +137,18 @@ } if config.Keys.APISubjects != nil { s := config.Keys.APISubjects - if err := client.Subscribe(s.SubjectJobEvent, api.handleJobEvent); err != nil { + if err := client.Subscribe(s.SubjectJobEvent, func(subject string, data []byte) { + api.jobCh <- natsMessage{subject: subject, data: data} + }); err != nil { return err } - if err := client.Subscribe(s.SubjectNodeState, api.handleNodeState); err != nil { + if err := client.Subscribe(s.SubjectNodeState, func(subject string, data []byte) { + api.nodeCh <- natsMessage{subject: subject, data: data} + }); err != nil { return err } @@ -144,9 +194,6 @@ // // Example: job,function=start_job event="{\"jobId\":1001,...}" 1234567890000000000 func (api *NatsAPI) handleJobEvent(subject string, data []byte) { - api.jobSem <- struct{}{} - defer func() { <-api.jobSem }() - if len(data) == 0 { cclog.Warnf("NATS %s: received empty message", subject) return @@ -355,16 +402,55 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) { repo := repository.GetNodeRepository() requestReceived := time.Now().Unix() + // Build nodeList per subcluster for health check + m := make(map[string][]string) + metricNames := make(map[string][]string) + healthResults := make(map[string]metricstore.HealthCheckResult) + + for _, node := range req.Nodes { + if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil { + m[sc] = append(m[sc], node.Hostname) + } + } + + for sc := range m { + if sc != "" { + metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc) + metricNames[sc] = metricListToNames(metricList) + } + } + + // Perform health check against metric
store + healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster) + if err != nil { + cclog.Warnf("NATS nodestate: no metric store for cluster %s, skipping health check: %v", req.Cluster, err) + } else { + for sc, nl := range m { + if sc != "" { + if results, err := healthRepo.HealthCheck(req.Cluster, nl, metricNames[sc]); err == nil { + maps.Copy(healthResults, results) + } + } + } + } + updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes)) for _, node := range req.Nodes { state := determineState(node.States) + healthState := schema.MonitoringStateFailed + var healthMetrics string + if result, ok := healthResults[node.Hostname]; ok { + healthState = result.State + healthMetrics = result.HealthMetrics + } nodeState := schema.NodeStateDB{ TimeStamp: requestReceived, NodeState: state, CpusAllocated: node.CpusAllocated, MemoryAllocated: node.MemoryAllocated, GpusAllocated: node.GpusAllocated, - HealthState: schema.MonitoringStateFull, + HealthState: healthState, + HealthMetrics: healthMetrics, JobsRunning: node.JobsRunning, } updates = append(updates, repository.NodeStateUpdate{ @@ -387,9 +473,6 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) { // // Example: nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[...]}" 1234567890000000000 func (api *NatsAPI) handleNodeState(subject string, data []byte) { - api.nodeSem <- struct{}{} - defer func() { <-api.nodeSem }() - if len(data) == 0 { cclog.Warnf("NATS %s: received empty message", subject) return diff --git a/internal/config/config.go b/internal/config/config.go index 517c920e..677b24e4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -117,6 +117,8 @@ type ResampleConfig struct { type NATSConfig struct { SubjectJobEvent string `json:"subject-job-event"` SubjectNodeState string `json:"subject-node-state"` + JobConcurrency int `json:"job-concurrency"` + NodeConcurrency int `json:"node-concurrency"` } type IntRange struct { diff --git 
a/internal/config/schema.go b/internal/config/schema.go index 26eacab9..195dfaeb 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -121,6 +121,14 @@ var configSchema = ` "subject-node-state": { "description": "NATS subject for node state updates", "type": "string" + }, + "job-concurrency": { + "description": "Number of concurrent worker goroutines for processing job events (default: 8).", + "type": "integer" + }, + "node-concurrency": { + "description": "Number of concurrent worker goroutines for processing node state events (default: 2).", + "type": "integer" } }, "required": ["subject-job-event", "subject-node-state"]