Add and adapt api handlers for nats

This commit is contained in:
Christoph Kluge 2024-08-12 09:03:53 +02:00
parent f4a414fa6e
commit d729fdfec1
2 changed files with 165 additions and 53 deletions

View File

@ -136,6 +136,7 @@ func InitDB() error {
// This function also sets the subcluster if necessary! // This function also sets the subcluster if necessary!
func SanityChecks(job *schema.BaseJob) error { func SanityChecks(job *schema.BaseJob) error {
// Missing check on Slurm JobID?
if c := archive.GetCluster(job.Cluster); c == nil { if c := archive.GetCluster(job.Cluster); c == nil {
return fmt.Errorf("no such cluster: %v", job.Cluster) return fmt.Errorf("no such cluster: %v", job.Cluster)
} }

View File

@ -5,22 +5,26 @@
package natsMessenger package natsMessenger
import ( import (
"database/sql"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/importer"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
// JobRepository *repository.JobRepository
// Authentication *auth.Authentication // Authentication *auth.Authentication
type NatsMessenger struct { type NatsMessenger struct {
Server *server.Server Server *server.Server
Connection *nats.Conn Connection *nats.Conn
Subscriptions []*nats.Subscription Subscriptions []*nats.Subscription
JobRepository *repository.JobRepository
} }
func New(config *schema.NatsConfig) (nm *NatsMessenger, err error) { func New(config *schema.NatsConfig) (nm *NatsMessenger, err error) {
@ -31,16 +35,14 @@ type DevNatsMessage struct {
Content string `json:"content"` Content string `json:"content"`
} }
// StartJobNatsMessage model // StartJobNatsResponse model
type StartJobNatsMessage struct { type StartJobNatsResponse struct {
schema.BaseJob // Database ID of new job
ID *int64 `json:"id,omitempty"` DBID int64 `json:"id"`
Statistics map[string]schema.JobStatistics `json:"statistics"`
StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"`
} }
// StopJobNatsMessage model // StopJobNatsRequest model
type StopJobNatsMessage struct { type StopJobNatsRequest struct {
JobId *int64 `json:"jobId" example:"123000"` JobId *int64 `json:"jobId" example:"123000"`
Cluster *string `json:"cluster" example:"fritz"` Cluster *string `json:"cluster" example:"fritz"`
StartTime *int64 `json:"startTime" example:"1649723812"` StartTime *int64 `json:"startTime" example:"1649723812"`
@ -48,15 +50,22 @@ type StopJobNatsMessage struct {
StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"` StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"`
} }
// DeleteJobNatsMessage model // DeleteJobNatsRequest model
type DeleteJobNatsMessage struct { type DeleteJobNatsRequest struct {
JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job
Cluster *string `json:"cluster" example:"fritz"` // Cluster of job Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
} }
// jobEventNatsMessage model // jobEventNatsRequest model
type ReceiveEventNatsMessage struct { type ReceiveEventNatsRequest struct {
JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job
Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
Metric *string `json:"metric" example:"cpu_power"` // Event Target Metric for Job
Timestamp *int64 `json:"timestamp" example:"1649724000"` // Event Timestamp
Event *string `json:"event" example:"powercap"` // Event Name / Type
Value *int64 `json:"value,omitempty" example:"150"` // Optional Value Set for Evenr, eg powercap
} }
// Check auth and setup listeners to channels // Check auth and setup listeners to channels
@ -74,6 +83,7 @@ func SetupNatsMessenger(config *schema.NatsConfig) (nm *NatsMessenger, err error
Server: nil, Server: nil,
Connection: nil, Connection: nil,
Subscriptions: []*nats.Subscription{}, Subscriptions: []*nats.Subscription{},
JobRepository: repository.GetJobRepository(),
} }
// Start Nats Server // Start Nats Server
@ -103,9 +113,10 @@ func SetupNatsMessenger(config *schema.NatsConfig) (nm *NatsMessenger, err error
} }
// Subscribe // Subscribe
nmr.Subscriptions, err = setupSubscriptions(nmr.Connection) if err = nmr.setupSubscriptions(); err != nil {
if err != nil { log.Error("error when subscribing to channels: nats shut down")
log.Error("error when subscribing to channels") nmr.Connection.Close()
nmr.Server.Shutdown()
return nil, err return nil, err
} }
@ -126,88 +137,85 @@ func (nm *NatsMessenger) StopNatsMessenger() {
log.Info("NATS connections closed and server shut down") log.Info("NATS connections closed and server shut down")
} }
func setupSubscriptions(conn *nats.Conn) (subs []*nats.Subscription, err error) { func (nm *NatsMessenger) setupSubscriptions() (err error) {
if startSub, err := startJobListener(conn); err != nil { if startSub, err := nm.startJobListener(); err != nil {
log.Infof("Subscription to 'start-job' failed: %s", err) log.Infof("Subscription to 'start-job' failed: %s", err)
} else { } else {
log.Info("Subscribed to 'start-job'") log.Info("Subscribed to 'start-job'")
subs = append(subs, startSub) nm.Subscriptions = append(nm.Subscriptions, startSub)
} }
if stopSub, err := stopJobListener(conn); err != nil { if stopSub, err := nm.stopJobListener(); err != nil {
log.Infof("Subscription to 'stop-job' failed: %s", err) log.Infof("Subscription to 'stop-job' failed: %s", err)
} else { } else {
log.Info("Subscribed to 'stop-job'") log.Info("Subscribed to 'stop-job'")
subs = append(subs, stopSub) nm.Subscriptions = append(nm.Subscriptions, stopSub)
} }
if deleteSub, err := deleteJobListener(conn); err == nil { if deleteSub, err := nm.deleteJobListener(); err != nil {
log.Infof("Subscription to 'delete-job' failed: %s", err) log.Infof("Subscription to 'delete-job' failed: %s", err)
} else { } else {
log.Info("Subscribed to 'delete-job'") log.Info("Subscribed to 'delete-job'")
subs = append(subs, deleteSub) nm.Subscriptions = append(nm.Subscriptions, deleteSub)
} }
if eventSub, err := jobEventListener(conn); err != nil { if eventSub, err := nm.jobEventListener(); err != nil {
log.Infof("Subscription to 'job-event' failed: %s", err) log.Infof("Subscription to 'job-event' failed: %s", err)
} else { } else {
log.Info("Subscribed to 'job-event'") log.Info("Subscribed to 'job-event'")
subs = append(subs, eventSub) nm.Subscriptions = append(nm.Subscriptions, eventSub)
} }
return subs, err return err
} }
// Listeners: Subscribe to specified channels and handle with specific handler functions // Listeners: Subscribe to specified channels and handle with specific handler functions
func startJobListener(conn *nats.Conn) (sub *nats.Subscription, err error) { func (nm *NatsMessenger) startJobListener() (sub *nats.Subscription, err error) {
return conn.Subscribe("start-job", func(m *nats.Msg) { return nm.Connection.Subscribe("start-job", func(m *nats.Msg) {
var req DevNatsMessage req := schema.JobMeta{BaseJob: schema.JobDefaults}
if err := json.Unmarshal(m.Data, &req); err != nil { if err := json.Unmarshal(m.Data, &req); err != nil {
log.Error("Error while unmarshaling raw json nats message content: startJob") log.Warnf("Error while unmarshaling raw json nats message content on channel start-job: %s", err.Error())
m.Respond([]byte("Error while unmarshaling raw json nats message content on channel start-job: " + err.Error()))
} }
if err := startJobHandler(req); err != nil { m.Respond(nm.startJobHandler(req))
log.Errorf("error: %s", err.Error())
}
}) })
} }
func stopJobListener(conn *nats.Conn) (sub *nats.Subscription, err error) { func (nm *NatsMessenger) stopJobListener() (sub *nats.Subscription, err error) {
return conn.Subscribe("stop-job", func(m *nats.Msg) { return nm.Connection.Subscribe("stop-job", func(m *nats.Msg) {
var req DevNatsMessage var req StopJobNatsRequest
if err := json.Unmarshal(m.Data, &req); err != nil { if err := json.Unmarshal(m.Data, &req); err != nil {
log.Error("Error while unmarshaling raw json nats message content: stopJob") log.Error("Error while unmarshaling raw json nats message content: stopJob")
} }
if err := stopJobHandler(req); err != nil { m.Respond(nm.stopJobHandler(req))
log.Errorf("error: %s", err.Error())
}
}) })
} }
func deleteJobListener(conn *nats.Conn) (sub *nats.Subscription, err error) { func (nm *NatsMessenger) deleteJobListener() (sub *nats.Subscription, err error) {
return conn.Subscribe("delete-job", func(m *nats.Msg) { return nm.Connection.Subscribe("delete-job", func(m *nats.Msg) {
var req DevNatsMessage var req DevNatsMessage
if err := json.Unmarshal(m.Data, &req); err != nil { if err := json.Unmarshal(m.Data, &req); err != nil {
log.Error("Error while unmarshaling raw json nats message content: deleteJob") log.Error("Error while unmarshaling raw json nats message content: deleteJob")
} }
if err := deleteJobHandler(req); err != nil { if err := nm.deleteJobHandler(req); err != nil {
log.Errorf("error: %s", err.Error()) log.Errorf("error: %s", err.Error())
} }
}) })
} }
func jobEventListener(conn *nats.Conn) (sub *nats.Subscription, err error) { func (nm *NatsMessenger) jobEventListener() (sub *nats.Subscription, err error) {
return conn.Subscribe("job-event", func(m *nats.Msg) { return nm.Connection.Subscribe("job-event", func(m *nats.Msg) {
var req DevNatsMessage var req DevNatsMessage
if err := json.Unmarshal(m.Data, &req); err != nil { if err := json.Unmarshal(m.Data, &req); err != nil {
log.Error("Error while unmarshaling raw json nats message content: jobEvent") log.Error("Error while unmarshaling raw json nats message content: jobEvent")
} }
if err := jobEventHandler(req); err != nil { if err := nm.jobEventHandler(req); err != nil {
log.Errorf("error: %s", err.Error()) log.Errorf("error: %s", err.Error())
} }
}) })
@ -215,22 +223,125 @@ func jobEventListener(conn *nats.Conn) (sub *nats.Subscription, err error) {
// Handlers: Take content of message and perform action, e.g. adding job in db // Handlers: Take content of message and perform action, e.g. adding job in db
func startJobHandler(req DevNatsMessage) (err error) { func (nm *NatsMessenger) startJobHandler(req schema.JobMeta) []byte {
log.Debugf("CALLED HANDLER FOR startJob: %s", req.Content) if req.State == "" {
return nil req.State = schema.JobStateRunning
}
if err := importer.SanityChecks(&req.BaseJob); err != nil {
log.Error(err)
return handleErr(err)
} }
func stopJobHandler(req DevNatsMessage) (err error) { // // aquire lock to avoid race condition between API calls --> for NATS required?
log.Debugf("CALLED HANDLER FOR stopJob: %s", req.Content) // var unlockOnce sync.Once
return nil // api.RepositoryMutex.Lock()
// defer unlockOnce.Do(api.RepositoryMutex.Unlock)
// Check if combination of (job_id, cluster_id, start_time) already exists:
jobs, err := nm.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
if err != nil && err != sql.ErrNoRows {
log.Errorf("checking for duplicate failed: %s", err)
return handleErr(fmt.Errorf("checking for duplicate failed: %w", err))
} else if err == nil {
for _, job := range jobs {
if (req.StartTime - job.StartTimeUnix) < 86400 {
log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID)
return handleErr(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID))
}
}
} }
func deleteJobHandler(req DevNatsMessage) (err error) { // id, err := nm.JobRepository.Start(&req)
// if err != nil {
// log.Errorf("insert into database failed: %s", err)
// return handleErr(fmt.Errorf("insert into database failed: %w", err))
// }
// // unlock here, adding Tags can be async
// unlockOnce.Do(api.RepositoryMutex.Unlock)
for _, tag := range req.Tags {
if _, err := nm.JobRepository.AddTagOrCreate(1337, tag.Type, tag.Name); err != nil {
log.Errorf("adding tag to new job %d failed: %s", 1337, err)
return handleErr(fmt.Errorf("adding tag to new job %d failed: %w", 1337, err))
}
}
log.Infof("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", 1337, req.Cluster, req.JobID, req.User, req.StartTime)
result, _ := json.Marshal(StartJobNatsResponse{
DBID: 1337,
})
return result
}
func (nm *NatsMessenger) stopJobHandler(req StopJobNatsRequest) []byte {
// Fetch job (that will be stopped) from db
var job *schema.Job
var err error
if req.JobId == nil {
return handleErr(errors.New("the field 'jobId' is required"))
}
job, err = nm.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
if err != nil {
return handleErr(fmt.Errorf("finding job failed: %w", err))
}
// Sanity checks
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
return handleErr(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"))
}
if req.State != "" && !req.State.Valid() {
return handleErr(fmt.Errorf("invalid job state: %#v", req.State))
} else if req.State == "" {
req.State = schema.JobStateCompleted
}
// Mark job as stopped in the database (update state and duration)
job.Duration = int32(req.StopTime - job.StartTime.Unix())
job.State = req.State
// if err := nm.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
// return handleErr(fmt.Errorf("marking job as stopped failed: %w", err))
// }
log.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
// // Send a response (with status OK). This means that erros that happen from here on forward
// // can *NOT* be communicated to the client. If reading from a MetricDataRepository or
// // writing to the filesystem fails, the client will not know.
// rw.Header().Add("Content-Type", "application/json")
// rw.WriteHeader(http.StatusOK)
// json.NewEncoder(rw).Encode(job)
// Monitoring is disabled...
if job.MonitoringStatus == schema.MonitoringStatusDisabled {
return handleErr(fmt.Errorf("monitoring is disabled"))
}
// Trigger async archiving
// nm.JobRepository.TriggerArchiving(job)
result, _ := json.Marshal(job)
return result
}
func (nm *NatsMessenger) deleteJobHandler(req DevNatsMessage) (err error) {
// Allow via Nats?
log.Debugf("CALLED HANDLER FOR deleteJob: %s", req.Content) log.Debugf("CALLED HANDLER FOR deleteJob: %s", req.Content)
return nil return nil
} }
func jobEventHandler(req DevNatsMessage) (err error) { func (nm *NatsMessenger) jobEventHandler(req DevNatsMessage) (err error) {
// Implement from scratch
log.Debugf("CALLED HANDLER FOR jobEvent: %s", req.Content) log.Debugf("CALLED HANDLER FOR jobEvent: %s", req.Content)
return nil return nil
} }
// Helper
func handleErr(err error) []byte {
res, _ := json.Marshal(err.Error())
return res
}