Port NATS API to ccMessages

This commit is contained in:
2025-12-23 07:56:13 +01:00
parent 1cd4a57bd3
commit c1135531ba
5 changed files with 134 additions and 31 deletions

View File

@@ -9,6 +9,7 @@ import (
"bytes"
"database/sql"
"encoding/json"
"strings"
"sync"
"time"
@@ -18,7 +19,9 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/nats"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/ccMessage"
"github.com/ClusterCockpit/cc-lib/schema"
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
)
// NatsAPI provides NATS subscription-based handlers for Job and Node operations.
@@ -50,11 +53,7 @@ func (api *NatsAPI) StartSubscriptions() error {
s := config.Keys.APISubjects
if err := client.Subscribe(s.SubjectJobStart, api.handleStartJob); err != nil {
return err
}
if err := client.Subscribe(s.SubjectJobStop, api.handleStopJob); err != nil {
if err := client.Subscribe(s.SubjectJobEvent, api.handleJobEvent); err != nil {
return err
}
@@ -67,26 +66,63 @@ func (api *NatsAPI) StartSubscriptions() error {
return nil
}
// processJobEvent dispatches a job event message to the matching handler.
// The operation is selected by the message's "function" tag; the event value
// of the message is handed to the handler as its payload. Messages without a
// "function" tag are logged as errors, unknown functions as warnings.
func (api *NatsAPI) processJobEvent(msg lp.CCMessage) {
	action, found := msg.GetTag("function")
	if !found {
		cclog.Errorf("Job event is missing tag 'function': %+v", msg)
		return
	}

	if action == "start_job" {
		api.handleStartJob(msg.GetEventValue())
		return
	}
	if action == "stop_job" {
		api.handleStopJob(msg.GetEventValue())
		return
	}

	// Anything else is not supported here.
	cclog.Warnf("Unimplemented job event: %+v", msg)
}
// handleJobEvent is the subscription callback for the job event subject.
// It decodes data as a sequence of line-protocol entries and forwards every
// entry that is an event named "job" to processJobEvent. All other entries
// are skipped. Processing of the whole payload stops at the first entry that
// fails to decode.
func (api *NatsAPI) handleJobEvent(subject string, data []byte) {
	decoder := influx.NewDecoderWithBytes(data)

	for decoder.Next() {
		event, decodeErr := nats.DecodeInfluxMessage(decoder)
		if decodeErr != nil {
			cclog.Errorf("NATS %s: Failed to decode message: %v", subject, decodeErr)
			return
		}

		// Only events with the name "job" are of interest here.
		if !event.IsEvent() || event.Name() != "job" {
			continue
		}

		api.processJobEvent(event)
	}
}
// handleStartJob processes job start messages received via NATS.
// Expected JSON payload follows the schema.Job structure.
func (api *NatsAPI) handleStartJob(subject string, data []byte) {
func (api *NatsAPI) handleStartJob(payload string) {
req := schema.Job{
Shared: "none",
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
}
dec := json.NewDecoder(bytes.NewReader(data))
dec := json.NewDecoder(strings.NewReader(payload))
dec.DisallowUnknownFields()
if err := dec.Decode(&req); err != nil {
cclog.Errorf("NATS %s: parsing request failed: %v", subject, err)
cclog.Errorf("NATS start job: parsing request failed: %v", err)
return
}
cclog.Debugf("NATS %s: %s", subject, req.GoString())
cclog.Debugf("NATS start job: %s", req.GoString())
req.State = schema.JobStateRunning
if err := importer.SanityChecks(&req); err != nil {
cclog.Errorf("NATS %s: sanity check failed: %v", subject, err)
cclog.Errorf("NATS start job: sanity check failed: %v", err)
return
}
@@ -96,14 +132,14 @@ func (api *NatsAPI) handleStartJob(subject string, data []byte) {
jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
if err != nil && err != sql.ErrNoRows {
cclog.Errorf("NATS %s: checking for duplicate failed: %v", subject, err)
cclog.Errorf("NATS start job: checking for duplicate failed: %v", err)
return
}
if err == nil {
for _, job := range jobs {
if (req.StartTime - job.StartTime) < secondsPerDay {
cclog.Errorf("NATS %s: job with jobId %d, cluster %s already exists (dbid: %d)",
subject, req.JobID, req.Cluster, job.ID)
cclog.Errorf("NATS start job: job with jobId %d, cluster %s already exists (dbid: %d)",
req.JobID, req.Cluster, job.ID)
return
}
}
@@ -111,14 +147,14 @@ func (api *NatsAPI) handleStartJob(subject string, data []byte) {
id, err := api.JobRepository.Start(&req)
if err != nil {
cclog.Errorf("NATS %s: insert into database failed: %v", subject, err)
cclog.Errorf("NATS start job: insert into database failed: %v", err)
return
}
unlockOnce.Do(api.RepositoryMutex.Unlock)
for _, tag := range req.Tags {
if _, err := api.JobRepository.AddTagOrCreate(nil, id, tag.Type, tag.Name, tag.Scope); err != nil {
cclog.Errorf("NATS %s: adding tag to new job %d failed: %v", subject, id, err)
cclog.Errorf("NATS start job: adding tag to new job %d failed: %v", id, err)
return
}
}
@@ -129,18 +165,18 @@ func (api *NatsAPI) handleStartJob(subject string, data []byte) {
// handleStopJob processes job stop messages received via NATS.
// Expected JSON payload follows the StopJobAPIRequest structure.
func (api *NatsAPI) handleStopJob(subject string, data []byte) {
func (api *NatsAPI) handleStopJob(payload string) {
var req StopJobAPIRequest
dec := json.NewDecoder(bytes.NewReader(data))
dec := json.NewDecoder(strings.NewReader(payload))
dec.DisallowUnknownFields()
if err := dec.Decode(&req); err != nil {
cclog.Errorf("NATS %s: parsing request failed: %v", subject, err)
cclog.Errorf("NATS job stop: parsing request failed: %v", err)
return
}
if req.JobID == nil {
cclog.Errorf("NATS %s: the field 'jobId' is required", subject)
cclog.Errorf("NATS job stop: the field 'jobId' is required")
return
}
@@ -148,28 +184,28 @@ func (api *NatsAPI) handleStopJob(subject string, data []byte) {
if err != nil {
cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime)
if cachedErr != nil {
cclog.Errorf("NATS %s: finding job failed: %v (cached lookup also failed: %v)",
subject, err, cachedErr)
cclog.Errorf("NATS job stop: finding job failed: %v (cached lookup also failed: %v)",
err, cachedErr)
return
}
job = cachedJob
}
if job.State != schema.JobStateRunning {
cclog.Errorf("NATS %s: jobId %d (id %d) on %s: job has already been stopped (state is: %s)",
subject, job.JobID, job.ID, job.Cluster, job.State)
cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: job has already been stopped (state is: %s)",
job.JobID, job.ID, job.Cluster, job.State)
return
}
if job.StartTime > req.StopTime {
cclog.Errorf("NATS %s: jobId %d (id %d) on %s: stopTime %d must be >= startTime %d",
subject, job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime)
cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: stopTime %d must be >= startTime %d",
job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime)
return
}
if req.State != "" && !req.State.Valid() {
cclog.Errorf("NATS %s: jobId %d (id %d) on %s: invalid job state: %#v",
subject, job.JobID, job.ID, job.Cluster, req.State)
cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: invalid job state: %#v",
job.JobID, job.ID, job.Cluster, req.State)
return
} else if req.State == "" {
req.State = schema.JobStateCompleted
@@ -182,8 +218,8 @@ func (api *NatsAPI) handleStopJob(subject string, data []byte) {
if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
cclog.Errorf("NATS %s: jobId %d (id %d) on %s: marking job as '%s' failed: %v",
subject, job.JobID, job.ID, job.Cluster, job.State, err)
cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: marking job as '%s' failed: %v",
job.JobID, job.ID, job.Cluster, job.State, err)
return
}
}