diff --git a/internal/api/nats.go b/internal/api/nats.go index efd04406..48c6449b 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -6,7 +6,6 @@ package api import ( - "bytes" "database/sql" "encoding/json" "strings" @@ -26,7 +25,40 @@ import ( ) // NatsAPI provides NATS subscription-based handlers for Job and Node operations. -// It mirrors the functionality of the REST API but uses NATS messaging. +// It mirrors the functionality of the REST API but uses NATS messaging with +// InfluxDB line protocol as the message format. +// +// # Message Format +// +// All NATS messages use InfluxDB line protocol format (https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) +// with the following structure: +// +// measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp +// +// # Job Events +// +// Job start/stop events use the "job" measurement with a "function" tag to distinguish operations: +// +// job,function=start_job event="{...JSON payload...}" +// job,function=stop_job event="{...JSON payload...}" +// +// The JSON payload in the "event" field follows the schema.Job structure for start_job and the StopJobAPIRequest structure for stop_job. +// +// Example job start message: +// +// job,function=start_job event="{\"jobId\":1001,\"user\":\"testuser\",\"cluster\":\"testcluster\",...}" 1234567890000000000 +// +// # Node State Events +// +// Node state updates use the "nodestate" measurement without tags; the cluster name is carried inside the JSON payload: +// +// nodestate event="{...JSON payload...}" +// +// The JSON payload follows the UpdateNodeStatesRequest structure. 
+// +// Example node state message: +// +// nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[{\"hostname\":\"node01\",\"states\":[\"idle\"]}]}" 1234567890000000000 type NatsAPI struct { JobRepository *repository.JobRepository // RepositoryMutex protects job creation operations from race conditions @@ -67,10 +99,12 @@ func (api *NatsAPI) StartSubscriptions() error { return nil } +// processJobEvent routes job event messages to the appropriate handler based on the "function" tag. +// Validates that required tags and fields are present before processing. func (api *NatsAPI) processJobEvent(msg lp.CCMessage) { function, ok := msg.GetTag("function") if !ok { - cclog.Errorf("Job event is missing tag 'function': %+v", msg) + cclog.Errorf("Job event is missing required tag 'function': measurement=%s", msg.Name()) return } @@ -78,43 +112,66 @@ func (api *NatsAPI) processJobEvent(msg lp.CCMessage) { case "start_job": v, ok := msg.GetEventValue() if !ok { - cclog.Errorf("Job event is missing event value: %+v", msg) + cclog.Errorf("Job start event is missing event field with JSON payload") + return } api.handleStartJob(v) case "stop_job": v, ok := msg.GetEventValue() if !ok { - cclog.Errorf("Job event is missing event value: %+v", msg) + cclog.Errorf("Job stop event is missing event field with JSON payload") + return } api.handleStopJob(v) + default: - cclog.Warnf("Unimplemented job event: %+v", msg) + cclog.Warnf("Unknown job event function '%s', expected 'start_job' or 'stop_job'", function) } } +// handleJobEvent processes job-related messages received via NATS using InfluxDB line protocol. 
+// The message must be in line protocol format with measurement="job" and include: +// - tag "function" with value "start_job" or "stop_job" +// - field "event" containing JSON payload (schema.Job or StopJobAPIRequest) +// +// Example: job,function=start_job event="{\"jobId\":1001,...}" 1234567890000000000 func (api *NatsAPI) handleJobEvent(subject string, data []byte) { + if len(data) == 0 { + cclog.Warnf("NATS %s: received empty message", subject) + return + } + d := influx.NewDecoderWithBytes(data) for d.Next() { m, err := receivers.DecodeInfluxMessage(d) if err != nil { - cclog.Errorf("NATS %s: Failed to decode message: %v", subject, err) + cclog.Errorf("NATS %s: failed to decode InfluxDB line protocol message: %v", subject, err) return } - if m.IsEvent() { - if m.Name() == "job" { - api.processJobEvent(m) - } + if !m.IsEvent() { + cclog.Warnf("NATS %s: received non-event message, skipping", subject) + continue } + if m.Name() == "job" { + api.processJobEvent(m) + } else { + cclog.Warnf("NATS %s: unexpected measurement name '%s', expected 'job'", subject, m.Name()) + } } } // handleStartJob processes job start messages received via NATS. -// Expected JSON payload follows the schema.Job structure. +// The payload parameter contains JSON following the schema.Job structure. +// Jobs are validated, checked for duplicates, and inserted into the database. func (api *NatsAPI) handleStartJob(payload string) { + if payload == "" { + cclog.Error("NATS start job: payload is empty") + return + } req := schema.Job{ Shared: "none", MonitoringStatus: schema.MonitoringStatusRunningOrArchiving, @@ -173,8 +230,13 @@ func (api *NatsAPI) handleStartJob(payload string) { } // handleStopJob processes job stop messages received via NATS. -// Expected JSON payload follows the StopJobAPIRequest structure. +// The payload parameter contains JSON following the StopJobAPIRequest structure. 
+// The job is marked as stopped in the database and archiving is triggered if monitoring is enabled. func (api *NatsAPI) handleStopJob(payload string) { + if payload == "" { + cclog.Error("NATS stop job: payload is empty") + return + } var req StopJobAPIRequest dec := json.NewDecoder(strings.NewReader(payload)) @@ -243,15 +305,21 @@ func (api *NatsAPI) handleStopJob(payload string) { archiver.TriggerArchiving(job) } -// handleNodeState processes node state update messages received via NATS. -// Expected JSON payload follows the UpdateNodeStatesRequest structure. -func (api *NatsAPI) handleNodeState(subject string, data []byte) { +// processNodestateEvent extracts and processes node state data from the InfluxDB message. +// Updates node states in the repository for all nodes in the payload. +func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) { + v, ok := msg.GetEventValue() + if !ok { + cclog.Errorf("Nodestate event is missing event field with JSON payload") + return + } + var req UpdateNodeStatesRequest - dec := json.NewDecoder(bytes.NewReader(data)) + dec := json.NewDecoder(strings.NewReader(v)) dec.DisallowUnknownFields() if err := dec.Decode(&req); err != nil { - cclog.Errorf("NATS %s: parsing request failed: %v", subject, err) + cclog.Errorf("NATS nodestate: parsing request failed: %v", err) return } @@ -270,10 +338,43 @@ func (api *NatsAPI) handleNodeState(subject string, data []byte) { } if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil { - cclog.Errorf("NATS %s: updating node state for %s on %s failed: %v", - subject, node.Hostname, req.Cluster, err) + cclog.Errorf("NATS nodestate: updating node state for %s on %s failed: %v", + node.Hostname, req.Cluster, err) } } - cclog.Debugf("NATS %s: updated %d node states for cluster %s", subject, len(req.Nodes), req.Cluster) + cclog.Debugf("NATS nodestate: updated %d node states for cluster %s", len(req.Nodes), req.Cluster) +} + +// handleNodeState processes node state 
update messages received via NATS using InfluxDB line protocol. +// The message must be in line protocol format with measurement="nodestate" and include: +// - field "event" containing JSON payload (UpdateNodeStatesRequest) +// +// Example: nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[...]}" 1234567890000000000 +func (api *NatsAPI) handleNodeState(subject string, data []byte) { + if len(data) == 0 { + cclog.Warnf("NATS %s: received empty message", subject) + return + } + + d := influx.NewDecoderWithBytes(data) + + for d.Next() { + m, err := receivers.DecodeInfluxMessage(d) + if err != nil { + cclog.Errorf("NATS %s: failed to decode InfluxDB line protocol message: %v", subject, err) + return + } + + if !m.IsEvent() { + cclog.Warnf("NATS %s: received non-event message, skipping", subject) + continue + } + + if m.Name() == "nodestate" { + api.processNodestateEvent(m) + } else { + cclog.Warnf("NATS %s: unexpected measurement name '%s', expected 'nodestate'", subject, m.Name()) + } + } } diff --git a/internal/api/nats_test.go b/internal/api/nats_test.go index 319668bb..4b1431cb 100644 --- a/internal/api/nats_test.go +++ b/internal/api/nats_test.go @@ -603,25 +603,13 @@ func TestNatsHandleNodeState(t *testing.T) { tests := []struct { name string - payload string + data []byte expectError bool validateFn func(t *testing.T) }{ { - name: "valid node state update", - payload: `{ - "cluster": "testcluster", - "nodes": [ - { - "hostname": "host123", - "states": ["allocated"], - "cpusAllocated": 8, - "memoryAllocated": 16384, - "gpusAllocated": 0, - "jobsRunning": 1 - } - ] - }`, + name: "valid node state update", + data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[{\"hostname\":\"host123\",\"states\":[\"allocated\"],\"cpusAllocated\":8,\"memoryAllocated\":16384,\"gpusAllocated\":0,\"jobsRunning\":1}]}" 1234567890000000000`), expectError: false, validateFn: func(t *testing.T) { // In a full test, we would verify the node state was updated in the 
database @@ -629,51 +617,35 @@ func TestNatsHandleNodeState(t *testing.T) { }, }, { - name: "multiple nodes", - payload: `{ - "cluster": "testcluster", - "nodes": [ - { - "hostname": "host123", - "states": ["idle"], - "cpusAllocated": 0, - "memoryAllocated": 0, - "gpusAllocated": 0, - "jobsRunning": 0 - }, - { - "hostname": "host124", - "states": ["allocated"], - "cpusAllocated": 4, - "memoryAllocated": 8192, - "gpusAllocated": 1, - "jobsRunning": 1 - } - ] - }`, + name: "multiple nodes", + data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[{\"hostname\":\"host123\",\"states\":[\"idle\"],\"cpusAllocated\":0,\"memoryAllocated\":0,\"gpusAllocated\":0,\"jobsRunning\":0},{\"hostname\":\"host124\",\"states\":[\"allocated\"],\"cpusAllocated\":4,\"memoryAllocated\":8192,\"gpusAllocated\":1,\"jobsRunning\":1}]}" 1234567890000000000`), expectError: false, }, { - name: "invalid JSON", - payload: `{ - "cluster": "testcluster", - "nodes": "not an array" - }`, + name: "invalid JSON in event field", + data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":\"not an array\"}" 1234567890000000000`), expectError: true, }, { - name: "empty nodes array", - payload: `{ - "cluster": "testcluster", - "nodes": [] - }`, + name: "empty nodes array", + data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[]}" 1234567890000000000`), expectError: false, // Empty array should not cause error }, + { + name: "invalid line protocol format", + data: []byte(`invalid line protocol format`), + expectError: true, + }, + { + name: "empty data", + data: []byte(``), + expectError: false, // Should be handled gracefully with warning + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - natsAPI.handleNodeState("test.subject", []byte(tt.payload)) + natsAPI.handleNodeState("test.subject", tt.data) // Allow some time for async operations time.Sleep(50 * time.Millisecond) @@ -789,7 +761,7 @@ func TestNatsHandleJobEvent(t *testing.T) { }{ { 
name: "valid influx line protocol", - data: []byte(`job,function=start_job event="{\"jobId\":4001,\"user\":\"testuser\",\"project\":\"testproj\",\"cluster\":\"testcluster\",\"partition\":\"main\",\"walltime\":3600,\"numNodes\":1,\"numHwthreads\":8,\"numAcc\":0,\"shared\":\"none\",\"monitoringStatus\":1,\"smt\":1,\"resources\":[{\"hostname\":\"host123\",\"hwthreads\":[0,1,2,3]}],\"startTime\":1234567890}"`), + data: []byte(`job,function=start_job event="{\"jobId\":4001,\"user\":\"testuser\",\"project\":\"testproj\",\"cluster\":\"testcluster\",\"partition\":\"main\",\"walltime\":3600,\"numNodes\":1,\"numHwthreads\":8,\"numAcc\":0,\"shared\":\"none\",\"monitoringStatus\":1,\"smt\":1,\"resources\":[{\"hostname\":\"host123\",\"hwthreads\":[0,1,2,3]}],\"startTime\":1234567890}" 1234567890000000000`), expectError: false, }, { @@ -814,6 +786,106 @@ func TestNatsHandleJobEvent(t *testing.T) { } } +func TestNatsHandleJobEventEdgeCases(t *testing.T) { + natsAPI := setupNatsTest(t) + t.Cleanup(cleanupNatsTest) + + tests := []struct { + name string + data []byte + expectError bool + description string + }{ + { + name: "non-event message (metric data)", + data: []byte(`job,function=start_job value=123.45 1234567890000000000`), + expectError: false, + description: "Should skip non-event messages gracefully", + }, + { + name: "wrong measurement name", + data: []byte(`wrongmeasurement,function=start_job event="{}" 1234567890000000000`), + expectError: false, + description: "Should warn about unexpected measurement but not fail", + }, + { + name: "missing event field", + data: []byte(`job,function=start_job other_field="value" 1234567890000000000`), + expectError: true, + description: "Should error when event field is missing", + }, + { + name: "multiple measurements in one message", + data: []byte("job,function=start_job event=\"{}\" 1234567890000000000\njob,function=stop_job event=\"{}\" 1234567890000000000"), + expectError: false, + description: "Should process multiple lines", + 
}, + { + name: "escaped quotes in JSON payload", + data: []byte(`job,function=start_job event="{\"jobId\":6001,\"user\":\"test\\\"user\",\"cluster\":\"test\"}" 1234567890000000000`), + expectError: true, + description: "Should handle escaped quotes (though JSON parsing may fail)", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + natsAPI.handleJobEvent("test.subject", tt.data) + time.Sleep(50 * time.Millisecond) + }) + } +} + +func TestNatsHandleNodeStateEdgeCases(t *testing.T) { + natsAPI := setupNatsTest(t) + t.Cleanup(cleanupNatsTest) + + tests := []struct { + name string + data []byte + expectError bool + description string + }{ + { + name: "missing cluster field in JSON", + data: []byte(`nodestate event="{\"nodes\":[]}" 1234567890000000000`), + expectError: true, + description: "Should fail when cluster is missing", + }, + { + name: "malformed JSON with unescaped quotes", + data: []byte(`nodestate event="{\"cluster\":\"test"cluster\",\"nodes\":[]}" 1234567890000000000`), + expectError: true, + description: "Should fail on malformed JSON", + }, + { + name: "unicode characters in hostname", + data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[{\"hostname\":\"host-ñ123\",\"states\":[\"idle\"],\"cpusAllocated\":0,\"memoryAllocated\":0,\"gpusAllocated\":0,\"jobsRunning\":0}]}" 1234567890000000000`), + expectError: false, + description: "Should handle unicode characters", + }, + { + name: "very large node count", + data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[{\"hostname\":\"node1\",\"states\":[\"idle\"],\"cpusAllocated\":0,\"memoryAllocated\":0,\"gpusAllocated\":0,\"jobsRunning\":0},{\"hostname\":\"node2\",\"states\":[\"idle\"],\"cpusAllocated\":0,\"memoryAllocated\":0,\"gpusAllocated\":0,\"jobsRunning\":0},{\"hostname\":\"node3\",\"states\":[\"idle\"],\"cpusAllocated\":0,\"memoryAllocated\":0,\"gpusAllocated\":0,\"jobsRunning\":0}]}" 1234567890000000000`), + expectError: false, + 
description: "Should handle multiple nodes efficiently", + }, + { + name: "timestamp in past", + data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[]}" 1000000000000000000`), + expectError: false, + description: "Should accept any valid timestamp", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + natsAPI.handleNodeState("test.subject", tt.data) + time.Sleep(50 * time.Millisecond) + }) + } +} + func TestNatsHandleStartJobDuplicatePrevention(t *testing.T) { natsAPI := setupNatsTest(t) t.Cleanup(cleanupNatsTest)