From 59f665834432cee1e814676b664fc3f804b7880b Mon Sep 17 00:00:00 2001
From: Bole Ma
Date: Tue, 26 Sep 2023 17:01:59 +0200
Subject: [PATCH] Refactored queryAllJobs

---
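Notes (reviewer sketch, not part of the commit):

The core of this change is that queryAllJobs no longer decodes the REST
payload into a map[string]interface{}, probes it for a "jobs" key, and
calls os.Exit(1) on failure; it now decodes straight into the generated
openapi.V0038JobsResponse type and returns the error to its caller. Below
is a minimal, self-contained sketch of that pattern; jobsResponse and
parseJobs are hypothetical stand-ins for the generated type and the
refactored function, trimmed to the two fields this sketch needs.

package main

import (
    "encoding/json"
    "fmt"
)

// jobsResponse mimics the shape of openapi.V0038JobsResponse, reduced to
// the fields used in this sketch.
type jobsResponse struct {
    Jobs []struct {
        JobId    *int32  `json:"job_id"`
        JobState *string `json:"job_state"`
    } `json:"jobs"`
}

// parseJobs decodes into the typed struct and hands the error back to the
// caller instead of terminating the process.
func parseJobs(payload []byte) (jobsResponse, error) {
    var resp jobsResponse
    if err := json.Unmarshal(payload, &resp); err != nil {
        return resp, err
    }
    return resp, nil
}

func main() {
    payload := []byte(`{"jobs":[{"job_id":42,"job_state":"running"}]}`)
    resp, err := parseJobs(payload)
    if err != nil {
        panic(err)
    }
    fmt.Println(*resp.Jobs[0].JobId, *resp.Jobs[0].JobState) // 42 running
}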
 internal/scheduler/slurmRest.go | 91 +++++++++++++++------------------
 1 file changed, 41 insertions(+), 50 deletions(-)

diff --git a/internal/scheduler/slurmRest.go b/internal/scheduler/slurmRest.go
index 1d4457a..1dea1f5 100644
--- a/internal/scheduler/slurmRest.go
+++ b/internal/scheduler/slurmRest.go
@@ -23,20 +23,12 @@ import (
     openapi "github.com/ClusterCockpit/slurm-rest-client-0_0_38"
 )

-// A Response struct to map the Entire Response
-type Response struct {
-    Name string `json:"name"`
-    Jobs []Job  `json:"job_entries"`
-}
-
 type SlurmRestSchedulerConfig struct {
     URL string `json:"url"`
-}
-
-type SlurmRestScheduler struct {
-    url           string
     JobRepository *repository.JobRepository
+
+    clusterConfig map[string]interface{}
 }

 var client *http.Client

@@ -98,7 +90,7 @@ func queryDB(qtime int64, clusterName string) ([]interface{}, error) {
     return jobs, nil
 }

-func queryAllJobs() ([]interface{}, error) {
+func queryAllJobs() (openapi.V0038JobsResponse, error) {
     var ctlOutput []byte

     apiEndpoint := "/slurm/v0.0.38/jobs"
@@ -125,20 +117,14 @@ func queryAllJobs() ([]interface{}, error) {
         log.Errorf("Error reading response body:", err)
     }

-    dataJob := make(map[string]interface{})
-    err = json.Unmarshal(ctlOutput, &dataJob)
+    var jobsResponse openapi.V0038JobsResponse
+    err = json.Unmarshal(ctlOutput, &jobsResponse)
     if err != nil {
         log.Errorf("Error parsing JSON response:", err)
-        os.Exit(1)
+        return jobsResponse, err
     }

-    if _, ok := dataJob["jobs"]; !ok {
-        log.Errorf("ERROR: jobs not found - response incomplete")
-        os.Exit(1)
-    }
-
-    jobs, _ := dataJob["jobs"].([]interface{})
-    return jobs, nil
+    return jobsResponse, nil
 }

 func printSlurmInfo(job openapi.V0038JobResponseProperties) string {
@@ -165,7 +151,6 @@ func printSlurmInfo(job openapi.V0038JobResponseProperties) string {
         job.Account, job.Qos,
         job.Requeue, job.RestartCnt, job.BatchFlag,
         job.TimeLimit, job.SubmitTime,
-        //time.Unix(int64(*.(float64)), 0).Format(time.RFC1123),
         job.Partition,
         job.Nodes,
         job.NodeCount, job.Cpus, job.Tasks, job.CpusPerTask,
@@ -213,10 +198,11 @@ func loadClusterConfig(filename string) (map[string]interface{}, error) {
     return clusterConfigData, err
 }

-func (sd *SlurmRestScheduler) Init(rawConfig json.RawMessage) error {
-    clusterConfigData, err := loadClusterConfig("cluster-fritz.json")
+func (cfg *SlurmRestSchedulerConfig) Init(rawConfig json.RawMessage) error {
+    var err error
+    cfg.clusterConfig, err = loadClusterConfig("cluster-fritz.json")

-    for k, v := range clusterConfigData {
+    for k, v := range cfg.clusterConfig {
         switch c := v.(type) {
         case string:
             fmt.Printf("Item %q is a string, containing %q\n", k, c)
@@ -233,7 +219,7 @@ func (sd *SlurmRestScheduler) Init(rawConfig json.RawMessage) error {
     return err
 }

-func (sd *SlurmRestScheduler) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
+func (cfg *SlurmRestSchedulerConfig) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {

     // Sanity checks
     if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
@@ -251,7 +237,7 @@ func (sd *SlurmRestScheduler) checkAndHandleStopJob(job *schema.Job, req *StopJo
     // Mark job as stopped in the database (update state and duration)
     job.Duration = int32(req.StopTime - job.StartTime.Unix())
     job.State = req.State
-    if err := sd.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
+    if err := cfg.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
         log.Errorf("marking job as stopped failed: %s", err.Error())
         return
     }
@@ -264,10 +250,10 @@ func (sd *SlurmRestScheduler) checkAndHandleStopJob(job *schema.Job, req *StopJo
     }

     // Trigger async archiving
-    sd.JobRepository.TriggerArchiving(job)
+    cfg.JobRepository.TriggerArchiving(job)
 }

-func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) {
+func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) {

     // Iterate over the Jobs slice
     for _, job := range jobsResponse.Jobs {
@@ -279,13 +265,13 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsR

         // aquire lock to avoid race condition between API calls
         // var unlockOnce sync.Once
-        // sd.RepositoryMutex.Lock()
-        // defer unlockOnce.Do(sd.RepositoryMutex.Unlock)
+        // cfg.RepositoryMutex.Lock()
+        // defer unlockOnce.Do(cfg.RepositoryMutex.Unlock)

         // is "running" one of JSON state?
         if *job.JobState == "running" {

-            jobs, err := sd.JobRepository.FindRunningJobs(*job.Cluster)
+            jobs, err := cfg.JobRepository.FindRunningJobs(*job.Cluster)
             if err != nil {
                 log.Fatalf("Failed to find running jobs: %v", err)
             }
@@ -313,8 +299,8 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsR

             var resources []*schema.Resource

-            // Define a regular expression to match "gres/gpu=x"
-            regex := regexp.MustCompile(`gres/gpu=(\d+)`)
+            // Define a regular expression to match "gpu=x"
+            regex := regexp.MustCompile(`gpu=(\d+)`)

             // Find all matches in the input string
             matches := regex.FindAllStringSubmatch(*job.TresAllocStr, -1)
@@ -336,15 +322,26 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsR
                 threadID, _ := strconv.Atoi(k)
                 res.HWThreads = append(res.HWThreads, threadID)
             }
-            res.Accelerators = append(res.Accelerators, *job.TresAllocStr) // cpu=512,mem=1875G,node=4,billing=512,gres\/gpu=32,gres\/gpu:a40=32
+            // For core/GPU id mapping, need to query from cluster config file
+            res.Accelerators = append(res.Accelerators, *job.TresAllocStr)
             resources = append(resources, &res)
         }

         var metaData map[string]string
         metaData["jobName"] = *job.Name
         metaData["slurmInfo"] = printSlurmInfo(job)
-        // metaData["jobScript"] = "What to put here?"
+
+        switch slurmPath := cfg.clusterConfig["slurm_path"].(type) {
+        case string:
+            commandCtlScriptTpl := fmt.Sprintf("%sscontrol -M %%s write batch_script %%d -", slurmPath)
+            queryJobScript := fmt.Sprintf(commandCtlScriptTpl, *job.Cluster, *job.JobId)
+            metaData["jobScript"] = queryJobScript
+        default:
+            // Type assertion failed
+            fmt.Println("Conversion of slurm_path to string failed")
+        }
+
         metaDataInBytes, err := json.Marshal(metaData)

         var defaultJob schema.BaseJob = schema.BaseJob{
@@ -382,7 +379,8 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsR
                 Statistics:   make(map[string]schema.JobStatistics),
             }
             // req := new(schema.JobMeta)
-            id, err := sd.JobRepository.Start(req)
+            id, err := cfg.JobRepository.Start(req)
+            log.Debugf("Added %v", id)
         } else {
             for _, job := range jobs {
                 log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID)
@@ -393,7 +391,7 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsR
             // Check if completed job with combination of (job_id, cluster_id, start_time) already exists:
             var jobID int64
             jobID = int64(*job.JobId)
-            existingJob, err := sd.JobRepository.Find(&jobID, job.Cluster, job.StartTime)
+            existingJob, err := cfg.JobRepository.Find(&jobID, job.Cluster, job.StartTime)

             if err == nil {
                 existingJob.BaseJob.Duration = int32(*job.EndTime - *job.StartTime)
@@ -408,27 +406,20 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsR
                     StopTime:  *job.EndTime,
                 }
                 // req := new(schema.JobMeta)
-                sd.checkAndHandleStopJob(existingJob, req)
+                cfg.checkAndHandleStopJob(existingJob, req)
             }
         }
     }
 }

-func (sd *SlurmRestScheduler) Sync() {
-    // for _, job := range jobs.GetJobs() {
-    //     fmt.Printf("Job %s - %s\n", job.GetJobId(), job.GetJobState())
-    // }
+func (cfg *SlurmRestSchedulerConfig) Sync() {

-    response, err := queryAllJobs()
+    // Fetch an instance of V0038JobsResponse
+    jobsResponse, err := queryAllJobs()
     if err != nil {
         log.Fatal(err.Error())
     }
-
-    // Fetch an example instance of V0037JobsResponse
-    // jobsResponse := openapi.V0038JobsResponse{}
-
-    var jobsResponse openapi.V0038JobsResponse
-    sd.HandleJobsResponse(jobsResponse)
+    cfg.HandleJobsResponse(jobsResponse)
 }
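A further note on the relaxed TRES pattern (reviewer sketch, not part of the
commit): against the sample allocation string quoted in the comment the patch
removes, `gpu=(\d+)` still matches only the plain "gres/gpu=32" entry (via its
"gpu=32" suffix); the typed entry "gres/gpu:a40=32" contains "gpu:" rather
than "gpu=", so it is not double-counted. A small standalone sketch:

package main

import (
    "fmt"
    "regexp"
)

func main() {
    // Sample TRES string taken from the comment removed by this patch.
    tres := "cpu=512,mem=1875G,node=4,billing=512,gres/gpu=32,gres/gpu:a40=32"

    // The relaxed pattern adopted by the patch.
    regex := regexp.MustCompile(`gpu=(\d+)`)
    for _, match := range regex.FindAllStringSubmatch(tres, -1) {
        fmt.Println(match[1]) // prints: 32
    }
}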