Added JobResource back to Job

This commit is contained in:
Bole Ma 2023-10-26 03:21:51 +02:00
parent a3fbdbcf90
commit 9fe497173d
2 changed files with 89 additions and 99 deletions

View File

@ -89,11 +89,30 @@ type Meta struct {
} `json:"Slurm"` } `json:"Slurm"`
} }
type JobResource struct {
Nodes string `json:"nodes"`
AllocatedCores int `json:"allocated_cores"`
AllocatedHosts int `json:"allocated_hosts"`
AllocatedNodes []AllocatedNode `json:"allocated_nodes"`
}
type AllocatedNode struct {
Sockets map[string]Socket `json:"sockets"`
Nodename string `json:"nodename"`
CPUsUsed *int `json:"cpus_used"`
MemoryUsed *int `json:"memory_used"`
MemoryAllocated *int `json:"memory_allocated"`
}
type Socket struct {
Cores map[string]string `json:"cores"`
}
type Job struct { type Job struct {
Account string `json:"account"` Account string `json:"account"`
AccrueTime int `json:"accrue_time"` AccrueTime int `json:"accrue_time"`
AdminComment string `json:"admin_comment"` AdminComment string `json:"admin_comment"`
ArrayJobID int `json:"array_job_id"` ArrayJobID int64 `json:"array_job_id"`
ArrayTaskID interface{} `json:"array_task_id"` ArrayTaskID interface{} `json:"array_task_id"`
ArrayMaxTasks int `json:"array_max_tasks"` ArrayMaxTasks int `json:"array_max_tasks"`
ArrayTaskString string `json:"array_task_string"` ArrayTaskString string `json:"array_task_string"`
@ -124,7 +143,7 @@ type Job struct {
Dependency string `json:"dependency"` Dependency string `json:"dependency"`
DerivedExitCode int `json:"derived_exit_code"` DerivedExitCode int `json:"derived_exit_code"`
EligibleTime int `json:"eligible_time"` EligibleTime int `json:"eligible_time"`
EndTime int `json:"end_time"` EndTime int64 `json:"end_time"`
ExcludedNodes string `json:"excluded_nodes"` ExcludedNodes string `json:"excluded_nodes"`
ExitCode int `json:"exit_code"` ExitCode int `json:"exit_code"`
Features string `json:"features"` Features string `json:"features"`
@ -134,7 +153,8 @@ type Job struct {
GresDetail []string `json:"gres_detail"` GresDetail []string `json:"gres_detail"`
GroupID int `json:"group_id"` GroupID int `json:"group_id"`
GroupName string `json:"group_name"` GroupName string `json:"group_name"`
JobID int `json:"job_id"` JobID int64 `json:"job_id"`
JobResources JobResource `json:"job_resources"`
JobState string `json:"job_state"` JobState string `json:"job_state"`
LastSchedEvaluation int `json:"last_sched_evaluation"` LastSchedEvaluation int `json:"last_sched_evaluation"`
Licenses string `json:"licenses"` Licenses string `json:"licenses"`
@ -149,8 +169,8 @@ type Job struct {
TasksPerNode int `json:"tasks_per_node"` TasksPerNode int `json:"tasks_per_node"`
TasksPerSocket interface{} `json:"tasks_per_socket"` TasksPerSocket interface{} `json:"tasks_per_socket"`
TasksPerBoard int `json:"tasks_per_board"` TasksPerBoard int `json:"tasks_per_board"`
CPUs int `json:"cpus"` CPUs int32 `json:"cpus"`
NodeCount int `json:"node_count"` NodeCount int32 `json:"node_count"`
Tasks int `json:"tasks"` Tasks int `json:"tasks"`
HETJobID int `json:"het_job_id"` HETJobID int `json:"het_job_id"`
HETJobIDSet string `json:"het_job_id_set"` HETJobIDSet string `json:"het_job_id_set"`
@ -171,11 +191,11 @@ type Job struct {
ResizeTime int `json:"resize_time"` ResizeTime int `json:"resize_time"`
RestartCnt int `json:"restart_cnt"` RestartCnt int `json:"restart_cnt"`
ResvName string `json:"resv_name"` ResvName string `json:"resv_name"`
Shared interface{} `json:"shared"` Shared *string `json:"shared"`
ShowFlags []string `json:"show_flags"` ShowFlags []string `json:"show_flags"`
SocketsPerBoard int `json:"sockets_per_board"` SocketsPerBoard int `json:"sockets_per_board"`
SocketsPerNode interface{} `json:"sockets_per_node"` SocketsPerNode interface{} `json:"sockets_per_node"`
StartTime int `json:"start_time"` StartTime int64 `json:"start_time"`
StateDescription string `json:"state_description"` StateDescription string `json:"state_description"`
StateReason string `json:"state_reason"` StateReason string `json:"state_reason"`
StandardError string `json:"standard_error"` StandardError string `json:"standard_error"`
@ -201,7 +221,7 @@ type Job struct {
CurrentWorkingDirectory string `json:"current_working_directory"` CurrentWorkingDirectory string `json:"current_working_directory"`
} }
type SlurmData struct { type SlurmPayload struct {
Meta Meta `json:"meta"` Meta Meta `json:"meta"`
Errors []interface{} `json:"errors"` Errors []interface{} `json:"errors"`
Jobs []Job `json:"jobs"` Jobs []Job `json:"jobs"`
@ -268,8 +288,8 @@ func DecodeClusterConfig(filename string) (ClusterConfig, error) {
return clusterConfig, nil return clusterConfig, nil
} }
func UnmarshalSlurmPayload(jsonPayload string) (SlurmData, error) { func UnmarshalSlurmPayload(jsonPayload string) (SlurmPayload, error) {
var slurmData SlurmData var slurmData SlurmPayload
err := json.Unmarshal([]byte(jsonPayload), &slurmData) err := json.Unmarshal([]byte(jsonPayload), &slurmData)
if err != nil { if err != nil {
return slurmData, fmt.Errorf("failed to unmarshal JSON data: %v", err) return slurmData, fmt.Errorf("failed to unmarshal JSON data: %v", err)

View File

@ -13,18 +13,12 @@ import (
"os/exec" "os/exec"
"regexp" "regexp"
"strconv" "strconv"
"time"
"fmt"
"regexp"
"strconv"
"strings" "strings"
"time"
"github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
openapi "github.com/ClusterCockpit/slurm-rest-client-0_0_38"
) )
type SlurmRestSchedulerConfig struct { type SlurmRestSchedulerConfig struct {
@ -94,7 +88,7 @@ func queryDB(qtime int64, clusterName string) ([]interface{}, error) {
return jobs, nil return jobs, nil
} }
func queryAllJobs() (openapi.V0038JobsResponse, error) { func queryAllJobs() (SlurmPayload, error) {
var ctlOutput []byte var ctlOutput []byte
apiEndpoint := "http://:8080/slurm/v0.0.38/jobs" apiEndpoint := "http://:8080/slurm/v0.0.38/jobs"
@ -122,7 +116,7 @@ func queryAllJobs() (openapi.V0038JobsResponse, error) {
log.Errorf("Error reading response body: %v", err) log.Errorf("Error reading response body: %v", err)
} }
var jobsResponse openapi.V0038JobsResponse var jobsResponse SlurmPayload
err = json.Unmarshal(ctlOutput, &jobsResponse) err = json.Unmarshal(ctlOutput, &jobsResponse)
if err != nil { if err != nil {
log.Errorf("Error parsing JSON response: %v", err) log.Errorf("Error parsing JSON response: %v", err)
@ -132,7 +126,7 @@ func queryAllJobs() (openapi.V0038JobsResponse, error) {
return jobsResponse, nil return jobsResponse, nil
} }
func queryAllJobsLocal() (openapi.V0038JobsResponse, error) { func queryAllJobsLocal() (SlurmPayload, error) {
// Read the JSON file // Read the JSON file
jobsData, err := os.ReadFile("slurm_0038.json") jobsData, err := os.ReadFile("slurm_0038.json")
@ -140,7 +134,7 @@ func queryAllJobsLocal() (openapi.V0038JobsResponse, error) {
fmt.Println("Error reading JSON file:", err) fmt.Println("Error reading JSON file:", err)
} }
var jobsResponse openapi.V0038JobsResponse var jobsResponse SlurmPayload
err = json.Unmarshal(jobsData, &jobsResponse) err = json.Unmarshal(jobsData, &jobsResponse)
if err != nil { if err != nil {
log.Errorf("Error parsing JSON response: %v", err) log.Errorf("Error parsing JSON response: %v", err)
@ -150,7 +144,7 @@ func queryAllJobsLocal() (openapi.V0038JobsResponse, error) {
return jobsResponse, nil return jobsResponse, nil
} }
func printSlurmInfo(job openapi.V0038JobResponseProperties) string { func printSlurmInfo(job Job) string {
text := fmt.Sprintf(` text := fmt.Sprintf(`
JobId=%v JobName=%v JobId=%v JobName=%v
@ -169,14 +163,14 @@ func printSlurmInfo(job openapi.V0038JobResponseProperties) string {
WorkDir=%v WorkDir=%v
StdErr=%v StdErr=%v
StdOut=%v`, StdOut=%v`,
job.JobId, job.Name, job.JobID, job.Name,
job.UserName, job.UserId, job.GroupId, job.UserName, job.UserID, job.GroupID,
job.Account, job.Qos, job.Account, job.QOS,
job.Requeue, job.RestartCnt, job.BatchFlag, job.Requeue, job.RestartCnt, job.BatchFlag,
job.TimeLimit, job.SubmitTime, job.TimeLimit, job.SubmitTime,
job.Partition, job.Partition,
job.Nodes, job.Nodes,
job.NodeCount, job.Cpus, job.Tasks, job.CpusPerTask, job.NodeCount, job.CPUs, job.Tasks, job.CPUPerTask,
job.TasksPerBoard, job.TasksPerSocket, job.TasksPerCore, job.TasksPerBoard, job.TasksPerSocket, job.TasksPerCore,
job.TresAllocStr, job.TresAllocStr,
job.TresAllocStr, job.TresAllocStr,
@ -207,19 +201,6 @@ func (cfg *SlurmRestSchedulerConfig) Init() error {
cfg.clusterConfig, err = DecodeClusterConfig("cluster-alex.json") cfg.clusterConfig, err = DecodeClusterConfig("cluster-alex.json")
// for k, v := range cfg.clusterConfig {
// fmt.Printf("Entry %q with value %x loaded\n", k, v)
// // switch c := v.(type) {
// // case string:
// // fmt.Printf("Item %q is a string, containing %q\n", k, c)
// // case float64:
// // fmt.Printf("Looks like item %q is a number, specifically %f\n", k, c)
// // default:
// // fmt.Printf("Not sure what type item %q is, but I think it might be %T\n", k, c)
// // }
// }
// fmt.Printf("Cluster Name: %q\n", cfg.clusterConfig["name"])
// Create an HTTP client // Create an HTTP client
client = &http.Client{} client = &http.Client{}
@ -293,26 +274,20 @@ func ConstructNodeAcceleratorMap(input string, accelerator string) map[string]st
return numberMap return numberMap
} }
func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) { func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobs []Job) {
// Iterate over the Jobs slice // Iterate over the Jobs slice
for _, job := range jobsResponse.Jobs { for _, job := range jobs {
// Process each job // Process each job
fmt.Printf("Job ID: %d\n", *job.JobId) fmt.Printf("Job ID: %d\n", job.JobID)
fmt.Printf("Job Name: %s\n", *job.Name) fmt.Printf("Job Name: %s\n", job.Name)
fmt.Printf("Job State: %s\n", *job.JobState) fmt.Printf("Job State: %s\n", job.JobState)
fmt.Println("Job StartTime:", *job.StartTime) fmt.Println("Job StartTime:", job.StartTime)
fmt.Println("Job Cluster:", *job.Cluster) fmt.Println("Job Cluster:", job.Cluster)
// aquire lock to avoid race condition between API calls if job.JobState == "RUNNING" {
// var unlockOnce sync.Once
// cfg.RepositoryMutex.Lock()
// defer unlockOnce.Do(cfg.RepositoryMutex.Unlock)
// is "running" one of JSON state? // jobs, err := cfg.JobRepository.FindRunningJobs(job.Cluster)
if *job.JobState == "RUNNING" {
// jobs, err := cfg.JobRepository.FindRunningJobs(*job.Cluster)
// if err != nil { // if err != nil {
// log.Fatalf("Failed to find running jobs: %v", err) // log.Fatalf("Failed to find running jobs: %v", err)
// } // }
@ -333,18 +308,13 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
exclusive = 0 exclusive = 0
} }
jobResourcesInBytes, err := json.Marshal(*job.JobResources)
if err != nil {
log.Fatalf("JobResources JSON marshaling failed: %s", err)
}
var resources []*schema.Resource var resources []*schema.Resource
// Define a regular expression to match "gpu=x" // Define a regular expression to match "gpu=x"
regex := regexp.MustCompile(`gpu=(\d+)`) regex := regexp.MustCompile(`gpu=(\d+)`)
// Find all matches in the input string // Find all matches in the input string
matches := regex.FindAllStringSubmatch(*job.TresAllocStr, -1) matches := regex.FindAllStringSubmatch(job.TresAllocStr, -1)
// Initialize a variable to store the total number of GPUs // Initialize a variable to store the total number of GPUs
var totalGPUs int32 var totalGPUs int32
@ -357,15 +327,15 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
for _, node := range job.JobResources.AllocatedNodes { for _, node := range job.JobResources.AllocatedNodes {
var res schema.Resource var res schema.Resource
res.Hostname = *node.Nodename res.Hostname = node.Nodename
log.Debugf("Node %s V0038NodeAllocationSockets.Cores map size: %d\n", *node.Nodename, len(node.Sockets.Cores)) log.Debugf("Node %s Cores map size: %d\n", node.Nodename, len(node.Sockets))
if node.Cpus == nil || node.Memory == nil { if node.CPUsUsed == nil || node.MemoryAllocated == nil {
log.Fatalf("Either node.Cpus or node.Memory is nil\n") log.Fatalf("Either node.Cpus or node.Memory is nil\n")
} }
for k, v := range node.Sockets.Cores { for k, v := range node.Sockets {
fmt.Printf("core id[%s] value[%s]\n", k, v) fmt.Printf("core id[%s] value[%s]\n", k, v)
threadID, _ := strconv.Atoi(k) threadID, _ := strconv.Atoi(k)
res.HWThreads = append(res.HWThreads, threadID) res.HWThreads = append(res.HWThreads, threadID)
@ -373,12 +343,12 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
// cpu=512,mem=1875G,node=4,billing=512,gres\/gpu=32,gres\/gpu:a40=32 // cpu=512,mem=1875G,node=4,billing=512,gres\/gpu=32,gres\/gpu:a40=32
// For core/GPU id mapping, need to query from cluster config file // For core/GPU id mapping, need to query from cluster config file
res.Accelerators = append(res.Accelerators, *job.Comment) res.Accelerators = append(res.Accelerators, job.Comment)
resources = append(resources, &res) resources = append(resources, &res)
} }
metaData := make(map[string]string) metaData := make(map[string]string)
metaData["jobName"] = *job.Name metaData["jobName"] = job.Name
metaData["slurmInfo"] = printSlurmInfo(job) metaData["slurmInfo"] = printSlurmInfo(job)
// switch slurmPath := cfg.clusterConfig["slurm_path"].(type) { // switch slurmPath := cfg.clusterConfig["slurm_path"].(type) {
@ -397,26 +367,26 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
} }
var defaultJob schema.BaseJob = schema.BaseJob{ var defaultJob schema.BaseJob = schema.BaseJob{
JobID: int64(*job.JobId), JobID: job.JobID,
User: *job.UserName, User: job.UserName,
Project: *job.Account, Project: job.Account,
Cluster: *job.Cluster, Cluster: job.Cluster,
Partition: *job.Partition, Partition: job.Partition,
// check nil // check nil
ArrayJobId: int64(*job.ArrayJobId), ArrayJobId: job.ArrayJobID,
NumNodes: *job.NodeCount, NumNodes: job.NodeCount,
NumHWThreads: *job.Cpus, NumHWThreads: job.CPUs,
NumAcc: totalGPUs, NumAcc: totalGPUs,
Exclusive: exclusive, Exclusive: exclusive,
// MonitoringStatus: job.MonitoringStatus, // MonitoringStatus: job.MonitoringStatus,
// SMT: *job.TasksPerCore, // SMT: job.TasksPerCore,
State: schema.JobState(*job.JobState), State: schema.JobState(job.JobState),
// ignore this for start job // ignore this for start job
// Duration: int32(time.Now().Unix() - *job.StartTime), // or SubmitTime? // Duration: int32(time.Now().Unix() - job.StartTime), // or SubmitTime?
Walltime: time.Now().Unix(), // max duration requested by the job Walltime: time.Now().Unix(), // max duration requested by the job
// Tags: job.Tags, // Tags: job.Tags,
// ignore this! // ignore this!
RawResources: jobResourcesInBytes, // RawResources: jobResourcesInBytes,
// "job_resources": "allocated_nodes" "sockets": // "job_resources": "allocated_nodes" "sockets":
// very important; has to be right // very important; has to be right
Resources: resources, Resources: resources,
@ -429,7 +399,7 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
req := &schema.JobMeta{ req := &schema.JobMeta{
BaseJob: defaultJob, BaseJob: defaultJob,
StartTime: *job.StartTime, StartTime: job.StartTime,
Statistics: make(map[string]schema.JobStatistics), Statistics: make(map[string]schema.JobStatistics),
} }
log.Debugf("Generated JobMeta %v", req.BaseJob.JobID) log.Debugf("Generated JobMeta %v", req.BaseJob.JobID)
@ -445,41 +415,41 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00
// } // }
} else { } else {
// Check if completed job with combination of (job_id, cluster_id, start_time) already exists: // Check if completed job with combination of (job_id, cluster_id, start_time) already exists:
var jobID int64 jobID := job.JobID
jobID = int64(*job.JobId) log.Debugf("jobID: %v Cluster: %v StartTime: %v", jobID, job.Cluster, job.StartTime)
log.Debugf("jobID: %v Cluster: %v StartTime: %v", jobID, *job.Cluster, *job.StartTime)
// commented out as it will cause panic // commented out as it will cause panic
// note down params invoked // note down params invoked
existingJob, err := cfg.JobRepository.Find(&jobID, job.Cluster, job.StartTime) // existingJob, err := cfg.JobRepository.Find(&jobID, &job.Cluster, &job.StartTime)
if err == nil { // if err == nil {
existingJob.BaseJob.Duration = int32(*job.EndTime - *job.StartTime) // existingJob.BaseJob.Duration = int32(job.EndTime - job.StartTime)
existingJob.BaseJob.State = schema.JobState(*job.JobState) // existingJob.BaseJob.State = schema.JobState(job.JobState)
existingJob.BaseJob.Walltime = *job.StartTime // existingJob.BaseJob.Walltime = job.StartTime
req := &StopJobRequest{ // req := &StopJobRequest{
Cluster: job.Cluster, // Cluster: &job.Cluster,
JobId: &jobID, // JobId: &jobID,
State: schema.JobState(*job.JobState), // State: schema.JobState(job.JobState),
StartTime: &existingJob.StartTimeUnix, // StartTime: &existingJob.StartTimeUnix,
StopTime: *job.EndTime, // StopTime: job.EndTime,
} // }
// req := new(schema.JobMeta) // // req := new(schema.JobMeta)
cfg.checkAndHandleStopJob(existingJob, req) // cfg.checkAndHandleStopJob(existingJob, req)
} // }
} }
} }
} }
func (cfg *SlurmRestSchedulerConfig) Sync() { func (cfg *SlurmRestSchedulerConfig) Sync() {
// Fetch an instance of V0037JobsResponse // Fetch an instance of JobsResponse
jobsResponse, err := queryAllJobsLocal() jobsResponse, err := queryAllJobsLocal()
if err != nil { if err != nil {
log.Fatal(err.Error()) log.Fatal(err.Error())
} }
cfg.HandleJobsResponse(jobsResponse) cfg.HandleJobsResponse(jobsResponse.Jobs)
} }