Added SlurmDB Payload

This commit is contained in:
Bole Ma 2023-10-26 16:22:26 +02:00
parent 9fe497173d
commit 6ac1182c06
2 changed files with 390 additions and 161 deletions

View File

@ -26,7 +26,6 @@ type MetricConfig struct {
Caution float64 `json:"caution"` Caution float64 `json:"caution"`
Alert float64 `json:"alert"` Alert float64 `json:"alert"`
} }
type SubCluster struct { type SubCluster struct {
Name string `json:"name"` Name string `json:"name"`
Nodes string `json:"nodes"` Nodes string `json:"nodes"`
@ -74,7 +73,7 @@ type ClusterConfig struct {
SubClusters []SubCluster `json:"subClusters"` SubClusters []SubCluster `json:"subClusters"`
} }
type Meta struct { type Metadata struct {
Plugin struct { Plugin struct {
Type string `json:"type"` Type string `json:"type"`
Name string `json:"name"` Name string `json:"name"`
@ -184,7 +183,7 @@ type Job struct {
PreSUSTime int `json:"pre_sus_time"` PreSUSTime int `json:"pre_sus_time"`
Priority int `json:"priority"` Priority int `json:"priority"`
Profile interface{} `json:"profile"` Profile interface{} `json:"profile"`
QOS string `json:"qos"` QoS string `json:"qos"`
Reboot bool `json:"reboot"` Reboot bool `json:"reboot"`
RequiredNodes string `json:"required_nodes"` RequiredNodes string `json:"required_nodes"`
Requeue bool `json:"requeue"` Requeue bool `json:"requeue"`
@ -222,11 +221,193 @@ type Job struct {
} }
type SlurmPayload struct { type SlurmPayload struct {
Meta Meta `json:"meta"` Meta Metadata `json:"meta"`
Errors []interface{} `json:"errors"` Errors []interface{} `json:"errors"`
Jobs []Job `json:"jobs"` Jobs []Job `json:"jobs"`
} }
type DumpedComment struct {
Administrator interface{} `json:"administrator"`
Job interface{} `json:"job"`
System interface{} `json:"system"`
}
type MaxLimits struct {
Running struct {
Tasks int `json:"tasks"`
} `json:"max"`
}
type ArrayInfo struct {
JobID int `json:"job_id"`
Limits MaxLimits `json:"limits"`
Task interface{} `json:"task"`
TaskID interface{} `json:"task_id"`
}
type Association struct {
Account string `json:"account"`
Cluster string `json:"cluster"`
Partition interface{} `json:"partition"`
User string `json:"user"`
}
type TimeInfo struct {
Elapsed int64 `json:"elapsed"`
Eligible int64 `json:"eligible"`
End int64 `json:"end"`
Start int64 `json:"start"`
Submission int64 `json:"submission"`
Suspended int64 `json:"suspended"`
System struct {
Seconds int `json:"seconds"`
Microseconds int `json:"microseconds"`
} `json:"system"`
Limit int `json:"limit"`
Total struct {
Seconds int `json:"seconds"`
Microseconds int `json:"microseconds"`
} `json:"total"`
User struct {
Seconds int `json:"seconds"`
Microseconds int `json:"microseconds"`
} `json:"user"`
}
type ExitCode struct {
Status string `json:"status"`
ReturnCode int `json:"return_code"`
}
type DumpedJob struct {
Account string `json:"account"`
Comment DumpedComment `json:"comment"`
AllocationNodes int `json:"allocation_nodes"`
Array ArrayInfo `json:"array"`
Association Association `json:"association"`
Cluster string `json:"cluster"`
Constraints string `json:"constraints"`
Container interface{} `json:"container"`
DerivedExitCode ExitCode `json:"derived_exit_code"`
Time TimeInfo `json:"time"`
ExitCode ExitCode `json:"exit_code"`
Flags []string `json:"flags"`
Group string `json:"group"`
Het struct {
JobID int `json:"job_id"`
JobOffset interface{} `json:"job_offset"`
} `json:"het"`
JobID int64 `json:"job_id"`
Name string `json:"name"`
MCS struct {
Label string `json:"label"`
} `json:"mcs"`
Nodes string `json:"nodes"`
Partition string `json:"partition"`
Priority int `json:"priority"`
QoS string `json:"qos"`
Required struct {
CPUs int `json:"CPUs"`
Memory int `json:"memory"`
} `json:"required"`
KillRequestUser interface{} `json:"kill_request_user"`
Reservation struct {
ID int `json:"id"`
Name int `json:"name"`
} `json:"reservation"`
State struct {
Current string `json:"current"`
Reason string `json:"reason"`
} `json:"state"`
Steps []struct {
Nodes struct {
List []string `json:"list"`
Count int `json:"count"`
Range string `json:"range"`
} `json:"nodes"`
Tres struct {
Requested struct {
Max []interface{} `json:"max"`
Min []interface{} `json:"min"`
Average []interface{} `json:"average"`
Total []interface{} `json:"total"`
} `json:"requested"`
Consumed struct {
Max []interface{} `json:"max"`
Min []interface{} `json:"min"`
Average []interface{} `json:"average"`
Total []interface{} `json:"total"`
} `json:"consumed"`
Allocated []struct {
Type string `json:"type"`
Name interface{} `json:"name"`
ID int `json:"id"`
Count int `json:"count"`
} `json:"allocated"`
} `json:"tres"`
Time TimeInfo `json:"time"`
ExitCode ExitCode `json:"exit_code"`
Tasks struct {
Count int `json:"count"`
} `json:"tasks"`
PID interface{} `json:"pid"`
CPU struct {
RequestedFrequency struct {
Min int `json:"min"`
Max int `json:"max"`
} `json:"requested_frequency"`
Governor []interface{} `json:"governor"`
} `json:"CPU"`
KillRequestUser interface{} `json:"kill_request_user"`
State string `json:"state"`
Statistics struct {
CPU struct {
ActualFrequency int `json:"actual_frequency"`
} `json:"CPU"`
Energy struct {
Consumed int `json:"consumed"`
} `json:"energy"`
} `json:"statistics"`
Step struct {
JobID int `json:"job_id"`
Het struct {
Component interface{} `json:"component"`
} `json:"het"`
ID string `json:"id"`
Name string `json:"name"`
} `json:"step"`
Task struct {
Distribution string `json:"distribution"`
} `json:"task"`
} `json:"steps"`
Tres struct {
Allocated []struct {
Type string `json:"type"`
Name interface{} `json:"name"`
ID int `json:"id"`
Count int `json:"count"`
} `json:"allocated"`
Requested []struct {
Type string `json:"type"`
Name interface{} `json:"name"`
ID int `json:"id"`
Count int `json:"count"`
} `json:"requested"`
} `json:"tres"`
User string `json:"user"`
Wckey struct {
Wckey string `json:"wckey"`
Flags []string `json:"flags"`
} `json:"wckey"`
WorkingDirectory string `json:"working_directory"`
}
type SlurmDBPayload struct {
Meta Metadata `json:"meta"`
Errors []string `json:"errors"`
Jobs []DumpedJob `json:"jobs"`
}
func DecodeClusterConfig(filename string) (ClusterConfig, error) { func DecodeClusterConfig(filename string) (ClusterConfig, error) {
var clusterConfig ClusterConfig var clusterConfig ClusterConfig

View File

@ -88,7 +88,7 @@ func queryDB(qtime int64, clusterName string) ([]interface{}, error) {
return jobs, nil return jobs, nil
} }
func queryAllJobs() (SlurmPayload, error) { func fetchJobs() (SlurmPayload, error) {
var ctlOutput []byte var ctlOutput []byte
apiEndpoint := "http://:8080/slurm/v0.0.38/jobs" apiEndpoint := "http://:8080/slurm/v0.0.38/jobs"
@ -126,18 +126,36 @@ func queryAllJobs() (SlurmPayload, error) {
return jobsResponse, nil return jobsResponse, nil
} }
func queryAllJobsLocal() (SlurmPayload, error) { func fetchJobsLocal() (SlurmPayload, error) {
// Read the JSON file // Read the Slurm Payload JSON file
jobsData, err := os.ReadFile("slurm_0038.json") jobsData, err := os.ReadFile("slurm_0038.json")
if err != nil { if err != nil {
fmt.Println("Error reading JSON file:", err) fmt.Println("Error reading Slurm Payload JSON file:", err)
} }
var jobsResponse SlurmPayload var jobsResponse SlurmPayload
err = json.Unmarshal(jobsData, &jobsResponse) err = json.Unmarshal(jobsData, &jobsResponse)
if err != nil { if err != nil {
log.Errorf("Error parsing JSON response: %v", err) log.Errorf("Error parsing Slurm Payload JSON response: %v", err)
return jobsResponse, err
}
return jobsResponse, nil
}
func fetchDumpedJobsLocal() (SlurmDBPayload, error) {
// Read the SlurmDB Payload JSON file
jobsData, err := os.ReadFile("slurmdb_0038-large.json")
if err != nil {
fmt.Println("Error reading SlurmDB Payload JSON file:", err)
}
var jobsResponse SlurmDBPayload
err = json.Unmarshal(jobsData, &jobsResponse)
if err != nil {
log.Errorf("Error parsing SlurmDB Payload JSON response: %v", err)
return jobsResponse, err return jobsResponse, err
} }
@ -165,7 +183,7 @@ func printSlurmInfo(job Job) string {
StdOut=%v`, StdOut=%v`,
job.JobID, job.Name, job.JobID, job.Name,
job.UserName, job.UserID, job.GroupID, job.UserName, job.UserID, job.GroupID,
job.Account, job.QOS, job.Account, job.QoS,
job.Requeue, job.RestartCnt, job.BatchFlag, job.Requeue, job.RestartCnt, job.BatchFlag,
job.TimeLimit, job.SubmitTime, job.TimeLimit, job.SubmitTime,
job.Partition, job.Partition,
@ -191,7 +209,7 @@ func exitWithError(err error, output []byte) {
fmt.Fprintf(os.Stderr, "ERROR: API call failed with code %d;\nOutput:\n%s\n", exitError.ExitCode(), output) fmt.Fprintf(os.Stderr, "ERROR: API call failed with code %d;\nOutput:\n%s\n", exitError.ExitCode(), output)
} }
} else { } else {
log.Errorf("ERROR:", err) log.Errorf("ERROR: %v", err)
} }
os.Exit(1) os.Exit(1)
} }
@ -274,33 +292,8 @@ func ConstructNodeAcceleratorMap(input string, accelerator string) map[string]st
return numberMap return numberMap
} }
func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobs []Job) { func CreateJobMeta(job Job) *schema.JobMeta {
// Iterate over the Jobs slice
for _, job := range jobs {
// Process each job
fmt.Printf("Job ID: %d\n", job.JobID)
fmt.Printf("Job Name: %s\n", job.Name)
fmt.Printf("Job State: %s\n", job.JobState)
fmt.Println("Job StartTime:", job.StartTime)
fmt.Println("Job Cluster:", job.Cluster)
if job.JobState == "RUNNING" {
// jobs, err := cfg.JobRepository.FindRunningJobs(job.Cluster)
// if err != nil {
// log.Fatalf("Failed to find running jobs: %v", err)
// }
// for id, job := range jobs {
// fmt.Printf("Job ID: %d, Job: %+v\n", id, job)
// }
// if err != nil || err != sql.ErrNoRows {
// log.Errorf("checking for duplicate failed: %s", err.Error())
// return
// } else if err == nil {
// if len(jobs) == 0 {
var exclusive int32 var exclusive int32
if job.Shared == nil { if job.Shared == nil {
exclusive = 1 exclusive = 1
@ -351,16 +344,6 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobs []Job) {
metaData["jobName"] = job.Name metaData["jobName"] = job.Name
metaData["slurmInfo"] = printSlurmInfo(job) metaData["slurmInfo"] = printSlurmInfo(job)
// switch slurmPath := cfg.clusterConfig["slurm_path"].(type) {
// case string:
// commandCtlScriptTpl := fmt.Sprintf("%sscontrol -M %%s write batch_script %%s -", slurmPath)
// queryJobScript := fmt.Sprintf(commandCtlScriptTpl, job.Cluster, job.JobId)
// metaData["jobScript"] = queryJobScript
// default:
// // Type assertion failed
// fmt.Println("Conversion of slurm_path to string failed", cfg.clusterConfig["slurm_path"])
// }
metaDataInBytes, err := json.Marshal(metaData) metaDataInBytes, err := json.Marshal(metaData)
if err != nil { if err != nil {
log.Fatalf("metaData JSON marshaling failed: %s", err) log.Fatalf("metaData JSON marshaling failed: %s", err)
@ -397,59 +380,124 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobs []Job) {
} }
log.Debugf("Generated BaseJob with Resources=%v", defaultJob.Resources[0]) log.Debugf("Generated BaseJob with Resources=%v", defaultJob.Resources[0])
req := &schema.JobMeta{ meta := &schema.JobMeta{
BaseJob: defaultJob, BaseJob: defaultJob,
StartTime: job.StartTime, StartTime: job.StartTime,
Statistics: make(map[string]schema.JobStatistics), Statistics: make(map[string]schema.JobStatistics),
} }
log.Debugf("Generated JobMeta %v", req.BaseJob.JobID) // log.Debugf("Generated JobMeta %v", req.BaseJob.JobID)
// req := new(schema.JobMeta) return meta
// id, err := cfg.JobRepository.Start(req) }
// log.Debugf("Added %v", id)
// } else {
// for _, job := range jobs {
// log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID)
// }
// }
// }
} else {
// Check if completed job with combination of (job_id, cluster_id, start_time) already exists:
jobID := job.JobID
log.Debugf("jobID: %v Cluster: %v StartTime: %v", jobID, job.Cluster, job.StartTime)
// commented out as it will cause panic
// note down params invoked
// existingJob, err := cfg.JobRepository.Find(&jobID, &job.Cluster, &job.StartTime) func (cfg *SlurmRestSchedulerConfig) HandleJobs(jobs []Job) error {
// if err == nil { // runningJobsInCC, err := cfg.JobRepository.FindRunningJobs("alex")
// existingJob.BaseJob.Duration = int32(job.EndTime - job.StartTime)
// existingJob.BaseJob.State = schema.JobState(job.JobState)
// existingJob.BaseJob.Walltime = job.StartTime
// req := &StopJobRequest{ // Iterate over the Jobs slice
// Cluster: &job.Cluster, for _, job := range jobs {
// JobId: &jobID, // Process each job from Slurm
// State: schema.JobState(job.JobState), fmt.Printf("Job ID: %d\n", job.JobID)
// StartTime: &existingJob.StartTimeUnix, fmt.Printf("Job Name: %s\n", job.Name)
// StopTime: job.EndTime, fmt.Printf("Job State: %s\n", job.JobState)
// } fmt.Println("Job StartTime:", job.StartTime)
// // req := new(schema.JobMeta) fmt.Println("Job Cluster:", job.Cluster)
// cfg.checkAndHandleStopJob(existingJob, req)
// }
if job.JobState == "RUNNING" {
meta := CreateJobMeta(job)
// For all running jobs from Slurm
_, notFoundError := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.StartTime)
if notFoundError != nil {
// if it does not exist in CC, create a new entry
log.Print("Job does not exist in CC, will create a new entry:", job.JobID)
id, startJobError := cfg.JobRepository.Start(meta)
if startJobError != nil {
return startJobError
} }
log.Debug("Added job", id)
} }
// Running in both sides: nothing needs to be done
} else if job.JobState == "COMPLETED" {
// Check if completed job with combination of (job_id, cluster_id, start_time) already exists
log.Debugf("Processing completed job ID: %v Cluster: %v StartTime: %v", job.JobID, job.Cluster, job.StartTime)
existingJob, err := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.StartTime)
if err == nil && existingJob.State != schema.JobStateCompleted {
// for jobs completed in Slurm (either in Slurm or maybe SlurmDB)
// update job in CC with new info (job final status, duration, end timestamp)
existingJob.BaseJob.Duration = int32(job.EndTime - job.StartTime)
existingJob.BaseJob.State = schema.JobState(job.JobState)
existingJob.BaseJob.Walltime = job.EndTime
req := &StopJobRequest{
Cluster: &job.Cluster,
JobId: &job.JobID,
State: schema.JobState(job.JobState),
StartTime: &job.StartTime,
StopTime: job.EndTime,
}
cfg.checkAndHandleStopJob(existingJob, req)
}
}
}
return nil
}
func (cfg *SlurmRestSchedulerConfig) HandleDumpedJobs(jobs []DumpedJob) error {
// Iterate over the Jobs slice
for _, job := range jobs {
// Process each job from Slurm
fmt.Printf("Job ID: %d\n", job.JobID)
fmt.Printf("Job Name: %s\n", job.Name)
fmt.Printf("Job State: %s\n", job.State.Current)
fmt.Println("Job EndTime:", job.Time.End)
fmt.Println("Job Cluster:", job.Cluster)
// Check if completed job with combination of (job_id, cluster_id, start_time) already exists
log.Debugf("Processing completed dumped job ID: %v Cluster: %v StartTime: %v", job.JobID, job.Cluster, job.Time.Start)
existingJob, err := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.Time.Start)
if err == nil && existingJob.State != schema.JobStateCompleted {
// for jobs completed in Slurm (either in Slurm or maybe SlurmDB)
// update job in CC with new info (job final status, duration, end timestamp)
existingJob.BaseJob.Duration = int32(job.Time.End - job.Time.Start)
existingJob.BaseJob.State = schema.JobState(job.State.Current)
existingJob.BaseJob.Walltime = job.Time.End
req := &StopJobRequest{
Cluster: &job.Cluster,
JobId: &job.JobID,
State: schema.JobState(job.State.Current),
StartTime: &job.Time.Start,
StopTime: job.Time.End,
}
cfg.checkAndHandleStopJob(existingJob, req)
}
}
return nil
} }
func (cfg *SlurmRestSchedulerConfig) Sync() { func (cfg *SlurmRestSchedulerConfig) Sync() {
// Fetch an instance of JobsResponse // Fetch an instance of Slurm JobsResponse
jobsResponse, err := queryAllJobsLocal() jobsResponse, err := fetchJobsLocal()
if err != nil { if err != nil {
log.Fatal(err.Error()) log.Fatal(err.Error())
} }
cfg.HandleJobsResponse(jobsResponse.Jobs) cfg.HandleJobs(jobsResponse.Jobs)
// Fetch an instance of Slurm DB JobsResponse
dumpedJobsResponse, err := fetchDumpedJobsLocal()
if err != nil {
log.Fatal(err.Error())
}
cfg.HandleDumpedJobs(dumpedJobsResponse.Jobs)
} }