From 9fe497173dcac20d89670e11b5c49128021fb348 Mon Sep 17 00:00:00 2001
From: Bole Ma
Date: Thu, 26 Oct 2023 03:21:51 +0200
Subject: [PATCH] Added JobResource back to Job

---
 internal/scheduler/payloadConverter.go |  40 +++++--
 internal/scheduler/slurmRest.go        | 148 ++++++++++---------------
 2 files changed, 89 insertions(+), 99 deletions(-)

diff --git a/internal/scheduler/payloadConverter.go b/internal/scheduler/payloadConverter.go
index 254a323..7d63297 100644
--- a/internal/scheduler/payloadConverter.go
+++ b/internal/scheduler/payloadConverter.go
@@ -89,11 +89,30 @@ type Meta struct {
 	} `json:"Slurm"`
 }
 
+type JobResource struct {
+	Nodes          string          `json:"nodes"`
+	AllocatedCores int             `json:"allocated_cores"`
+	AllocatedHosts int             `json:"allocated_hosts"`
+	AllocatedNodes []AllocatedNode `json:"allocated_nodes"`
+}
+
+type AllocatedNode struct {
+	Sockets         map[string]Socket `json:"sockets"`
+	Nodename        string            `json:"nodename"`
+	CPUsUsed        *int              `json:"cpus_used"`
+	MemoryUsed      *int              `json:"memory_used"`
+	MemoryAllocated *int              `json:"memory_allocated"`
+}
+
+type Socket struct {
+	Cores map[string]string `json:"cores"`
+}
+
 type Job struct {
 	Account         string      `json:"account"`
 	AccrueTime      int         `json:"accrue_time"`
 	AdminComment    string      `json:"admin_comment"`
-	ArrayJobID      int         `json:"array_job_id"`
+	ArrayJobID      int64       `json:"array_job_id"`
 	ArrayTaskID     interface{} `json:"array_task_id"`
 	ArrayMaxTasks   int         `json:"array_max_tasks"`
 	ArrayTaskString string      `json:"array_task_string"`
@@ -124,7 +143,7 @@ type Job struct {
 	Dependency      string `json:"dependency"`
 	DerivedExitCode int    `json:"derived_exit_code"`
 	EligibleTime    int    `json:"eligible_time"`
-	EndTime         int    `json:"end_time"`
+	EndTime         int64  `json:"end_time"`
 	ExcludedNodes   string `json:"excluded_nodes"`
 	ExitCode        int    `json:"exit_code"`
 	Features        string `json:"features"`
@@ -134,7 +153,8 @@ type Job struct {
 	GresDetail          []string `json:"gres_detail"`
 	GroupID             int      `json:"group_id"`
 	GroupName           string   `json:"group_name"`
-	JobID               int      `json:"job_id"`
+	JobID               int64    `json:"job_id"`
+	JobResources        JobResource `json:"job_resources"`
 	JobState            string   `json:"job_state"`
 	LastSchedEvaluation int      `json:"last_sched_evaluation"`
 	Licenses            string   `json:"licenses"`
@@ -149,8 +169,8 @@ type Job struct {
 	TasksPerNode   int         `json:"tasks_per_node"`
 	TasksPerSocket interface{} `json:"tasks_per_socket"`
 	TasksPerBoard  int         `json:"tasks_per_board"`
-	CPUs           int         `json:"cpus"`
-	NodeCount      int         `json:"node_count"`
+	CPUs           int32       `json:"cpus"`
+	NodeCount      int32       `json:"node_count"`
 	Tasks          int         `json:"tasks"`
 	HETJobID       int         `json:"het_job_id"`
 	HETJobIDSet    string      `json:"het_job_id_set"`
@@ -171,11 +191,11 @@ type Job struct {
 	ResizeTime       int         `json:"resize_time"`
 	RestartCnt       int         `json:"restart_cnt"`
 	ResvName         string      `json:"resv_name"`
-	Shared           interface{} `json:"shared"`
+	Shared           *string     `json:"shared"`
 	ShowFlags        []string    `json:"show_flags"`
 	SocketsPerBoard  int         `json:"sockets_per_board"`
 	SocketsPerNode   interface{} `json:"sockets_per_node"`
-	StartTime        int         `json:"start_time"`
+	StartTime        int64       `json:"start_time"`
 	StateDescription string      `json:"state_description"`
 	StateReason      string      `json:"state_reason"`
 	StandardError    string      `json:"standard_error"`
@@ -201,7 +221,7 @@ type Job struct {
 	CurrentWorkingDirectory string `json:"current_working_directory"`
 }
 
-type SlurmData struct {
+type SlurmPayload struct {
 	Meta   Meta          `json:"meta"`
 	Errors []interface{} `json:"errors"`
 	Jobs   []Job         `json:"jobs"`
@@ -268,8 +288,8 @@ func DecodeClusterConfig(filename string) (ClusterConfig, error) {
 	return clusterConfig, nil
 }
 
-func UnmarshalSlurmPayload(jsonPayload string) (SlurmData, error) {
-	var slurmData SlurmData
+func UnmarshalSlurmPayload(jsonPayload string) (SlurmPayload, error) {
+	var slurmData SlurmPayload
 	err := json.Unmarshal([]byte(jsonPayload), &slurmData)
 	if err != nil {
 		return slurmData, fmt.Errorf("failed to unmarshal JSON data: %v", err)
diff --git a/internal/scheduler/slurmRest.go b/internal/scheduler/slurmRest.go
index 5c7c954..9795c64 100644
--- a/internal/scheduler/slurmRest.go
+++ b/internal/scheduler/slurmRest.go
@@ -13,18 +13,12 @@ import (
 	"os/exec"
 	"regexp"
 	"strconv"
-	"time"
-
-	"fmt"
-	"regexp"
-	"strconv"
 	"strings"
+	"time"
 
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
-
-	openapi "github.com/ClusterCockpit/slurm-rest-client-0_0_38"
 )
 
 type SlurmRestSchedulerConfig struct {
@@ -94,7 +88,7 @@ func queryDB(qtime int64, clusterName string) ([]interface{}, error) {
 	return jobs, nil
 }
 
-func queryAllJobs() (openapi.V0038JobsResponse, error) {
+func queryAllJobs() (SlurmPayload, error) {
 	var ctlOutput []byte
 
 	apiEndpoint := "http://:8080/slurm/v0.0.38/jobs"
@@ -122,7 +116,7 @@ func queryAllJobs() (openapi.V0038JobsResponse, error) {
 		log.Errorf("Error reading response body: %v", err)
 	}
 
-	var jobsResponse openapi.V0038JobsResponse
+	var jobsResponse SlurmPayload
 	err = json.Unmarshal(ctlOutput, &jobsResponse)
 	if err != nil {
 		log.Errorf("Error parsing JSON response: %v", err)
@@ -132,7 +126,7 @@ func queryAllJobs() (openapi.V0038JobsResponse, error) {
 	return jobsResponse, nil
 }
 
-func queryAllJobsLocal() (openapi.V0038JobsResponse, error) {
+func queryAllJobsLocal() (SlurmPayload, error) {
 	// Read the JSON file
 	jobsData, err := os.ReadFile("slurm_0038.json")
 
@@ -140,7 +134,7 @@ func queryAllJobsLocal() (openapi.V0038JobsResponse, error) {
 		fmt.Println("Error reading JSON file:", err)
 	}
 
-	var jobsResponse openapi.V0038JobsResponse
+	var jobsResponse SlurmPayload
 	err = json.Unmarshal(jobsData, &jobsResponse)
 	if err != nil {
 		log.Errorf("Error parsing JSON response: %v", err)
@@ -150,7 +144,7 @@ func queryAllJobsLocal() (openapi.V0038JobsResponse, error) {
 	return jobsResponse, nil
 }
 
-func printSlurmInfo(job openapi.V0038JobResponseProperties) string {
+func printSlurmInfo(job Job) string {
 	text := fmt.Sprintf(`
 	JobId=%v
 	JobName=%v
@@ -169,14 +163,14 @@ func printSlurmInfo(job openapi.V0038JobResponseProperties) string {
 	WorkDir=%v
 	StdErr=%v
 	StdOut=%v`,
-		job.JobId, job.Name,
-		job.UserName, job.UserId, job.GroupId,
-		job.Account, job.Qos,
+		job.JobID, job.Name,
+		job.UserName, job.UserID, job.GroupID,
+		job.Account, job.QOS,
 		job.Requeue, job.RestartCnt, job.BatchFlag,
 		job.TimeLimit, job.SubmitTime,
 		job.Partition,
 		job.Nodes,
-		job.NodeCount, job.Cpus, job.Tasks, job.CpusPerTask,
+		job.NodeCount, job.CPUs, job.Tasks, job.CPUPerTask,
 		job.TasksPerBoard, job.TasksPerSocket, job.TasksPerCore,
 		job.TresAllocStr,
 		job.TresAllocStr,
@@ -207,19 +201,6 @@ func (cfg *SlurmRestSchedulerConfig) Init() error {
 
 	cfg.clusterConfig, err = DecodeClusterConfig("cluster-alex.json")
 
-	// for k, v := range cfg.clusterConfig {
-	// 	fmt.Printf("Entry %q with value %x loaded\n", k, v)
-	// 	// switch c := v.(type) {
-	// 	// case string:
-	// 	// 	fmt.Printf("Item %q is a string, containing %q\n", k, c)
-	// 	// case float64:
-	// 	// 	fmt.Printf("Looks like item %q is a number, specifically %f\n", k, c)
-	// 	// default:
-	// 	// 	fmt.Printf("Not sure what type item %q is, but I think it might be %T\n", k, c)
-	// 	// }
-	// }
-	// fmt.Printf("Cluster Name: %q\n", cfg.clusterConfig["name"])
-
 	// Create an HTTP client
 	client = &http.Client{}
 
@@ -293,26 +274,20 @@ func ConstructNodeAcceleratorMap(input string, accelerator string) map[string]st
 	return numberMap
 }
 
-func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) {
+func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobs []Job) {
 
 	// Iterate over the Jobs slice
-	for _, job := range jobsResponse.Jobs {
+	for _, job := range jobs {
 		// Process each job
-		fmt.Printf("Job ID: %d\n", *job.JobId)
-		fmt.Printf("Job Name: %s\n", *job.Name)
-		fmt.Printf("Job State: %s\n", *job.JobState)
-		fmt.Println("Job StartTime:", *job.StartTime)
-		fmt.Println("Job Cluster:", *job.Cluster)
+		fmt.Printf("Job ID: %d\n", job.JobID)
+		fmt.Printf("Job Name: %s\n", job.Name)
+		fmt.Printf("Job State: %s\n", job.JobState)
+		fmt.Println("Job StartTime:", job.StartTime)
+		fmt.Println("Job Cluster:", job.Cluster)
 
-		// aquire lock to avoid race condition between API calls
-		// var unlockOnce sync.Once
-		// cfg.RepositoryMutex.Lock()
-		// defer unlockOnce.Do(cfg.RepositoryMutex.Unlock)
+		if job.JobState == "RUNNING" {
 
-		// is "running" one of JSON state?
-		if *job.JobState == "RUNNING" {
-
-			// jobs, err := cfg.JobRepository.FindRunningJobs(*job.Cluster)
+			// jobs, err := cfg.JobRepository.FindRunningJobs(job.Cluster)
 			// if err != nil {
 			// 	log.Fatalf("Failed to find running jobs: %v", err)
 			// }
@@ -333,18 +308,13 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
 				exclusive = 0
 			}
 
-			jobResourcesInBytes, err := json.Marshal(*job.JobResources)
-			if err != nil {
-				log.Fatalf("JobResources JSON marshaling failed: %s", err)
-			}
-
 			var resources []*schema.Resource
 
 			// Define a regular expression to match "gpu=x"
 			regex := regexp.MustCompile(`gpu=(\d+)`)
 
 			// Find all matches in the input string
-			matches := regex.FindAllStringSubmatch(*job.TresAllocStr, -1)
+			matches := regex.FindAllStringSubmatch(job.TresAllocStr, -1)
 
 			// Initialize a variable to store the total number of GPUs
 			var totalGPUs int32
@@ -357,15 +327,15 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
 
 			for _, node := range job.JobResources.AllocatedNodes {
 				var res schema.Resource
-				res.Hostname = *node.Nodename
+				res.Hostname = node.Nodename
 
-				log.Debugf("Node %s V0038NodeAllocationSockets.Cores map size: %d\n", *node.Nodename, len(node.Sockets.Cores))
+				log.Debugf("Node %s Sockets map size: %d\n", node.Nodename, len(node.Sockets))
 
-				if node.Cpus == nil || node.Memory == nil {
-					log.Fatalf("Either node.Cpus or node.Memory is nil\n")
+				if node.CPUsUsed == nil || node.MemoryAllocated == nil {
+					log.Fatalf("Either node.CPUsUsed or node.MemoryAllocated is nil\n")
 				}
 
-				for k, v := range node.Sockets.Cores {
+				for k, v := range node.Sockets {
 					fmt.Printf("core id[%s] value[%s]\n", k, v)
 					threadID, _ := strconv.Atoi(k)
 					res.HWThreads = append(res.HWThreads, threadID)
@@ -373,12 +343,12 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
 				}
 
 				// cpu=512,mem=1875G,node=4,billing=512,gres\/gpu=32,gres\/gpu:a40=32
 				// For core/GPU id mapping, need to query from cluster config file
-				res.Accelerators = append(res.Accelerators, *job.Comment)
+				res.Accelerators = append(res.Accelerators, job.Comment)
 				resources = append(resources, &res)
 			}
 
 			metaData := make(map[string]string)
-			metaData["jobName"] = *job.Name
+			metaData["jobName"] = job.Name
 			metaData["slurmInfo"] = printSlurmInfo(job)
 
 			// switch slurmPath := cfg.clusterConfig["slurm_path"].(type) {
@@ -397,26 +367,26 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
 			}
 
 			var defaultJob schema.BaseJob = schema.BaseJob{
-				JobID:     int64(*job.JobId),
-				User:      *job.UserName,
-				Project:   *job.Account,
-				Cluster:   *job.Cluster,
-				Partition: *job.Partition,
+				JobID:     job.JobID,
+				User:      job.UserName,
+				Project:   job.Account,
+				Cluster:   job.Cluster,
+				Partition: job.Partition,
 				// check nil
-				ArrayJobId:   int64(*job.ArrayJobId),
-				NumNodes:     *job.NodeCount,
-				NumHWThreads: *job.Cpus,
+				ArrayJobId:   job.ArrayJobID,
+				NumNodes:     job.NodeCount,
+				NumHWThreads: job.CPUs,
 				NumAcc:    totalGPUs,
 				Exclusive: exclusive,
 				// MonitoringStatus: job.MonitoringStatus,
-				// SMT:   *job.TasksPerCore,
-				State: schema.JobState(*job.JobState),
+				// SMT:   job.TasksPerCore,
+				State: schema.JobState(job.JobState),
 				// ignore this for start job
-				// Duration: int32(time.Now().Unix() - *job.StartTime), // or SubmitTime?
+				// Duration: int32(time.Now().Unix() - job.StartTime), // or SubmitTime?
 				Walltime: time.Now().Unix(), // max duration requested by the job
 				// Tags:      job.Tags, // ignore this!
-				RawResources: jobResourcesInBytes,
+				// RawResources: jobResourcesInBytes,
 				// "job_resources": "allocated_nodes" "sockets":
 				// very important; has to be right
 				Resources: resources,
@@ -429,7 +399,7 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
 			req := &schema.JobMeta{
 				BaseJob:    defaultJob,
-				StartTime:  *job.StartTime,
+				StartTime:  job.StartTime,
 				Statistics: make(map[string]schema.JobStatistics),
 			}
 			log.Debugf("Generated JobMeta %v", req.BaseJob.JobID)
@@ -445,41 +415,41 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
 			// }
 		} else {
 			// Check if completed job with combination of (job_id, cluster_id, start_time) already exists:
-			var jobID int64
-			jobID = int64(*job.JobId)
-			log.Debugf("jobID: %v Cluster: %v StartTime: %v", jobID, *job.Cluster, *job.StartTime)
+			jobID := job.JobID
+			log.Debugf("jobID: %v Cluster: %v StartTime: %v", jobID, job.Cluster, job.StartTime)
 
 			// commented out as it will cause panic
 			// note down params invoked
-			existingJob, err := cfg.JobRepository.Find(&jobID, job.Cluster, job.StartTime)
+			// existingJob, err := cfg.JobRepository.Find(&jobID, &job.Cluster, &job.StartTime)
 
-			if err == nil {
-				existingJob.BaseJob.Duration = int32(*job.EndTime - *job.StartTime)
-				existingJob.BaseJob.State = schema.JobState(*job.JobState)
-				existingJob.BaseJob.Walltime = *job.StartTime
+			// if err == nil {
+			// 	existingJob.BaseJob.Duration = int32(job.EndTime - job.StartTime)
+			// 	existingJob.BaseJob.State = schema.JobState(job.JobState)
+			// 	existingJob.BaseJob.Walltime = job.StartTime
 
-				req := &StopJobRequest{
-					Cluster:   job.Cluster,
-					JobId:     &jobID,
-					State:     schema.JobState(*job.JobState),
-					StartTime: &existingJob.StartTimeUnix,
-					StopTime:  *job.EndTime,
-				}
-				// req := new(schema.JobMeta)
-				cfg.checkAndHandleStopJob(existingJob, req)
-			}
+			// req := &StopJobRequest{
+			// 	Cluster:   &job.Cluster,
+			// 	JobId:     &jobID,
+			// 	State:     schema.JobState(job.JobState),
+			// 	StartTime: &existingJob.StartTimeUnix,
+			// 	StopTime:  job.EndTime,
+			// }
+			// // req := new(schema.JobMeta)
+			// cfg.checkAndHandleStopJob(existingJob, req)
+			// }
 		}
 	}
+
 }
 
 func (cfg *SlurmRestSchedulerConfig) Sync() {
-	// Fetch an instance of V0037JobsResponse
+	// Fetch an instance of JobsResponse
 	jobsResponse, err := queryAllJobsLocal()
 	if err != nil {
 		log.Fatal(err.Error())
 	}
 
-	cfg.HandleJobsResponse(jobsResponse)
+	cfg.HandleJobsResponse(jobsResponse.Jobs)
 }
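
Note (not part of the patch): the snippet below is a minimal, hypothetical package-level test sketching how the renamed SlurmPayload type and the new JobResource/AllocatedNode/Socket fields could be exercised through UnmarshalSlurmPayload. The test file name and all sample JSON values are illustrative only; the JSON keys simply mirror the struct tags introduced above.

// payloadConverter_test.go (hypothetical, for illustration only)
package scheduler

import "testing"

func TestUnmarshalSlurmPayload(t *testing.T) {
	// Sample payload with made-up values; keys follow the struct tags added in this patch.
	payload := `{
		"meta": {},
		"errors": [],
		"jobs": [{
			"job_id": 12345,
			"job_state": "RUNNING",
			"start_time": 1698300000,
			"job_resources": {
				"nodes": "node001",
				"allocated_cores": 8,
				"allocated_hosts": 1,
				"allocated_nodes": [{
					"nodename": "node001",
					"sockets": {"0": {"cores": {"0": "allocated"}}},
					"cpus_used": 8,
					"memory_used": 1024,
					"memory_allocated": 2048
				}]
			}
		}]
	}`

	data, err := UnmarshalSlurmPayload(payload)
	if err != nil {
		t.Fatalf("unmarshal failed: %v", err)
	}
	// Top-level payload: exactly one job with the expected ID.
	if len(data.Jobs) != 1 || data.Jobs[0].JobID != 12345 {
		t.Fatalf("unexpected jobs: %+v", data.Jobs)
	}
	// Nested job_resources: node name and per-socket core map decoded.
	node := data.Jobs[0].JobResources.AllocatedNodes[0]
	if node.Nodename != "node001" || len(node.Sockets["0"].Cores) != 1 {
		t.Fatalf("unexpected job_resources: %+v", data.Jobs[0].JobResources)
	}
}

If such a test were added alongside the converter, it could be run with go test on the internal/scheduler package to cover the decoding path next to the existing DecodeClusterConfig helper.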