From 6ac1182c06d8707b6eaa7f8ff671473a01942a8c Mon Sep 17 00:00:00 2001 From: Bole Ma Date: Thu, 26 Oct 2023 16:22:26 +0200 Subject: [PATCH] Added SlurmDB Payload --- internal/scheduler/payloadConverter.go | 189 ++++++++++++- internal/scheduler/slurmRest.go | 362 ++++++++++++++----------- 2 files changed, 390 insertions(+), 161 deletions(-) diff --git a/internal/scheduler/payloadConverter.go b/internal/scheduler/payloadConverter.go index 7d63297..6e9dfb8 100644 --- a/internal/scheduler/payloadConverter.go +++ b/internal/scheduler/payloadConverter.go @@ -26,7 +26,6 @@ type MetricConfig struct { Caution float64 `json:"caution"` Alert float64 `json:"alert"` } - type SubCluster struct { Name string `json:"name"` Nodes string `json:"nodes"` @@ -74,7 +73,7 @@ type ClusterConfig struct { SubClusters []SubCluster `json:"subClusters"` } -type Meta struct { +type Metadata struct { Plugin struct { Type string `json:"type"` Name string `json:"name"` @@ -184,7 +183,7 @@ type Job struct { PreSUSTime int `json:"pre_sus_time"` Priority int `json:"priority"` Profile interface{} `json:"profile"` - QOS string `json:"qos"` + QoS string `json:"qos"` Reboot bool `json:"reboot"` RequiredNodes string `json:"required_nodes"` Requeue bool `json:"requeue"` @@ -222,11 +221,193 @@ type Job struct { } type SlurmPayload struct { - Meta Meta `json:"meta"` + Meta Metadata `json:"meta"` Errors []interface{} `json:"errors"` Jobs []Job `json:"jobs"` } +type DumpedComment struct { + Administrator interface{} `json:"administrator"` + Job interface{} `json:"job"` + System interface{} `json:"system"` +} + +type MaxLimits struct { + Running struct { + Tasks int `json:"tasks"` + } `json:"max"` +} + +type ArrayInfo struct { + JobID int `json:"job_id"` + Limits MaxLimits `json:"limits"` + Task interface{} `json:"task"` + TaskID interface{} `json:"task_id"` +} + +type Association struct { + Account string `json:"account"` + Cluster string `json:"cluster"` + Partition interface{} `json:"partition"` + User string `json:"user"` +} + +type TimeInfo struct { + Elapsed int64 `json:"elapsed"` + Eligible int64 `json:"eligible"` + End int64 `json:"end"` + Start int64 `json:"start"` + Submission int64 `json:"submission"` + Suspended int64 `json:"suspended"` + System struct { + Seconds int `json:"seconds"` + Microseconds int `json:"microseconds"` + } `json:"system"` + Limit int `json:"limit"` + Total struct { + Seconds int `json:"seconds"` + Microseconds int `json:"microseconds"` + } `json:"total"` + User struct { + Seconds int `json:"seconds"` + Microseconds int `json:"microseconds"` + } `json:"user"` +} + +type ExitCode struct { + Status string `json:"status"` + ReturnCode int `json:"return_code"` +} + +type DumpedJob struct { + Account string `json:"account"` + Comment DumpedComment `json:"comment"` + AllocationNodes int `json:"allocation_nodes"` + Array ArrayInfo `json:"array"` + Association Association `json:"association"` + Cluster string `json:"cluster"` + Constraints string `json:"constraints"` + Container interface{} `json:"container"` + DerivedExitCode ExitCode `json:"derived_exit_code"` + Time TimeInfo `json:"time"` + ExitCode ExitCode `json:"exit_code"` + Flags []string `json:"flags"` + Group string `json:"group"` + Het struct { + JobID int `json:"job_id"` + JobOffset interface{} `json:"job_offset"` + } `json:"het"` + JobID int64 `json:"job_id"` + Name string `json:"name"` + MCS struct { + Label string `json:"label"` + } `json:"mcs"` + Nodes string `json:"nodes"` + Partition string `json:"partition"` + Priority int `json:"priority"` + QoS string `json:"qos"` + Required struct { + CPUs int `json:"CPUs"` + Memory int `json:"memory"` + } `json:"required"` + KillRequestUser interface{} `json:"kill_request_user"` + Reservation struct { + ID int `json:"id"` + Name int `json:"name"` + } `json:"reservation"` + State struct { + Current string `json:"current"` + Reason string `json:"reason"` + } `json:"state"` + Steps []struct { + Nodes struct { + List []string `json:"list"` + Count int `json:"count"` + Range string `json:"range"` + } `json:"nodes"` + Tres struct { + Requested struct { + Max []interface{} `json:"max"` + Min []interface{} `json:"min"` + Average []interface{} `json:"average"` + Total []interface{} `json:"total"` + } `json:"requested"` + Consumed struct { + Max []interface{} `json:"max"` + Min []interface{} `json:"min"` + Average []interface{} `json:"average"` + Total []interface{} `json:"total"` + } `json:"consumed"` + Allocated []struct { + Type string `json:"type"` + Name interface{} `json:"name"` + ID int `json:"id"` + Count int `json:"count"` + } `json:"allocated"` + } `json:"tres"` + Time TimeInfo `json:"time"` + ExitCode ExitCode `json:"exit_code"` + Tasks struct { + Count int `json:"count"` + } `json:"tasks"` + PID interface{} `json:"pid"` + CPU struct { + RequestedFrequency struct { + Min int `json:"min"` + Max int `json:"max"` + } `json:"requested_frequency"` + Governor []interface{} `json:"governor"` + } `json:"CPU"` + KillRequestUser interface{} `json:"kill_request_user"` + State string `json:"state"` + Statistics struct { + CPU struct { + ActualFrequency int `json:"actual_frequency"` + } `json:"CPU"` + Energy struct { + Consumed int `json:"consumed"` + } `json:"energy"` + } `json:"statistics"` + Step struct { + JobID int `json:"job_id"` + Het struct { + Component interface{} `json:"component"` + } `json:"het"` + ID string `json:"id"` + Name string `json:"name"` + } `json:"step"` + Task struct { + Distribution string `json:"distribution"` + } `json:"task"` + } `json:"steps"` + Tres struct { + Allocated []struct { + Type string `json:"type"` + Name interface{} `json:"name"` + ID int `json:"id"` + Count int `json:"count"` + } `json:"allocated"` + Requested []struct { + Type string `json:"type"` + Name interface{} `json:"name"` + ID int `json:"id"` + Count int `json:"count"` + } `json:"requested"` + } `json:"tres"` + User string `json:"user"` + Wckey struct { + Wckey string `json:"wckey"` + Flags []string `json:"flags"` + } `json:"wckey"` + WorkingDirectory string `json:"working_directory"` +} + +type SlurmDBPayload struct { + Meta Metadata `json:"meta"` + Errors []string `json:"errors"` + Jobs []DumpedJob `json:"jobs"` +} + func DecodeClusterConfig(filename string) (ClusterConfig, error) { var clusterConfig ClusterConfig diff --git a/internal/scheduler/slurmRest.go b/internal/scheduler/slurmRest.go index 9795c64..63b48fe 100644 --- a/internal/scheduler/slurmRest.go +++ b/internal/scheduler/slurmRest.go @@ -88,7 +88,7 @@ func queryDB(qtime int64, clusterName string) ([]interface{}, error) { return jobs, nil } -func queryAllJobs() (SlurmPayload, error) { +func fetchJobs() (SlurmPayload, error) { var ctlOutput []byte apiEndpoint := "http://:8080/slurm/v0.0.38/jobs" @@ -126,18 +126,36 @@ func queryAllJobs() (SlurmPayload, error) { return jobsResponse, nil } -func queryAllJobsLocal() (SlurmPayload, error) { - // Read the JSON file +func fetchJobsLocal() (SlurmPayload, error) { + // Read the Slurm Payload JSON file jobsData, err := os.ReadFile("slurm_0038.json") if err != nil { - fmt.Println("Error reading JSON file:", err) + fmt.Println("Error reading Slurm Payload JSON file:", err) } var jobsResponse SlurmPayload err = json.Unmarshal(jobsData, &jobsResponse) if err != nil { - log.Errorf("Error parsing JSON response: %v", err) + log.Errorf("Error parsing Slurm Payload JSON response: %v", err) + return jobsResponse, err + } + + return jobsResponse, nil +} + +func fetchDumpedJobsLocal() (SlurmDBPayload, error) { + // Read the SlurmDB Payload JSON file + jobsData, err := os.ReadFile("slurmdb_0038-large.json") + + if err != nil { + fmt.Println("Error reading SlurmDB Payload JSON file:", err) + } + + var jobsResponse SlurmDBPayload + err = json.Unmarshal(jobsData, &jobsResponse) + if err != nil { + log.Errorf("Error parsing SlurmDB Payload JSON response: %v", err) return jobsResponse, err } @@ -165,7 +183,7 @@ func printSlurmInfo(job Job) string { StdOut=%v`, job.JobID, job.Name, job.UserName, job.UserID, job.GroupID, - job.Account, job.QOS, + job.Account, job.QoS, job.Requeue, job.RestartCnt, job.BatchFlag, job.TimeLimit, job.SubmitTime, job.Partition, @@ -191,7 +209,7 @@ func exitWithError(err error, output []byte) { fmt.Fprintf(os.Stderr, "ERROR: API call failed with code %d;\nOutput:\n%s\n", exitError.ExitCode(), output) } } else { - log.Errorf("ERROR:", err) + log.Errorf("ERROR: %v", err) } os.Exit(1) } @@ -274,11 +292,111 @@ func ConstructNodeAcceleratorMap(input string, accelerator string) map[string]st return numberMap } -func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobs []Job) { +func CreateJobMeta(job Job) *schema.JobMeta { + + var exclusive int32 + if job.Shared == nil { + exclusive = 1 + } else { + exclusive = 0 + } + + var resources []*schema.Resource + + // Define a regular expression to match "gpu=x" + regex := regexp.MustCompile(`gpu=(\d+)`) + + // Find all matches in the input string + matches := regex.FindAllStringSubmatch(job.TresAllocStr, -1) + + // Initialize a variable to store the total number of GPUs + var totalGPUs int32 + // Iterate through the matches + match := matches[0] + if len(match) == 2 { + gpuCount, _ := strconv.Atoi(match[1]) + totalGPUs += int32(gpuCount) + } + + for _, node := range job.JobResources.AllocatedNodes { + var res schema.Resource + res.Hostname = node.Nodename + + log.Debugf("Node %s Cores map size: %d\n", node.Nodename, len(node.Sockets)) + + if node.CPUsUsed == nil || node.MemoryAllocated == nil { + log.Fatalf("Either node.Cpus or node.Memory is nil\n") + } + + for k, v := range node.Sockets { + fmt.Printf("core id[%s] value[%s]\n", k, v) + threadID, _ := strconv.Atoi(k) + res.HWThreads = append(res.HWThreads, threadID) + } + + // cpu=512,mem=1875G,node=4,billing=512,gres\/gpu=32,gres\/gpu:a40=32 + // For core/GPU id mapping, need to query from cluster config file + res.Accelerators = append(res.Accelerators, job.Comment) + resources = append(resources, &res) + } + + metaData := make(map[string]string) + metaData["jobName"] = job.Name + metaData["slurmInfo"] = printSlurmInfo(job) + + metaDataInBytes, err := json.Marshal(metaData) + if err != nil { + log.Fatalf("metaData JSON marshaling failed: %s", err) + } + + var defaultJob schema.BaseJob = schema.BaseJob{ + JobID: job.JobID, + User: job.UserName, + Project: job.Account, + Cluster: job.Cluster, + Partition: job.Partition, + // check nil + ArrayJobId: job.ArrayJobID, + NumNodes: job.NodeCount, + NumHWThreads: job.CPUs, + NumAcc: totalGPUs, + Exclusive: exclusive, + // MonitoringStatus: job.MonitoringStatus, + // SMT: job.TasksPerCore, + State: schema.JobState(job.JobState), + // ignore this for start job + // Duration: int32(time.Now().Unix() - job.StartTime), // or SubmitTime? + Walltime: time.Now().Unix(), // max duration requested by the job + // Tags: job.Tags, + // ignore this! + // RawResources: jobResourcesInBytes, + // "job_resources": "allocated_nodes" "sockets": + // very important; has to be right + Resources: resources, + RawMetaData: metaDataInBytes, + // optional metadata with'jobScript 'jobName': 'slurmInfo': + MetaData: metaData, + // ConcurrentJobs: job.ConcurrentJobs, + } + log.Debugf("Generated BaseJob with Resources=%v", defaultJob.Resources[0]) + + meta := &schema.JobMeta{ + BaseJob: defaultJob, + StartTime: job.StartTime, + Statistics: make(map[string]schema.JobStatistics), + } + // log.Debugf("Generated JobMeta %v", req.BaseJob.JobID) + + return meta +} + +func (cfg *SlurmRestSchedulerConfig) HandleJobs(jobs []Job) error { + + // runningJobsInCC, err := cfg.JobRepository.FindRunningJobs("alex") // Iterate over the Jobs slice for _, job := range jobs { - // Process each job + // Process each job from Slurm fmt.Printf("Job ID: %d\n", job.JobID) fmt.Printf("Job Name: %s\n", job.Name) fmt.Printf("Job State: %s\n", job.JobState) @@ -287,169 +405,99 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobs []Job) { if job.JobState == "RUNNING" { - // jobs, err := cfg.JobRepository.FindRunningJobs(job.Cluster) - // if err != nil { - // log.Fatalf("Failed to find running jobs: %v", err) - // } + meta := CreateJobMeta(job) - // for id, job := range jobs { - // fmt.Printf("Job ID: %d, Job: %+v\n", id, job) - // } + // For all running jobs from Slurm + _, notFoundError := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.StartTime) - // if err != nil || err != sql.ErrNoRows { - // log.Errorf("checking for duplicate failed: %s", err.Error()) - // return - // } else if err == nil { - // if len(jobs) == 0 { - var exclusive int32 - if job.Shared == nil { - exclusive = 1 - } else { - exclusive = 0 - } - - var resources []*schema.Resource - - // Define a regular expression to match "gpu=x" - regex := regexp.MustCompile(`gpu=(\d+)`) - - // Find all matches in the input string - matches := regex.FindAllStringSubmatch(job.TresAllocStr, -1) - - // Initialize a variable to store the total number of GPUs - var totalGPUs int32 - // Iterate through the matches - match := matches[0] - if len(match) == 2 { - gpuCount, _ := strconv.Atoi(match[1]) - totalGPUs += int32(gpuCount) - } - - for _, node := range job.JobResources.AllocatedNodes { - var res schema.Resource - res.Hostname = node.Nodename - - log.Debugf("Node %s Cores map size: %d\n", node.Nodename, len(node.Sockets)) - - if node.CPUsUsed == nil || node.MemoryAllocated == nil { - log.Fatalf("Either node.Cpus or node.Memory is nil\n") + if notFoundError != nil { + // if it does not exist in CC, create a new entry + log.Print("Job does not exist in CC, will create a new entry:", job.JobID) + id, startJobError := cfg.JobRepository.Start(meta) + if startJobError != nil { + return startJobError } + log.Debug("Added job", id) + } - for k, v := range node.Sockets { - fmt.Printf("core id[%s] value[%s]\n", k, v) - threadID, _ := strconv.Atoi(k) - res.HWThreads = append(res.HWThreads, threadID) + // Running in both sides: nothing needs to be done + } else if job.JobState == "COMPLETED" { + // Check if completed job with combination of (job_id, cluster_id, start_time) already exists + log.Debugf("Processing completed job ID: %v Cluster: %v StartTime: %v", job.JobID, job.Cluster, job.StartTime) + existingJob, err := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.StartTime) + + if err == nil && existingJob.State != schema.JobStateCompleted { + // for jobs completed in Slurm (either in Slurm or maybe SlurmDB) + // update job in CC with new info (job final status, duration, end timestamp) + + existingJob.BaseJob.Duration = int32(job.EndTime - job.StartTime) + existingJob.BaseJob.State = schema.JobState(job.JobState) + existingJob.BaseJob.Walltime = job.EndTime + + req := &StopJobRequest{ + Cluster: &job.Cluster, + JobId: &job.JobID, + State: schema.JobState(job.JobState), + StartTime: &job.StartTime, + StopTime: job.EndTime, } - - // cpu=512,mem=1875G,node=4,billing=512,gres\/gpu=32,gres\/gpu:a40=32 - // For core/GPU id mapping, need to query from cluster config file - res.Accelerators = append(res.Accelerators, job.Comment) - resources = append(resources, &res) + cfg.checkAndHandleStopJob(existingJob, req) } - - metaData := make(map[string]string) - metaData["jobName"] = job.Name - metaData["slurmInfo"] = printSlurmInfo(job) - - // switch slurmPath := cfg.clusterConfig["slurm_path"].(type) { - // case string: - // commandCtlScriptTpl := fmt.Sprintf("%sscontrol -M %%s write batch_script %%s -", slurmPath) - // queryJobScript := fmt.Sprintf(commandCtlScriptTpl, job.Cluster, job.JobId) - // metaData["jobScript"] = queryJobScript - // default: - // // Type assertion failed - // fmt.Println("Conversion of slurm_path to string failed", cfg.clusterConfig["slurm_path"]) - // } - - metaDataInBytes, err := json.Marshal(metaData) - if err != nil { - log.Fatalf("metaData JSON marshaling failed: %s", err) - } - - var defaultJob schema.BaseJob = schema.BaseJob{ - JobID: job.JobID, - User: job.UserName, - Project: job.Account, - Cluster: job.Cluster, - Partition: job.Partition, - // check nil - ArrayJobId: job.ArrayJobID, - NumNodes: job.NodeCount, - NumHWThreads: job.CPUs, - NumAcc: totalGPUs, - Exclusive: exclusive, - // MonitoringStatus: job.MonitoringStatus, - // SMT: job.TasksPerCore, - State: schema.JobState(job.JobState), - // ignore this for start job - // Duration: int32(time.Now().Unix() - job.StartTime), // or SubmitTime? - Walltime: time.Now().Unix(), // max duration requested by the job - // Tags: job.Tags, - // ignore this! - // RawResources: jobResourcesInBytes, - // "job_resources": "allocated_nodes" "sockets": - // very important; has to be right - Resources: resources, - RawMetaData: metaDataInBytes, - // optional metadata with'jobScript 'jobName': 'slurmInfo': - MetaData: metaData, - // ConcurrentJobs: job.ConcurrentJobs, - } - log.Debugf("Generated BaseJob with Resources=%v", defaultJob.Resources[0]) - - req := &schema.JobMeta{ - BaseJob: defaultJob, - StartTime: job.StartTime, - Statistics: make(map[string]schema.JobStatistics), - } - log.Debugf("Generated JobMeta %v", req.BaseJob.JobID) - - // req := new(schema.JobMeta) - // id, err := cfg.JobRepository.Start(req) - // log.Debugf("Added %v", id) - // } else { - // for _, job := range jobs { - // log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID) - // } - // } - // } - } else { - // Check if completed job with combination of (job_id, cluster_id, start_time) already exists: - jobID := job.JobID - log.Debugf("jobID: %v Cluster: %v StartTime: %v", jobID, job.Cluster, job.StartTime) - // commented out as it will cause panic - // note down params invoked - - // existingJob, err := cfg.JobRepository.Find(&jobID, &job.Cluster, &job.StartTime) - - // if err == nil { - // existingJob.BaseJob.Duration = int32(job.EndTime - job.StartTime) - // existingJob.BaseJob.State = schema.JobState(job.JobState) - // existingJob.BaseJob.Walltime = job.StartTime - - // req := &StopJobRequest{ - // Cluster: &job.Cluster, - // JobId: &jobID, - // State: schema.JobState(job.JobState), - // StartTime: &existingJob.StartTimeUnix, - // StopTime: job.EndTime, - // } - // // req := new(schema.JobMeta) - // cfg.checkAndHandleStopJob(existingJob, req) - // } - } } + return nil +} +func (cfg *SlurmRestSchedulerConfig) HandleDumpedJobs(jobs []DumpedJob) error { + + // Iterate over the Jobs slice + for _, job := range jobs { + // Process each job from Slurm + fmt.Printf("Job ID: %d\n", job.JobID) + fmt.Printf("Job Name: %s\n", job.Name) + fmt.Printf("Job State: %s\n", job.State.Current) + fmt.Println("Job EndTime:", job.Time.End) + fmt.Println("Job Cluster:", job.Cluster) + + // Check if completed job with combination of (job_id, cluster_id, start_time) already exists + log.Debugf("Processing completed dumped job ID: %v Cluster: %v StartTime: %v", job.JobID, job.Cluster, job.Time.Start) + existingJob, err := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.Time.Start) + + if err == nil && existingJob.State != schema.JobStateCompleted { + // for jobs completed in Slurm (either in Slurm or maybe SlurmDB) + // update job in CC with new info (job final status, duration, end timestamp) + + existingJob.BaseJob.Duration = int32(job.Time.End - job.Time.Start) + existingJob.BaseJob.State = schema.JobState(job.State.Current) + existingJob.BaseJob.Walltime = job.Time.End + + req := &StopJobRequest{ + Cluster: &job.Cluster, + JobId: &job.JobID, + State: schema.JobState(job.State.Current), + StartTime: &job.Time.Start, + StopTime: job.Time.End, + } + cfg.checkAndHandleStopJob(existingJob, req) + } + } + return nil } func (cfg *SlurmRestSchedulerConfig) Sync() { - // Fetch an instance of JobsResponse - jobsResponse, err := queryAllJobsLocal() + // Fetch an instance of Slurm JobsResponse + jobsResponse, err := fetchJobsLocal() if err != nil { log.Fatal(err.Error()) } - cfg.HandleJobsResponse(jobsResponse.Jobs) + cfg.HandleJobs(jobsResponse.Jobs) + + // Fetch an instance of Slurm DB JobsResponse + dumpedJobsResponse, err := fetchDumpedJobsLocal() + if err != nil { + log.Fatal(err.Error()) + } + cfg.HandleDumpedJobs(dumpedJobsResponse.Jobs) }