diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index 4b9abab..48f05af 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -136,6 +136,7 @@ func InitDB() error { // This function also sets the subcluster if necessary! func SanityChecks(job *schema.BaseJob) error { + // Missing check on Slurm JobID? if c := archive.GetCluster(job.Cluster); c == nil { return fmt.Errorf("no such cluster: %v", job.Cluster) } diff --git a/internal/natsMessenger/natsMessenger.go b/internal/natsMessenger/natsMessenger.go index 24fc4b1..07e359b 100644 --- a/internal/natsMessenger/natsMessenger.go +++ b/internal/natsMessenger/natsMessenger.go @@ -5,22 +5,26 @@ package natsMessenger import ( + "database/sql" "encoding/json" + "errors" "fmt" "time" + "github.com/ClusterCockpit/cc-backend/internal/importer" + "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" ) -// JobRepository *repository.JobRepository // Authentication *auth.Authentication type NatsMessenger struct { Server *server.Server Connection *nats.Conn Subscriptions []*nats.Subscription + JobRepository *repository.JobRepository } func New(config *schema.NatsConfig) (nm *NatsMessenger, err error) { @@ -31,16 +35,14 @@ type DevNatsMessage struct { Content string `json:"content"` } -// StartJobNatsMessage model -type StartJobNatsMessage struct { - schema.BaseJob - ID *int64 `json:"id,omitempty"` - Statistics map[string]schema.JobStatistics `json:"statistics"` - StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` +// StartJobNatsResponse model +type StartJobNatsResponse struct { + // Database ID of new job + DBID int64 `json:"id"` } -// StopJobNatsMessage model -type StopJobNatsMessage struct { +// StopJobNatsRequest model +type StopJobNatsRequest struct { JobId *int64 `json:"jobId" example:"123000"` Cluster *string `json:"cluster" example:"fritz"` StartTime *int64 `json:"startTime" example:"1649723812"` @@ -48,15 +50,22 @@ type StopJobNatsMessage struct { StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"` } -// DeleteJobNatsMessage model -type DeleteJobNatsMessage struct { +// DeleteJobNatsRequest model +type DeleteJobNatsRequest struct { JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job Cluster *string `json:"cluster" example:"fritz"` // Cluster of job StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch } -// jobEventNatsMessage model -type ReceiveEventNatsMessage struct { +// jobEventNatsRequest model +type ReceiveEventNatsRequest struct { + JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job + Cluster *string `json:"cluster" example:"fritz"` // Cluster of job + StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch + Metric *string `json:"metric" example:"cpu_power"` // Event Target Metric for Job + Timestamp *int64 `json:"timestamp" example:"1649724000"` // Event Timestamp + Event *string `json:"event" example:"powercap"` // Event Name / Type + Value *int64 `json:"value,omitempty" example:"150"` // Optional Value Set for Evenr, eg powercap } // Check auth and setup listeners to channels @@ -74,6 +83,7 @@ func SetupNatsMessenger(config *schema.NatsConfig) (nm *NatsMessenger, err error Server: nil, Connection: nil, Subscriptions: []*nats.Subscription{}, + JobRepository: repository.GetJobRepository(), } // Start Nats Server @@ -103,9 +113,10 @@ func SetupNatsMessenger(config *schema.NatsConfig) (nm *NatsMessenger, err error } // Subscribe - nmr.Subscriptions, err = setupSubscriptions(nmr.Connection) - if err != nil { - log.Error("error when subscribing to channels") + if err = nmr.setupSubscriptions(); err != nil { + log.Error("error when subscribing to channels: nats shut down") + nmr.Connection.Close() + nmr.Server.Shutdown() return nil, err } @@ -126,88 +137,85 @@ func (nm *NatsMessenger) StopNatsMessenger() { log.Info("NATS connections closed and server shut down") } -func setupSubscriptions(conn *nats.Conn) (subs []*nats.Subscription, err error) { +func (nm *NatsMessenger) setupSubscriptions() (err error) { - if startSub, err := startJobListener(conn); err != nil { + if startSub, err := nm.startJobListener(); err != nil { log.Infof("Subscription to 'start-job' failed: %s", err) } else { log.Info("Subscribed to 'start-job'") - subs = append(subs, startSub) + nm.Subscriptions = append(nm.Subscriptions, startSub) } - if stopSub, err := stopJobListener(conn); err != nil { + if stopSub, err := nm.stopJobListener(); err != nil { log.Infof("Subscription to 'stop-job' failed: %s", err) } else { log.Info("Subscribed to 'stop-job'") - subs = append(subs, stopSub) + nm.Subscriptions = append(nm.Subscriptions, stopSub) } - if deleteSub, err := deleteJobListener(conn); err == nil { + if deleteSub, err := nm.deleteJobListener(); err != nil { log.Infof("Subscription to 'delete-job' failed: %s", err) } else { log.Info("Subscribed to 'delete-job'") - subs = append(subs, deleteSub) + nm.Subscriptions = append(nm.Subscriptions, deleteSub) } - if eventSub, err := jobEventListener(conn); err != nil { + if eventSub, err := nm.jobEventListener(); err != nil { log.Infof("Subscription to 'job-event' failed: %s", err) } else { log.Info("Subscribed to 'job-event'") - subs = append(subs, eventSub) + nm.Subscriptions = append(nm.Subscriptions, eventSub) } - return subs, err + return err } // Listeners: Subscribe to specified channels and handle with specific handler functions -func startJobListener(conn *nats.Conn) (sub *nats.Subscription, err error) { - return conn.Subscribe("start-job", func(m *nats.Msg) { - var req DevNatsMessage +func (nm *NatsMessenger) startJobListener() (sub *nats.Subscription, err error) { + return nm.Connection.Subscribe("start-job", func(m *nats.Msg) { + req := schema.JobMeta{BaseJob: schema.JobDefaults} if err := json.Unmarshal(m.Data, &req); err != nil { - log.Error("Error while unmarshaling raw json nats message content: startJob") + log.Warnf("Error while unmarshaling raw json nats message content on channel start-job: %s", err.Error()) + m.Respond([]byte("Error while unmarshaling raw json nats message content on channel start-job: " + err.Error())) } - if err := startJobHandler(req); err != nil { - log.Errorf("error: %s", err.Error()) - } + m.Respond(nm.startJobHandler(req)) }) } -func stopJobListener(conn *nats.Conn) (sub *nats.Subscription, err error) { - return conn.Subscribe("stop-job", func(m *nats.Msg) { - var req DevNatsMessage +func (nm *NatsMessenger) stopJobListener() (sub *nats.Subscription, err error) { + return nm.Connection.Subscribe("stop-job", func(m *nats.Msg) { + var req StopJobNatsRequest if err := json.Unmarshal(m.Data, &req); err != nil { log.Error("Error while unmarshaling raw json nats message content: stopJob") } - if err := stopJobHandler(req); err != nil { - log.Errorf("error: %s", err.Error()) - } + m.Respond(nm.stopJobHandler(req)) }) } -func deleteJobListener(conn *nats.Conn) (sub *nats.Subscription, err error) { - return conn.Subscribe("delete-job", func(m *nats.Msg) { +func (nm *NatsMessenger) deleteJobListener() (sub *nats.Subscription, err error) { + return nm.Connection.Subscribe("delete-job", func(m *nats.Msg) { var req DevNatsMessage if err := json.Unmarshal(m.Data, &req); err != nil { log.Error("Error while unmarshaling raw json nats message content: deleteJob") } - if err := deleteJobHandler(req); err != nil { + if err := nm.deleteJobHandler(req); err != nil { log.Errorf("error: %s", err.Error()) } }) } -func jobEventListener(conn *nats.Conn) (sub *nats.Subscription, err error) { - return conn.Subscribe("job-event", func(m *nats.Msg) { +func (nm *NatsMessenger) jobEventListener() (sub *nats.Subscription, err error) { + return nm.Connection.Subscribe("job-event", func(m *nats.Msg) { var req DevNatsMessage if err := json.Unmarshal(m.Data, &req); err != nil { log.Error("Error while unmarshaling raw json nats message content: jobEvent") } - if err := jobEventHandler(req); err != nil { + if err := nm.jobEventHandler(req); err != nil { log.Errorf("error: %s", err.Error()) } }) @@ -215,22 +223,125 @@ func jobEventListener(conn *nats.Conn) (sub *nats.Subscription, err error) { // Handlers: Take content of message and perform action, e.g. adding job in db -func startJobHandler(req DevNatsMessage) (err error) { - log.Debugf("CALLED HANDLER FOR startJob: %s", req.Content) - return nil +func (nm *NatsMessenger) startJobHandler(req schema.JobMeta) []byte { + if req.State == "" { + req.State = schema.JobStateRunning + } + if err := importer.SanityChecks(&req.BaseJob); err != nil { + log.Error(err) + return handleErr(err) + } + + // // aquire lock to avoid race condition between API calls --> for NATS required? + // var unlockOnce sync.Once + // api.RepositoryMutex.Lock() + // defer unlockOnce.Do(api.RepositoryMutex.Unlock) + + // Check if combination of (job_id, cluster_id, start_time) already exists: + jobs, err := nm.JobRepository.FindAll(&req.JobID, &req.Cluster, nil) + if err != nil && err != sql.ErrNoRows { + log.Errorf("checking for duplicate failed: %s", err) + return handleErr(fmt.Errorf("checking for duplicate failed: %w", err)) + } else if err == nil { + for _, job := range jobs { + if (req.StartTime - job.StartTimeUnix) < 86400 { + log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID) + return handleErr(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID)) + } + } + } + + // id, err := nm.JobRepository.Start(&req) + // if err != nil { + // log.Errorf("insert into database failed: %s", err) + // return handleErr(fmt.Errorf("insert into database failed: %w", err)) + // } + + // // unlock here, adding Tags can be async + // unlockOnce.Do(api.RepositoryMutex.Unlock) + + for _, tag := range req.Tags { + if _, err := nm.JobRepository.AddTagOrCreate(1337, tag.Type, tag.Name); err != nil { + log.Errorf("adding tag to new job %d failed: %s", 1337, err) + return handleErr(fmt.Errorf("adding tag to new job %d failed: %w", 1337, err)) + } + } + + log.Infof("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", 1337, req.Cluster, req.JobID, req.User, req.StartTime) + + result, _ := json.Marshal(StartJobNatsResponse{ + DBID: 1337, + }) + return result } -func stopJobHandler(req DevNatsMessage) (err error) { - log.Debugf("CALLED HANDLER FOR stopJob: %s", req.Content) - return nil +func (nm *NatsMessenger) stopJobHandler(req StopJobNatsRequest) []byte { + // Fetch job (that will be stopped) from db + var job *schema.Job + var err error + if req.JobId == nil { + return handleErr(errors.New("the field 'jobId' is required")) + } + + job, err = nm.JobRepository.Find(req.JobId, req.Cluster, req.StartTime) + if err != nil { + return handleErr(fmt.Errorf("finding job failed: %w", err)) + } + + // Sanity checks + if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning { + return handleErr(errors.New("stopTime must be larger than startTime and only running jobs can be stopped")) + } + + if req.State != "" && !req.State.Valid() { + return handleErr(fmt.Errorf("invalid job state: %#v", req.State)) + } else if req.State == "" { + req.State = schema.JobStateCompleted + } + + // Mark job as stopped in the database (update state and duration) + job.Duration = int32(req.StopTime - job.StartTime.Unix()) + job.State = req.State + // if err := nm.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { + // return handleErr(fmt.Errorf("marking job as stopped failed: %w", err)) + // } + + log.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime) + + // // Send a response (with status OK). This means that erros that happen from here on forward + // // can *NOT* be communicated to the client. If reading from a MetricDataRepository or + // // writing to the filesystem fails, the client will not know. + // rw.Header().Add("Content-Type", "application/json") + // rw.WriteHeader(http.StatusOK) + // json.NewEncoder(rw).Encode(job) + + // Monitoring is disabled... + if job.MonitoringStatus == schema.MonitoringStatusDisabled { + return handleErr(fmt.Errorf("monitoring is disabled")) + } + + // Trigger async archiving + // nm.JobRepository.TriggerArchiving(job) + + result, _ := json.Marshal(job) + return result } -func deleteJobHandler(req DevNatsMessage) (err error) { +func (nm *NatsMessenger) deleteJobHandler(req DevNatsMessage) (err error) { + // Allow via Nats? log.Debugf("CALLED HANDLER FOR deleteJob: %s", req.Content) return nil } -func jobEventHandler(req DevNatsMessage) (err error) { +func (nm *NatsMessenger) jobEventHandler(req DevNatsMessage) (err error) { + // Implement from scratch log.Debugf("CALLED HANDLER FOR jobEvent: %s", req.Content) return nil } + +// Helper + +func handleErr(err error) []byte { + res, _ := json.Marshal(err.Error()) + return res +}