From c1135531ba26d3267791113b77c0f4bcc4f71234 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 23 Dec 2025 07:56:13 +0100 Subject: [PATCH] Port NATS api to ccMessages --- go.mod | 5 +++ go.sum | 4 ++ internal/api/nats.go | 94 +++++++++++++++++++++++++++------------ internal/config/config.go | 3 +- pkg/nats/influxDecoder.go | 59 ++++++++++++++++++++++++ 5 files changed, 134 insertions(+), 31 deletions(-) create mode 100644 pkg/nats/influxDecoder.go diff --git a/go.mod b/go.mod index eb061de..b821f7b 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect github.com/KyleBanks/depth v1.2.1 // indirect github.com/agnivade/levenshtein v1.2.1 // indirect + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.13 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect @@ -89,6 +90,8 @@ require ( github.com/gorilla/securecookie v1.1.2 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect + github.com/influxdata/influxdb-client-go/v2 v2.14.0 // indirect + github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // indirect github.com/jonboulle/clockwork v0.5.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -101,6 +104,7 @@ require ( github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/nats-io/nkeys v0.4.11 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/oapi-codegen/runtime v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/procfs v0.16.1 // indirect @@ -114,6 +118,7 @@ require ( github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect golang.org/x/mod v0.30.0 // indirect golang.org/x/net v0.47.0 // indirect golang.org/x/sync v0.18.0 // indirect diff --git a/go.sum b/go.sum index fd4980d..04e2514 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,7 @@ github.com/NVIDIA/go-nvml v0.13.0-1 h1:OLX8Jq3dONuPOQPC7rndB6+iDmDakw0XTYgzMxObk github.com/NVIDIA/go-nvml v0.13.0-1/go.mod h1:+KNA7c7gIBH7SKSJ1ntlwkfN80zdx8ovl4hrK3LmPt4= github.com/PuerkitoBio/goquery v1.11.0 h1:jZ7pwMQXIITcUXNH83LLk+txlaEy6NVOfTuP43xxfqw= github.com/PuerkitoBio/goquery v1.11.0/go.mod h1:wQHgxUOU3JGuj3oD/QFfxUdlzW6xPHfqyHre6VMY4DQ= +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/agnivade/levenshtein v1.2.1 h1:EHBY3UOn1gwdy/VbFwgo4cxecRznFk7fKWN1KOX7eoM= github.com/agnivade/levenshtein v1.2.1/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU= github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e h1:4dAU9FXIyQktpoUAgOJK3OTFc/xug0PCXYCqU0FgDKI= @@ -64,6 +65,7 @@ github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk= github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-oidc/v3 v3.16.0 h1:qRQUCFstKpXwmEjDQTIbyY/5jF00+asXzSkmkoa/mow= @@ -194,6 +196,7 @@ github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2E github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -260,6 +263,7 @@ github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4= github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERAikUR6SDg= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= diff --git a/internal/api/nats.go b/internal/api/nats.go index 1bfe905..745e7ac 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -9,6 +9,7 @@ import ( "bytes" "database/sql" "encoding/json" + "strings" "sync" "time" @@ -18,7 +19,9 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/nats" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" + lp "github.com/ClusterCockpit/cc-lib/ccMessage" "github.com/ClusterCockpit/cc-lib/schema" + influx "github.com/influxdata/line-protocol/v2/lineprotocol" ) // NatsAPI provides NATS subscription-based handlers for Job and Node operations. @@ -50,11 +53,7 @@ func (api *NatsAPI) StartSubscriptions() error { s := config.Keys.APISubjects - if err := client.Subscribe(s.SubjectJobStart, api.handleStartJob); err != nil { - return err - } - - if err := client.Subscribe(s.SubjectJobStop, api.handleStopJob); err != nil { + if err := client.Subscribe(s.SubjectJobEvent, api.handleJobEvent); err != nil { return err } @@ -67,26 +66,63 @@ func (api *NatsAPI) StartSubscriptions() error { return nil } +func (api *NatsAPI) processJobEvent(msg lp.CCMessage) { + function, ok := msg.GetTag("function") + if !ok { + cclog.Errorf("Job event is missing tag 'function': %+v", msg) + return + } + + switch function { + case "start_job": + api.handleStartJob(msg.GetEventValue()) + + case "stop_job": + api.handleStopJob(msg.GetEventValue()) + default: + cclog.Warnf("Unimplemented job event: %+v", msg) + } +} + +func (api *NatsAPI) handleJobEvent(subject string, data []byte) { + d := influx.NewDecoderWithBytes(data) + + for d.Next() { + m, err := nats.DecodeInfluxMessage(d) + if err != nil { + cclog.Errorf("NATS %s: Failed to decode message: %v", subject, err) + return + } + + if m.IsEvent() { + if m.Name() == "job" { + api.processJobEvent(m) + } + } + + } +} + // handleStartJob processes job start messages received via NATS. // Expected JSON payload follows the schema.Job structure. -func (api *NatsAPI) handleStartJob(subject string, data []byte) { +func (api *NatsAPI) handleStartJob(payload string) { req := schema.Job{ Shared: "none", MonitoringStatus: schema.MonitoringStatusRunningOrArchiving, } - dec := json.NewDecoder(bytes.NewReader(data)) + dec := json.NewDecoder(strings.NewReader(payload)) dec.DisallowUnknownFields() if err := dec.Decode(&req); err != nil { - cclog.Errorf("NATS %s: parsing request failed: %v", subject, err) + cclog.Errorf("NATS start job: parsing request failed: %v", err) return } - cclog.Debugf("NATS %s: %s", subject, req.GoString()) + cclog.Debugf("NATS start job: %s", req.GoString()) req.State = schema.JobStateRunning if err := importer.SanityChecks(&req); err != nil { - cclog.Errorf("NATS %s: sanity check failed: %v", subject, err) + cclog.Errorf("NATS start job: sanity check failed: %v", err) return } @@ -96,14 +132,14 @@ func (api *NatsAPI) handleStartJob(subject string, data []byte) { jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil) if err != nil && err != sql.ErrNoRows { - cclog.Errorf("NATS %s: checking for duplicate failed: %v", subject, err) + cclog.Errorf("NATS start job: checking for duplicate failed: %v", err) return } if err == nil { for _, job := range jobs { if (req.StartTime - job.StartTime) < secondsPerDay { - cclog.Errorf("NATS %s: job with jobId %d, cluster %s already exists (dbid: %d)", - subject, req.JobID, req.Cluster, job.ID) + cclog.Errorf("NATS start job: job with jobId %d, cluster %s already exists (dbid: %d)", + req.JobID, req.Cluster, job.ID) return } } @@ -111,14 +147,14 @@ func (api *NatsAPI) handleStartJob(subject string, data []byte) { id, err := api.JobRepository.Start(&req) if err != nil { - cclog.Errorf("NATS %s: insert into database failed: %v", subject, err) + cclog.Errorf("NATS start job: insert into database failed: %v", err) return } unlockOnce.Do(api.RepositoryMutex.Unlock) for _, tag := range req.Tags { if _, err := api.JobRepository.AddTagOrCreate(nil, id, tag.Type, tag.Name, tag.Scope); err != nil { - cclog.Errorf("NATS %s: adding tag to new job %d failed: %v", subject, id, err) + cclog.Errorf("NATS start job: adding tag to new job %d failed: %v", id, err) return } } @@ -129,18 +165,18 @@ func (api *NatsAPI) handleStartJob(subject string, data []byte) { // handleStopJob processes job stop messages received via NATS. // Expected JSON payload follows the StopJobAPIRequest structure. -func (api *NatsAPI) handleStopJob(subject string, data []byte) { +func (api *NatsAPI) handleStopJob(payload string) { var req StopJobAPIRequest - dec := json.NewDecoder(bytes.NewReader(data)) + dec := json.NewDecoder(strings.NewReader(payload)) dec.DisallowUnknownFields() if err := dec.Decode(&req); err != nil { - cclog.Errorf("NATS %s: parsing request failed: %v", subject, err) + cclog.Errorf("NATS job stop: parsing request failed: %v", err) return } if req.JobID == nil { - cclog.Errorf("NATS %s: the field 'jobId' is required", subject) + cclog.Errorf("NATS job stop: the field 'jobId' is required") return } @@ -148,28 +184,28 @@ func (api *NatsAPI) handleStopJob(subject string, data []byte) { if err != nil { cachedJob, cachedErr := api.JobRepository.FindCached(req.JobID, req.Cluster, req.StartTime) if cachedErr != nil { - cclog.Errorf("NATS %s: finding job failed: %v (cached lookup also failed: %v)", - subject, err, cachedErr) + cclog.Errorf("NATS job stop: finding job failed: %v (cached lookup also failed: %v)", + err, cachedErr) return } job = cachedJob } if job.State != schema.JobStateRunning { - cclog.Errorf("NATS %s: jobId %d (id %d) on %s: job has already been stopped (state is: %s)", - subject, job.JobID, job.ID, job.Cluster, job.State) + cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: job has already been stopped (state is: %s)", + job.JobID, job.ID, job.Cluster, job.State) return } if job.StartTime > req.StopTime { - cclog.Errorf("NATS %s: jobId %d (id %d) on %s: stopTime %d must be >= startTime %d", - subject, job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime) + cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: stopTime %d must be >= startTime %d", + job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime) return } if req.State != "" && !req.State.Valid() { - cclog.Errorf("NATS %s: jobId %d (id %d) on %s: invalid job state: %#v", - subject, job.JobID, job.ID, job.Cluster, req.State) + cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: invalid job state: %#v", + job.JobID, job.ID, job.Cluster, req.State) return } else if req.State == "" { req.State = schema.JobStateCompleted @@ -182,8 +218,8 @@ func (api *NatsAPI) handleStopJob(subject string, data []byte) { if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { - cclog.Errorf("NATS %s: jobId %d (id %d) on %s: marking job as '%s' failed: %v", - subject, job.JobID, job.ID, job.Cluster, job.State, err) + cclog.Errorf("NATS job stop: jobId %d (id %d) on %s: marking job as '%s' failed: %v", + job.JobID, job.ID, job.Cluster, job.State, err) return } } diff --git a/internal/config/config.go b/internal/config/config.go index b7b8ed0..3c88bcf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -90,8 +90,7 @@ type ResampleConfig struct { } type NATSConfig struct { - SubjectJobStart string `json:"subjectJobStart"` - SubjectJobStop string `json:"subjectJobStop"` + SubjectJobEvent string `json:"subjectJobEvent"` SubjectNodeState string `json:"subjectNodeState"` } diff --git a/pkg/nats/influxDecoder.go b/pkg/nats/influxDecoder.go new file mode 100644 index 0000000..412f85e --- /dev/null +++ b/pkg/nats/influxDecoder.go @@ -0,0 +1,59 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package nats + +import ( + "time" + + lp "github.com/ClusterCockpit/cc-lib/ccMessage" + influx "github.com/influxdata/line-protocol/v2/lineprotocol" +) + +// DecodeInfluxMessage decodes a single InfluxDB line protocol message from the decoder +// Returns the decoded CCMessage or an error if decoding fails +func DecodeInfluxMessage(d *influx.Decoder) (lp.CCMessage, error) { + measurement, err := d.Measurement() + if err != nil { + return nil, err + } + + tags := make(map[string]string) + for { + key, value, err := d.NextTag() + if err != nil { + return nil, err + } + if key == nil { + break + } + tags[string(key)] = string(value) + } + + fields := make(map[string]interface{}) + for { + key, value, err := d.NextField() + if err != nil { + return nil, err + } + if key == nil { + break + } + fields[string(key)] = value.Interface() + } + + t, err := d.Time(influx.Nanosecond, time.Time{}) + if err != nil { + return nil, err + } + + return lp.NewMessage( + string(measurement), + tags, + nil, + fields, + t, + ) +}