Convert nodestate nats API to influx line protocol payload. Review and add doc comments.

Improve and extend tests
This commit is contained in:
2026-01-14 10:08:06 +01:00
parent 9e542dc200
commit b2f870e3c0
2 changed files with 243 additions and 70 deletions

View File

@@ -6,7 +6,6 @@
package api package api
import ( import (
"bytes"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"strings" "strings"
@@ -26,7 +25,40 @@ import (
) )
// NatsAPI provides NATS subscription-based handlers for Job and Node operations. // NatsAPI provides NATS subscription-based handlers for Job and Node operations.
// It mirrors the functionality of the REST API but uses NATS messaging. // It mirrors the functionality of the REST API but uses NATS messaging with
// InfluxDB line protocol as the message format.
//
// # Message Format
//
// All NATS messages use InfluxDB line protocol format (https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/)
// with the following structure:
//
// measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp
//
// # Job Events
//
// Job start/stop events use the "job" measurement with a "function" tag to distinguish operations:
//
// job,function=start_job event="{...JSON payload...}" <timestamp>
// job,function=stop_job event="{...JSON payload...}" <timestamp>
//
// The JSON payload in the "event" field follows the schema.Job or StopJobAPIRequest structure.
//
// Example job start message:
//
// job,function=start_job event="{\"jobId\":1001,\"user\":\"testuser\",\"cluster\":\"testcluster\",...}" 1234567890000000000
//
// # Node State Events
//
// Node state updates use the "nodestate" measurement with cluster information:
//
// nodestate event="{...JSON payload...}" <timestamp>
//
// The JSON payload follows the UpdateNodeStatesRequest structure.
//
// Example node state message:
//
// nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[{\"hostname\":\"node01\",\"states\":[\"idle\"]}]}" 1234567890000000000
type NatsAPI struct { type NatsAPI struct {
JobRepository *repository.JobRepository JobRepository *repository.JobRepository
// RepositoryMutex protects job creation operations from race conditions // RepositoryMutex protects job creation operations from race conditions
@@ -67,10 +99,12 @@ func (api *NatsAPI) StartSubscriptions() error {
return nil return nil
} }
// processJobEvent routes job event messages to the appropriate handler based on the "function" tag.
// Validates that required tags and fields are present before processing.
func (api *NatsAPI) processJobEvent(msg lp.CCMessage) { func (api *NatsAPI) processJobEvent(msg lp.CCMessage) {
function, ok := msg.GetTag("function") function, ok := msg.GetTag("function")
if !ok { if !ok {
cclog.Errorf("Job event is missing tag 'function': %+v", msg) cclog.Errorf("Job event is missing required tag 'function': measurement=%s", msg.Name())
return return
} }
@@ -78,43 +112,66 @@ func (api *NatsAPI) processJobEvent(msg lp.CCMessage) {
case "start_job": case "start_job":
v, ok := msg.GetEventValue() v, ok := msg.GetEventValue()
if !ok { if !ok {
cclog.Errorf("Job event is missing event value: %+v", msg) cclog.Errorf("Job start event is missing event field with JSON payload")
return
} }
api.handleStartJob(v) api.handleStartJob(v)
case "stop_job": case "stop_job":
v, ok := msg.GetEventValue() v, ok := msg.GetEventValue()
if !ok { if !ok {
cclog.Errorf("Job event is missing event value: %+v", msg) cclog.Errorf("Job stop event is missing event field with JSON payload")
return
} }
api.handleStopJob(v) api.handleStopJob(v)
default: default:
cclog.Warnf("Unimplemented job event: %+v", msg) cclog.Warnf("Unknown job event function '%s', expected 'start_job' or 'stop_job'", function)
} }
} }
// handleJobEvent processes job-related messages received via NATS using InfluxDB line protocol.
// The message must be in line protocol format with measurement="job" and include:
// - tag "function" with value "start_job" or "stop_job"
// - field "event" containing JSON payload (schema.Job or StopJobAPIRequest)
//
// Example: job,function=start_job event="{\"jobId\":1001,...}" 1234567890000000000
func (api *NatsAPI) handleJobEvent(subject string, data []byte) { func (api *NatsAPI) handleJobEvent(subject string, data []byte) {
if len(data) == 0 {
cclog.Warnf("NATS %s: received empty message", subject)
return
}
d := influx.NewDecoderWithBytes(data) d := influx.NewDecoderWithBytes(data)
for d.Next() { for d.Next() {
m, err := receivers.DecodeInfluxMessage(d) m, err := receivers.DecodeInfluxMessage(d)
if err != nil { if err != nil {
cclog.Errorf("NATS %s: Failed to decode message: %v", subject, err) cclog.Errorf("NATS %s: failed to decode InfluxDB line protocol message: %v", subject, err)
return return
} }
if m.IsEvent() { if !m.IsEvent() {
if m.Name() == "job" { cclog.Warnf("NATS %s: received non-event message, skipping", subject)
api.processJobEvent(m) continue
}
} }
if m.Name() == "job" {
api.processJobEvent(m)
} else {
cclog.Warnf("NATS %s: unexpected measurement name '%s', expected 'job'", subject, m.Name())
}
} }
} }
// handleStartJob processes job start messages received via NATS. // handleStartJob processes job start messages received via NATS.
// Expected JSON payload follows the schema.Job structure. // The payload parameter contains JSON following the schema.Job structure.
// Jobs are validated, checked for duplicates, and inserted into the database.
func (api *NatsAPI) handleStartJob(payload string) { func (api *NatsAPI) handleStartJob(payload string) {
if payload == "" {
cclog.Error("NATS start job: payload is empty")
return
}
req := schema.Job{ req := schema.Job{
Shared: "none", Shared: "none",
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving, MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
@@ -173,8 +230,13 @@ func (api *NatsAPI) handleStartJob(payload string) {
} }
// handleStopJob processes job stop messages received via NATS. // handleStopJob processes job stop messages received via NATS.
// Expected JSON payload follows the StopJobAPIRequest structure. // The payload parameter contains JSON following the StopJobAPIRequest structure.
// The job is marked as stopped in the database and archiving is triggered if monitoring is enabled.
func (api *NatsAPI) handleStopJob(payload string) { func (api *NatsAPI) handleStopJob(payload string) {
if payload == "" {
cclog.Error("NATS stop job: payload is empty")
return
}
var req StopJobAPIRequest var req StopJobAPIRequest
dec := json.NewDecoder(strings.NewReader(payload)) dec := json.NewDecoder(strings.NewReader(payload))
@@ -243,15 +305,21 @@ func (api *NatsAPI) handleStopJob(payload string) {
archiver.TriggerArchiving(job) archiver.TriggerArchiving(job)
} }
// handleNodeState processes node state update messages received via NATS. // processNodestateEvent extracts and processes node state data from the InfluxDB message.
// Expected JSON payload follows the UpdateNodeStatesRequest structure. // Updates node states in the repository for all nodes in the payload.
func (api *NatsAPI) handleNodeState(subject string, data []byte) { func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) {
v, ok := msg.GetEventValue()
if !ok {
cclog.Errorf("Nodestate event is missing event field with JSON payload")
return
}
var req UpdateNodeStatesRequest var req UpdateNodeStatesRequest
dec := json.NewDecoder(bytes.NewReader(data)) dec := json.NewDecoder(strings.NewReader(v))
dec.DisallowUnknownFields() dec.DisallowUnknownFields()
if err := dec.Decode(&req); err != nil { if err := dec.Decode(&req); err != nil {
cclog.Errorf("NATS %s: parsing request failed: %v", subject, err) cclog.Errorf("NATS nodestate: parsing request failed: %v", err)
return return
} }
@@ -270,10 +338,43 @@ func (api *NatsAPI) handleNodeState(subject string, data []byte) {
} }
if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil { if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil {
cclog.Errorf("NATS %s: updating node state for %s on %s failed: %v", cclog.Errorf("NATS nodestate: updating node state for %s on %s failed: %v",
subject, node.Hostname, req.Cluster, err) node.Hostname, req.Cluster, err)
} }
} }
cclog.Debugf("NATS %s: updated %d node states for cluster %s", subject, len(req.Nodes), req.Cluster) cclog.Debugf("NATS nodestate: updated %d node states for cluster %s", len(req.Nodes), req.Cluster)
}
// handleNodeState processes node state update messages received via NATS using InfluxDB line protocol.
// The message must be in line protocol format with measurement="nodestate" and include:
// - field "event" containing JSON payload (UpdateNodeStatesRequest)
//
// Example: nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[...]}" 1234567890000000000
func (api *NatsAPI) handleNodeState(subject string, data []byte) {
if len(data) == 0 {
cclog.Warnf("NATS %s: received empty message", subject)
return
}
d := influx.NewDecoderWithBytes(data)
for d.Next() {
m, err := receivers.DecodeInfluxMessage(d)
if err != nil {
cclog.Errorf("NATS %s: failed to decode InfluxDB line protocol message: %v", subject, err)
return
}
if !m.IsEvent() {
cclog.Warnf("NATS %s: received non-event message, skipping", subject)
continue
}
if m.Name() == "nodestate" {
api.processNodestateEvent(m)
} else {
cclog.Warnf("NATS %s: unexpected measurement name '%s', expected 'nodestate'", subject, m.Name())
}
}
} }

View File

@@ -603,25 +603,13 @@ func TestNatsHandleNodeState(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
payload string data []byte
expectError bool expectError bool
validateFn func(t *testing.T) validateFn func(t *testing.T)
}{ }{
{ {
name: "valid node state update", name: "valid node state update",
payload: `{ data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[{\"hostname\":\"host123\",\"states\":[\"allocated\"],\"cpusAllocated\":8,\"memoryAllocated\":16384,\"gpusAllocated\":0,\"jobsRunning\":1}]}" 1234567890000000000`),
"cluster": "testcluster",
"nodes": [
{
"hostname": "host123",
"states": ["allocated"],
"cpusAllocated": 8,
"memoryAllocated": 16384,
"gpusAllocated": 0,
"jobsRunning": 1
}
]
}`,
expectError: false, expectError: false,
validateFn: func(t *testing.T) { validateFn: func(t *testing.T) {
// In a full test, we would verify the node state was updated in the database // In a full test, we would verify the node state was updated in the database
@@ -630,50 +618,34 @@ func TestNatsHandleNodeState(t *testing.T) {
}, },
{ {
name: "multiple nodes", name: "multiple nodes",
payload: `{ data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[{\"hostname\":\"host123\",\"states\":[\"idle\"],\"cpusAllocated\":0,\"memoryAllocated\":0,\"gpusAllocated\":0,\"jobsRunning\":0},{\"hostname\":\"host124\",\"states\":[\"allocated\"],\"cpusAllocated\":4,\"memoryAllocated\":8192,\"gpusAllocated\":1,\"jobsRunning\":1}]}" 1234567890000000000`),
"cluster": "testcluster",
"nodes": [
{
"hostname": "host123",
"states": ["idle"],
"cpusAllocated": 0,
"memoryAllocated": 0,
"gpusAllocated": 0,
"jobsRunning": 0
},
{
"hostname": "host124",
"states": ["allocated"],
"cpusAllocated": 4,
"memoryAllocated": 8192,
"gpusAllocated": 1,
"jobsRunning": 1
}
]
}`,
expectError: false, expectError: false,
}, },
{ {
name: "invalid JSON", name: "invalid JSON in event field",
payload: `{ data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":\"not an array\"}" 1234567890000000000`),
"cluster": "testcluster",
"nodes": "not an array"
}`,
expectError: true, expectError: true,
}, },
{ {
name: "empty nodes array", name: "empty nodes array",
payload: `{ data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[]}" 1234567890000000000`),
"cluster": "testcluster",
"nodes": []
}`,
expectError: false, // Empty array should not cause error expectError: false, // Empty array should not cause error
}, },
{
name: "invalid line protocol format",
data: []byte(`invalid line protocol format`),
expectError: true,
},
{
name: "empty data",
data: []byte(``),
expectError: false, // Should be handled gracefully with warning
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
natsAPI.handleNodeState("test.subject", []byte(tt.payload)) natsAPI.handleNodeState("test.subject", tt.data)
// Allow some time for async operations // Allow some time for async operations
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
@@ -789,7 +761,7 @@ func TestNatsHandleJobEvent(t *testing.T) {
}{ }{
{ {
name: "valid influx line protocol", name: "valid influx line protocol",
data: []byte(`job,function=start_job event="{\"jobId\":4001,\"user\":\"testuser\",\"project\":\"testproj\",\"cluster\":\"testcluster\",\"partition\":\"main\",\"walltime\":3600,\"numNodes\":1,\"numHwthreads\":8,\"numAcc\":0,\"shared\":\"none\",\"monitoringStatus\":1,\"smt\":1,\"resources\":[{\"hostname\":\"host123\",\"hwthreads\":[0,1,2,3]}],\"startTime\":1234567890}"`), data: []byte(`job,function=start_job event="{\"jobId\":4001,\"user\":\"testuser\",\"project\":\"testproj\",\"cluster\":\"testcluster\",\"partition\":\"main\",\"walltime\":3600,\"numNodes\":1,\"numHwthreads\":8,\"numAcc\":0,\"shared\":\"none\",\"monitoringStatus\":1,\"smt\":1,\"resources\":[{\"hostname\":\"host123\",\"hwthreads\":[0,1,2,3]}],\"startTime\":1234567890}" 1234567890000000000`),
expectError: false, expectError: false,
}, },
{ {
@@ -814,6 +786,106 @@ func TestNatsHandleJobEvent(t *testing.T) {
} }
} }
func TestNatsHandleJobEventEdgeCases(t *testing.T) {
natsAPI := setupNatsTest(t)
t.Cleanup(cleanupNatsTest)
tests := []struct {
name string
data []byte
expectError bool
description string
}{
{
name: "non-event message (metric data)",
data: []byte(`job,function=start_job value=123.45 1234567890000000000`),
expectError: false,
description: "Should skip non-event messages gracefully",
},
{
name: "wrong measurement name",
data: []byte(`wrongmeasurement,function=start_job event="{}" 1234567890000000000`),
expectError: false,
description: "Should warn about unexpected measurement but not fail",
},
{
name: "missing event field",
data: []byte(`job,function=start_job other_field="value" 1234567890000000000`),
expectError: true,
description: "Should error when event field is missing",
},
{
name: "multiple measurements in one message",
data: []byte("job,function=start_job event=\"{}\" 1234567890000000000\njob,function=stop_job event=\"{}\" 1234567890000000000"),
expectError: false,
description: "Should process multiple lines",
},
{
name: "escaped quotes in JSON payload",
data: []byte(`job,function=start_job event="{\"jobId\":6001,\"user\":\"test\\\"user\",\"cluster\":\"test\"}" 1234567890000000000`),
expectError: true,
description: "Should handle escaped quotes (though JSON parsing may fail)",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
natsAPI.handleJobEvent("test.subject", tt.data)
time.Sleep(50 * time.Millisecond)
})
}
}
func TestNatsHandleNodeStateEdgeCases(t *testing.T) {
natsAPI := setupNatsTest(t)
t.Cleanup(cleanupNatsTest)
tests := []struct {
name string
data []byte
expectError bool
description string
}{
{
name: "missing cluster field in JSON",
data: []byte(`nodestate event="{\"nodes\":[]}" 1234567890000000000`),
expectError: true,
description: "Should fail when cluster is missing",
},
{
name: "malformed JSON with unescaped quotes",
data: []byte(`nodestate event="{\"cluster\":\"test"cluster\",\"nodes\":[]}" 1234567890000000000`),
expectError: true,
description: "Should fail on malformed JSON",
},
{
name: "unicode characters in hostname",
data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[{\"hostname\":\"host-ñ123\",\"states\":[\"idle\"],\"cpusAllocated\":0,\"memoryAllocated\":0,\"gpusAllocated\":0,\"jobsRunning\":0}]}" 1234567890000000000`),
expectError: false,
description: "Should handle unicode characters",
},
{
name: "very large node count",
data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[{\"hostname\":\"node1\",\"states\":[\"idle\"],\"cpusAllocated\":0,\"memoryAllocated\":0,\"gpusAllocated\":0,\"jobsRunning\":0},{\"hostname\":\"node2\",\"states\":[\"idle\"],\"cpusAllocated\":0,\"memoryAllocated\":0,\"gpusAllocated\":0,\"jobsRunning\":0},{\"hostname\":\"node3\",\"states\":[\"idle\"],\"cpusAllocated\":0,\"memoryAllocated\":0,\"gpusAllocated\":0,\"jobsRunning\":0}]}" 1234567890000000000`),
expectError: false,
description: "Should handle multiple nodes efficiently",
},
{
name: "timestamp in past",
data: []byte(`nodestate event="{\"cluster\":\"testcluster\",\"nodes\":[]}" 1000000000000000000`),
expectError: false,
description: "Should accept any valid timestamp",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
natsAPI.handleNodeState("test.subject", tt.data)
time.Sleep(50 * time.Millisecond)
})
}
}
func TestNatsHandleStartJobDuplicatePrevention(t *testing.T) { func TestNatsHandleStartJobDuplicatePrevention(t *testing.T) {
natsAPI := setupNatsTest(t) natsAPI := setupNatsTest(t)
t.Cleanup(cleanupNatsTest) t.Cleanup(cleanupNatsTest)