Added slurmNats code that could not parse BaseJob correctly
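The new scheduler subscribes startJob/stopJob handlers on a JSON-encoded NATS connection, which decodes each message payload directly into the handler's argument type; decoding into *schema.JobMeta does not populate the embedded BaseJob correctly yet (see the commented-out workaround in startJob).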

Bole Ma
2023-08-18 15:38:02 +02:00
parent c0f9eb7869
commit d9e7a48e2f
62 changed files with 31829 additions and 5 deletions


@@ -5,11 +5,15 @@
package scheduler
import (
"database/sql"
"encoding/json"
-"log"
"strings"
"sync"
+"github.com/ClusterCockpit/cc-backend/internal/importer"
+"github.com/ClusterCockpit/cc-backend/internal/repository"
+"github.com/ClusterCockpit/cc-backend/pkg/log"
+"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/nats-io/nats.go"
)
@@ -19,6 +23,130 @@ type SlurmNatsConfig struct {
type SlurmNatsScheduler struct {
url string
RepositoryMutex sync.Mutex
JobRepository *repository.JobRepository
}
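// StopJobRequest is the message payload expected on the "stopJob" subject.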
type StopJobRequest struct {
// Stop Time of job as epoch
StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"`
State schema.JobState `json:"jobState" validate:"required" example:"completed"` // Final job state
JobId *int64 `json:"jobId" example:"123000"` // Cluster Job ID of job
Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
}
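// startJob handles a decoded "startJob" message: it sanity-checks the job
// metadata, guards against duplicate (jobId, cluster, startTime) entries,
// inserts the job into the database and attaches its tags.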
func (sd *SlurmNatsScheduler) startJob(req *schema.JobMeta) {
// NOTE: decoding the incoming message into JobMeta does not populate the
// embedded BaseJob correctly yet; this attempted workaround did not help:
// req := schema.JobMeta{BaseJob: schema.JobDefaults}
log.Printf("Cluster: %s - Job ID: %d", req.BaseJob.Cluster, req.BaseJob.JobID)
log.Printf("User: %s - Project: %s", req.BaseJob.User, req.BaseJob.Project)
if req.State == "" {
req.State = schema.JobStateRunning
}
if err := importer.SanityChecks(&req.BaseJob); err != nil {
log.Errorf("sanity checks failed: %s", err.Error())
return
}
// acquire lock to avoid race condition between API calls
var unlockOnce sync.Once
sd.RepositoryMutex.Lock()
defer unlockOnce.Do(sd.RepositoryMutex.Unlock)
// Check if combination of (job_id, cluster_id, start_time) already exists:
jobs, err := sd.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
if err != nil && err != sql.ErrNoRows {
log.Errorf("checking for duplicate failed: %s", err.Error())
return
} else if err == nil {
for _, job := range jobs {
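// An existing job with this jobId and cluster counts as a duplicate if the
// start times are within 24 hours (86400 s) of each other.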
if (req.StartTime - job.StartTimeUnix) < 86400 {
log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID)
return
}
}
}
id, err := sd.JobRepository.Start(req)
if err != nil {
log.Errorf("insert into database failed: %s", err.Error())
return
}
// unlock here, adding Tags can be async
unlockOnce.Do(sd.RepositoryMutex.Unlock)
for _, tag := range req.Tags {
if _, err := sd.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
log.Errorf("adding tag to new job %d failed: %s", id, err.Error())
return
}
}
log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
}
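// checkAndHandleStopJob validates the stop request against the job fetched
// from the database, marks the job as stopped (final state and duration) and
// triggers asynchronous archiving unless monitoring is disabled.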
func (sd *SlurmNatsScheduler) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
// Sanity checks
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
log.Errorf("stopTime must be larger than startTime and only running jobs can be stopped")
return
}
if req.State != "" && !req.State.Valid() {
log.Errorf("invalid job state: %#v", req.State)
return
} else if req.State == "" {
req.State = schema.JobStateCompleted
}
// Mark job as stopped in the database (update state and duration)
job.Duration = int32(req.StopTime - job.StartTime.Unix())
job.State = req.State
if err := sd.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
log.Errorf("marking job as stopped failed: %s", err.Error())
return
}
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
// Monitoring is disabled...
if job.MonitoringStatus == schema.MonitoringStatusDisabled {
return
}
// Trigger async archiving
sd.JobRepository.TriggerArchiving(job)
}
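// stopJob handles a decoded "stopJob" message: it looks up the referenced
// running job in the database and delegates to checkAndHandleStopJob.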
func (sd *SlurmNatsScheduler) stopJob(req *StopJobRequest) {
// if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
// log.Errorf("missing role: %v", auth.GetRoleString(auth.RoleApi))
// return
// }
if req.JobId == nil {
log.Errorf("the field 'jobId' is required")
return
}
if req.Cluster != nil {
log.Printf("Cluster: %s - Job ID: %d", *req.Cluster, *req.JobId)
}
// Fetch job (that will be stopped) from db
var job *schema.Job
var err error
job, err = sd.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
if err != nil {
log.Errorf("finding job failed: %s", err.Error())
return
}
sd.checkAndHandleStopJob(job, req)
}
func (sd *SlurmNatsScheduler) Init(rawConfig json.RawMessage) error {
@@ -26,7 +154,7 @@ func (sd *SlurmNatsScheduler) Init(rawConfig json.RawMessage) error {
nc, err := nats.Connect(strings.Join(servers, ","))
if err != nil {
-log.Fatal(err)
+log.Fatal(err.Error())
}
defer nc.Close()
@@ -44,7 +172,7 @@ func (sd *SlurmNatsScheduler) Init(rawConfig json.RawMessage) error {
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
-log.Fatal(err)
+log.Fatal(err.Error())
}
defer ec.Close()
@@ -58,13 +186,21 @@ func (sd *SlurmNatsScheduler) Init(rawConfig json.RawMessage) error {
wg.Add(1)
// Subscribe
-if _, err := ec.Subscribe("stopJob", func(s *encodedMessage) {
+if _, err := ec.Subscribe("test", func(s *encodedMessage) {
log.Printf("Server Name: %s - Response Code: %v", s.ServerName, s.ResponseCode)
if s.ResponseCode == 500 {
wg.Done()
}
}); err != nil {
-log.Fatal(err)
+log.Fatal(err.Error())
}
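// The encoded connection decodes each JSON payload straight into the handler's
// argument type (*schema.JobMeta for startJob, *StopJobRequest for stopJob);
// per the commit message, the embedded BaseJob does not survive this decoding
// correctly yet.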
if _, err := ec.Subscribe("startJob", sd.startJob); err != nil {
log.Fatal(err.Error())
}
if _, err := ec.Subscribe("stopJob", sd.stopJob); err != nil {
log.Fatal(err.Error())
}
// Wait for a message to come in


@@ -15,10 +15,25 @@ type SlurmRestScheduler struct {
}
func (sd *SlurmRestScheduler) Init(rawConfig json.RawMessage) error {
// cfg := slurmrest.NewConfiguration()
// cfg.HTTPClient = &http.Client{Timeout: time.Second * 3600}
// cfg.Scheme = "http"
// cfg.Host = "localhost"
// client := slurmrest.NewAPIClient(cfg)
return nil
}
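// Sync is still a stub; the slurmrest-based job fetching below is kept
// commented out for reference.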
func (sd *SlurmRestScheduler) Sync() {
// jreq := client.SlurmApi.SlurmctldGetJobs(context.Background())
// jobs, resp, err := client.SlurmApi.SlurmctldGetJobsExecute(jreq)
// if err != nil {
// log.Fatalf("FAIL: %s", err)
// } else if resp.StatusCode != 200 {
// log.Fatalf("Invalid status code: %d\n", resp.StatusCode)
// }
// for _, job := range jobs.GetJobs() {
// fmt.Printf("Job %s - %s\n", job.GetJobId(), job.GetJobState())
// }
}