Refactored queryAllJobs

Bole Ma 2023-09-26 17:01:59 +02:00
parent 36b0f33208
commit 59f6658344


@@ -23,20 +23,12 @@ import (
 	openapi "github.com/ClusterCockpit/slurm-rest-client-0_0_38"
 )
 
-// A Response struct to map the Entire Response
-type Response struct {
-	Name string `json:"name"`
-	Jobs []Job  `json:"job_entries"`
-}
-
 type SlurmRestSchedulerConfig struct {
 	URL string `json:"url"`
-}
-
-type SlurmRestScheduler struct {
-	url string
 
 	JobRepository *repository.JobRepository
+
+	clusterConfig map[string]interface{}
 }
 
 var client *http.Client
@@ -98,7 +90,7 @@ func queryDB(qtime int64, clusterName string) ([]interface{}, error) {
 	return jobs, nil
 }
 
-func queryAllJobs() ([]interface{}, error) {
+func queryAllJobs() (openapi.V0038JobsResponse, error) {
 	var ctlOutput []byte
 
 	apiEndpoint := "/slurm/v0.0.38/jobs"
@@ -125,20 +117,14 @@ func queryAllJobs() ([]interface{}, error) {
 		log.Errorf("Error reading response body:", err)
 	}
 
-	dataJob := make(map[string]interface{})
-	err = json.Unmarshal(ctlOutput, &dataJob)
+	var jobsResponse openapi.V0038JobsResponse
+	err = json.Unmarshal(ctlOutput, &jobsResponse)
 	if err != nil {
 		log.Errorf("Error parsing JSON response:", err)
-		os.Exit(1)
+		return jobsResponse, err
 	}
 
-	if _, ok := dataJob["jobs"]; !ok {
-		log.Errorf("ERROR: jobs not found - response incomplete")
-		os.Exit(1)
-	}
-
-	jobs, _ := dataJob["jobs"].([]interface{})
-	return jobs, nil
+	return jobsResponse, nil
 }
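A note on the hunk above: decoding straight into the generated openapi.V0038JobsResponse type makes the old map[string]interface{} plumbing and the manual "jobs" key check unnecessary, since a missing "jobs" key simply leaves the Jobs slice empty. A minimal, self-contained sketch of that behavior, using a stand-in struct rather than the real generated client type:

package main

import (
	"encoding/json"
	"fmt"
)

// jobsResponse is a stand-in for openapi.V0038JobsResponse; the real
// generated struct carries many more fields.
type jobsResponse struct {
	Jobs []struct {
		JobId    *int32  `json:"job_id"`
		JobState *string `json:"job_state"`
	} `json:"jobs"`
}

func main() {
	body := []byte(`{"jobs":[{"job_id":42,"job_state":"running"}]}`)

	var resp jobsResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		fmt.Println("Error parsing JSON response:", err)
		return
	}
	// A body without a "jobs" key would just yield len(resp.Jobs) == 0,
	// so no explicit "jobs not found" check is needed anymore.
	fmt.Println(len(resp.Jobs), *resp.Jobs[0].JobState)
}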
@@ -165,7 +151,6 @@ func printSlurmInfo(job openapi.V0038JobResponseProperties) string {
 		job.Account, job.Qos,
 		job.Requeue, job.RestartCnt, job.BatchFlag,
 		job.TimeLimit, job.SubmitTime,
-		//time.Unix(int64(*.(float64)), 0).Format(time.RFC1123),
 		job.Partition,
 		job.Nodes,
 		job.NodeCount, job.Cpus, job.Tasks, job.CpusPerTask,
@@ -213,10 +198,11 @@ func loadClusterConfig(filename string) (map[string]interface{}, error) {
 	return clusterConfigData, err
 }
 
-func (sd *SlurmRestScheduler) Init(rawConfig json.RawMessage) error {
-	clusterConfigData, err := loadClusterConfig("cluster-fritz.json")
+func (cfg *SlurmRestSchedulerConfig) Init(rawConfig json.RawMessage) error {
+	var err error
+	cfg.clusterConfig, err = loadClusterConfig("cluster-fritz.json")
 
-	for k, v := range clusterConfigData {
+	for k, v := range cfg.clusterConfig {
 		switch c := v.(type) {
 		case string:
 			fmt.Printf("Item %q is a string, containing %q\n", k, c)
@@ -233,7 +219,7 @@ func (sd *SlurmRestScheduler) Init(rawConfig json.RawMessage) error {
 	return err
 }
 
-func (sd *SlurmRestScheduler) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
+func (cfg *SlurmRestSchedulerConfig) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
 
 	// Sanity checks
 	if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
@@ -251,7 +237,7 @@ func (sd *SlurmRestScheduler) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
 	// Mark job as stopped in the database (update state and duration)
 	job.Duration = int32(req.StopTime - job.StartTime.Unix())
 	job.State = req.State
-	if err := sd.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
+	if err := cfg.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
 		log.Errorf("marking job as stopped failed: %s", err.Error())
 		return
 	}
@@ -264,10 +250,10 @@ func (sd *SlurmRestScheduler) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
 	}
 
 	// Trigger async archiving
-	sd.JobRepository.TriggerArchiving(job)
+	cfg.JobRepository.TriggerArchiving(job)
 }
 
-func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) {
+func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) {
 
 	// Iterate over the Jobs slice
 	for _, job := range jobsResponse.Jobs {
@@ -279,13 +265,13 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) {
 		// aquire lock to avoid race condition between API calls
 		// var unlockOnce sync.Once
-		// sd.RepositoryMutex.Lock()
-		// defer unlockOnce.Do(sd.RepositoryMutex.Unlock)
+		// cfg.RepositoryMutex.Lock()
+		// defer unlockOnce.Do(cfg.RepositoryMutex.Unlock)
 
 		// is "running" one of JSON state?
 		if *job.JobState == "running" {
 
-			jobs, err := sd.JobRepository.FindRunningJobs(*job.Cluster)
+			jobs, err := cfg.JobRepository.FindRunningJobs(*job.Cluster)
 			if err != nil {
 				log.Fatalf("Failed to find running jobs: %v", err)
 			}
@@ -313,8 +299,8 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) {
 			var resources []*schema.Resource
 
-			// Define a regular expression to match "gres/gpu=x"
-			regex := regexp.MustCompile(`gres/gpu=(\d+)`)
+			// Define a regular expression to match "gpu=x"
+			regex := regexp.MustCompile(`gpu=(\d+)`)
 
 			// Find all matches in the input string
 			matches := regex.FindAllStringSubmatch(*job.TresAllocStr, -1)
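The loosened pattern also matches GPU counts that are not written with the "gres/" prefix. A quick standalone check against the TRES allocation string quoted later in this diff:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Example TRES allocation string taken from the comment in this commit.
	tres := "cpu=512,mem=1875G,node=4,billing=512,gres/gpu=32,gres/gpu:a40=32"

	regex := regexp.MustCompile(`gpu=(\d+)`)
	for _, m := range regex.FindAllStringSubmatch(tres, -1) {
		// Only "gres/gpu=32" matches; "gres/gpu:a40=32" has no "gpu="
		// substring, so this prints a single "32".
		fmt.Println(m[1])
	}
}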
@@ -336,15 +322,26 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) {
 					threadID, _ := strconv.Atoi(k)
 					res.HWThreads = append(res.HWThreads, threadID)
 				}
-				res.Accelerators = append(res.Accelerators, *job.TresAllocStr)
 				// cpu=512,mem=1875G,node=4,billing=512,gres/gpu=32,gres/gpu:a40=32
+				// For core/GPU id mapping, need to query from cluster config file
+				res.Accelerators = append(res.Accelerators, *job.TresAllocStr)
 				resources = append(resources, &res)
 			}
 
 			var metaData map[string]string
 			metaData["jobName"] = *job.Name
 			metaData["slurmInfo"] = printSlurmInfo(job)
-			// metaData["jobScript"] = "What to put here?"
+
+			switch slurmPath := cfg.clusterConfig["slurm_path"].(type) {
+			case string:
+				commandCtlScriptTpl := fmt.Sprintf("%sscontrol -M %%s write batch_script %%s -", slurmPath)
+				queryJobScript := fmt.Sprintf(commandCtlScriptTpl, job.Cluster, job.JobId)
+				metaData["jobScript"] = queryJobScript
+			default:
+				// Type assertion failed
+				fmt.Println("Conversion of slurm_path to string failed")
+			}
 
 			metaDataInBytes, err := json.Marshal(metaData)
 
 			var defaultJob schema.BaseJob = schema.BaseJob{
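The jobScript command added above is built in two Sprintf passes: the first bakes the configured slurm_path into the template (the "%%s" escapes survive as "%s"), the second fills in cluster and job id; as committed, the resulting string is only stored in metaData, not executed. A small sketch with hypothetical values (the real ones come from cluster-fritz.json and the job object; note that the real job.Cluster and job.JobId are pointers, which would need dereferencing to format cleanly, and that metaData is declared as a nil map in the surrounding context, so assigning to it would panic unless it is initialized with make):

package main

import "fmt"

func main() {
	// Hypothetical stand-ins for cfg.clusterConfig["slurm_path"],
	// *job.Cluster and *job.JobId.
	slurmPath := "/usr/bin/"
	cluster := "fritz"
	jobID := "101817"

	// Pass 1: "%s" consumes slurmPath, "%%s" survives as "%s".
	tpl := fmt.Sprintf("%sscontrol -M %%s write batch_script %%s -", slurmPath)
	// Pass 2: fill in cluster name and job id.
	cmd := fmt.Sprintf(tpl, cluster, jobID)
	fmt.Println(cmd) // /usr/bin/scontrol -M fritz write batch_script 101817 -
}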
@@ -382,7 +379,8 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) {
 				Statistics: make(map[string]schema.JobStatistics),
 			}
 
 			// req := new(schema.JobMeta)
-			id, err := sd.JobRepository.Start(req)
+			id, err := cfg.JobRepository.Start(req)
+			log.Debugf("Added %v", id)
 		} else {
 			for _, job := range jobs {
 				log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID)
@@ -393,7 +391,7 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) {
 			// Check if completed job with combination of (job_id, cluster_id, start_time) already exists:
 			var jobID int64
 			jobID = int64(*job.JobId)
-			existingJob, err := sd.JobRepository.Find(&jobID, job.Cluster, job.StartTime)
+			existingJob, err := cfg.JobRepository.Find(&jobID, job.Cluster, job.StartTime)
 
 			if err == nil {
 				existingJob.BaseJob.Duration = int32(*job.EndTime - *job.StartTime)
@@ -408,27 +406,20 @@ func (sd *SlurmRestScheduler) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) {
 					StopTime: *job.EndTime,
 				}
 
 				// req := new(schema.JobMeta)
-				sd.checkAndHandleStopJob(existingJob, req)
+				cfg.checkAndHandleStopJob(existingJob, req)
 			}
 		}
 	}
 }
 
-func (sd *SlurmRestScheduler) Sync() {
-	// for _, job := range jobs.GetJobs() {
-	// 	fmt.Printf("Job %s - %s\n", job.GetJobId(), job.GetJobState())
-	// }
-
-	response, err := queryAllJobs()
+func (cfg *SlurmRestSchedulerConfig) Sync() {
+	// Fetch an instance of V0037JobsResponse
+	jobsResponse, err := queryAllJobs()
 	if err != nil {
 		log.Fatal(err.Error())
 	}
-
-	// Fetch an example instance of V0037JobsResponse
-	// jobsResponse := openapi.V0038JobsResponse{}
-	var jobsResponse openapi.V0038JobsResponse
-	sd.HandleJobsResponse(jobsResponse)
+	cfg.HandleJobsResponse(jobsResponse)
 }
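With this refactor, Sync passes the freshly fetched response to HandleJobsResponse; before, it discarded the query result and handed over a zero-value V0038JobsResponse that was never populated. The new flow, reduced to its shape with stand-in types rather than the real repository wiring:

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// response is a stand-in for openapi.V0038JobsResponse.
type response struct {
	Jobs []struct{} `json:"jobs"`
}

// queryAllJobs mimics the refactored signature: typed response plus error.
func queryAllJobs(body []byte) (response, error) {
	var r response
	err := json.Unmarshal(body, &r)
	return r, err
}

func handleJobsResponse(r response) {
	fmt.Println("handling", len(r.Jobs), "jobs")
}

func main() {
	r, err := queryAllJobs([]byte(`{"jobs":[{},{}]}`))
	if err != nil {
		log.Fatal(err.Error())
	}
	// The handler now sees the fetched data, not a zero value.
	handleJobsResponse(r)
}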