Provide idential nodestate functionality in NATS API

Entire-Checkpoint: 3a40b75edd68
This commit is contained in:
2026-03-16 12:13:14 +01:00
parent df93dbed63
commit 09d0ba71d2
4 changed files with 128 additions and 26 deletions

View File

@@ -8,6 +8,7 @@ package api
import (
"database/sql"
"encoding/json"
"maps"
"strings"
"sync"
"time"
@@ -15,7 +16,10 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/importer"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
"github.com/ClusterCockpit/cc-lib/v2/nats"
@@ -24,6 +28,12 @@ import (
influx "github.com/ClusterCockpit/cc-line-protocol/v2/lineprotocol"
)
// natsMessage wraps a raw NATS message with its subject for channel-based processing.
type natsMessage struct {
subject string
data []byte
}
// NatsAPI provides NATS subscription-based handlers for Job and Node operations.
// It mirrors the functionality of the REST API but uses NATS messaging with
// InfluxDB line protocol as the message format.
@@ -64,23 +74,60 @@ type NatsAPI struct {
// RepositoryMutex protects job creation operations from race conditions
// when checking for duplicate jobs during startJob calls.
RepositoryMutex sync.Mutex
// jobSem limits concurrent NATS job event processing to prevent goroutine accumulation
// when the database is under contention.
jobSem chan struct{}
// nodeSem limits concurrent NATS node state processing.
nodeSem chan struct{}
// jobCh receives job event messages for processing by worker goroutines.
jobCh chan natsMessage
// nodeCh receives node state messages for processing by worker goroutines.
nodeCh chan natsMessage
}
// NewNatsAPI creates a new NatsAPI instance with default dependencies.
// NewNatsAPI creates a new NatsAPI instance with channel-based worker pools.
// Concurrency is configured via NATSConfig (defaults: JobConcurrency=8, NodeConcurrency=2).
func NewNatsAPI() *NatsAPI {
return &NatsAPI{
jobConc := 8
nodeConc := 2
if s := config.Keys.APISubjects; s != nil {
if s.JobConcurrency > 0 {
jobConc = s.JobConcurrency
}
if s.NodeConcurrency > 0 {
nodeConc = s.NodeConcurrency
}
}
api := &NatsAPI{
JobRepository: repository.GetJobRepository(),
jobSem: make(chan struct{}, 8),
nodeSem: make(chan struct{}, 2),
jobCh: make(chan natsMessage, jobConc),
nodeCh: make(chan natsMessage, nodeConc),
}
// Start worker goroutines
for range jobConc {
go api.jobWorker()
}
for range nodeConc {
go api.nodeWorker()
}
return api
}
// jobWorker processes job event messages from the job channel.
func (api *NatsAPI) jobWorker() {
for msg := range api.jobCh {
api.handleJobEvent(msg.subject, msg.data)
}
}
// nodeWorker processes node state messages from the node channel.
func (api *NatsAPI) nodeWorker() {
for msg := range api.nodeCh {
api.handleNodeState(msg.subject, msg.data)
}
}
// StartSubscriptions registers all NATS subscriptions for Job and Node APIs.
// Messages are delivered to buffered channels and processed by worker goroutines.
// Returns an error if the NATS client is not available or subscription fails.
func (api *NatsAPI) StartSubscriptions() error {
client := nats.GetClient()
@@ -90,14 +137,17 @@ func (api *NatsAPI) StartSubscriptions() error {
}
if config.Keys.APISubjects != nil {
s := config.Keys.APISubjects
if err := client.Subscribe(s.SubjectJobEvent, api.handleJobEvent); err != nil {
if err := client.Subscribe(s.SubjectJobEvent, func(subject string, data []byte) {
api.jobCh <- natsMessage{subject: subject, data: data}
}); err != nil {
return err
}
if err := client.Subscribe(s.SubjectNodeState, api.handleNodeState); err != nil {
if err := client.Subscribe(s.SubjectNodeState, func(subject string, data []byte) {
api.nodeCh <- natsMessage{subject: subject, data: data}
}); err != nil {
return err
}
@@ -144,9 +194,6 @@ func (api *NatsAPI) processJobEvent(msg lp.CCMessage) {
//
// Example: job,function=start_job event="{\"jobId\":1001,...}" 1234567890000000000
func (api *NatsAPI) handleJobEvent(subject string, data []byte) {
api.jobSem <- struct{}{}
defer func() { <-api.jobSem }()
if len(data) == 0 {
cclog.Warnf("NATS %s: received empty message", subject)
return
@@ -355,16 +402,55 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) {
repo := repository.GetNodeRepository()
requestReceived := time.Now().Unix()
// Build nodeList per subcluster for health check
m := make(map[string][]string)
metricNames := make(map[string][]string)
healthResults := make(map[string]metricstore.HealthCheckResult)
for _, node := range req.Nodes {
if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil {
m[sc] = append(m[sc], node.Hostname)
}
}
for sc := range m {
if sc != "" {
metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
metricNames[sc] = metricListToNames(metricList)
}
}
// Perform health check against metric store
healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster)
if err != nil {
cclog.Warnf("NATS nodestate: no metric store for cluster %s, skipping health check: %v", req.Cluster, err)
} else {
for sc, nl := range m {
if sc != "" {
if results, err := healthRepo.HealthCheck(req.Cluster, nl, metricNames[sc]); err == nil {
maps.Copy(healthResults, results)
}
}
}
}
updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes))
for _, node := range req.Nodes {
state := determineState(node.States)
healthState := schema.MonitoringStateFailed
var healthMetrics string
if result, ok := healthResults[node.Hostname]; ok {
healthState = result.State
healthMetrics = result.HealthMetrics
}
nodeState := schema.NodeStateDB{
TimeStamp: requestReceived,
NodeState: state,
CpusAllocated: node.CpusAllocated,
MemoryAllocated: node.MemoryAllocated,
GpusAllocated: node.GpusAllocated,
HealthState: schema.MonitoringStateFull,
HealthState: healthState,
HealthMetrics: healthMetrics,
JobsRunning: node.JobsRunning,
}
updates = append(updates, repository.NodeStateUpdate{
@@ -387,9 +473,6 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) {
//
// Example: nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[...]}" 1234567890000000000
func (api *NatsAPI) handleNodeState(subject string, data []byte) {
api.nodeSem <- struct{}{}
defer func() { <-api.nodeSem }()
if len(data) == 0 {
cclog.Warnf("NATS %s: received empty message", subject)
return

View File

@@ -117,6 +117,8 @@ type ResampleConfig struct {
type NATSConfig struct {
SubjectJobEvent string `json:"subject-job-event"`
SubjectNodeState string `json:"subject-node-state"`
JobConcurrency int `json:"job-concurrency"`
NodeConcurrency int `json:"node-concurrency"`
}
type IntRange struct {

View File

@@ -121,6 +121,14 @@ var configSchema = `
"subject-node-state": {
"description": "NATS subject for node state updates",
"type": "string"
},
"job-concurrency": {
"description": "Number of concurrent worker goroutines for processing job events (default: 8).",
"type": "integer"
},
"node-concurrency": {
"description": "Number of concurrent worker goroutines for processing node state events (default: 2).",
"type": "integer"
}
},
"required": ["subject-job-event", "subject-node-state"]