From a3fbdbcf90e9ae21f33a559561d3afe081324acf Mon Sep 17 00:00:00 2001 From: Bole Ma Date: Wed, 25 Oct 2023 23:27:06 +0200 Subject: [PATCH] Added JSON Payload Converter --- go.mod | 2 +- go.sum | 2 + internal/scheduler/payloadConverter.go | 278 +++++++++++++++++++ internal/scheduler/slurmRest.go | 370 ++++++++++++++----------- tools/nats-manager/main.go | 15 +- 5 files changed, 509 insertions(+), 158 deletions(-) create mode 100644 internal/scheduler/payloadConverter.go diff --git a/go.mod b/go.mod index 98dac3a..bcd4774 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( require ( github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac37420 // indirect - github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20230906120742-0f15562ea666 // indirect + github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7 // indirect github.com/KyleBanks/depth v1.2.1 // indirect github.com/agnivade/levenshtein v1.1.1 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 6f22475..c4c0531 100644 --- a/go.sum +++ b/go.sum @@ -86,6 +86,8 @@ github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac37420/go.mod h1:oNgVG2puNj9cNw/KgqLbgE1pPOn8jXORX3ErP58LcAA= github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20230906120742-0f15562ea666 h1:8PofHcOwEMmeAFqJjvAEgnu7rbRHAwJhd2XJ9u/YxiU= github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20230906120742-0f15562ea666/go.mod h1:vxaj1my0GNoCXx4bYyOTA/IZP/IOZImtdOIn4T7pCa4= +github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7 h1:YY/qDtFsp1DOJw/jyobiIBiIh1/yD2IVOdcK7EVEIKs= +github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7/go.mod h1:vxaj1my0GNoCXx4bYyOTA/IZP/IOZImtdOIn4T7pCa4= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/Masterminds/squirrel v1.5.3 h1:YPpoceAcxuzIljlr5iWpNKaql7hLeG1KLSrhvdHpkZc= diff --git a/internal/scheduler/payloadConverter.go b/internal/scheduler/payloadConverter.go new file mode 100644 index 0000000..254a323 --- /dev/null +++ b/internal/scheduler/payloadConverter.go @@ -0,0 +1,278 @@ +// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package scheduler + +import ( + "encoding/json" + "fmt" + "os" + + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +type MetricConfig struct { + Name string `json:"name"` + Unit struct { + Base string `json:"base"` + } `json:"unit"` + Scope string `json:"scope"` + Aggregation string `json:"aggregation"` + Timestep int `json:"timestep"` + Peak float64 `json:"peak"` + Normal float64 `json:"normal"` + Caution float64 `json:"caution"` + Alert float64 `json:"alert"` +} + +type SubCluster struct { + Name string `json:"name"` + Nodes string `json:"nodes"` + ProcessorType string `json:"processorType"` + SocketsPerNode int `json:"socketsPerNode"` + CoresPerSocket int `json:"coresPerSocket"` + ThreadsPerCore int `json:"threadsPerCore"` + FlopRateScalar struct { + Unit struct { + Base string `json:"base"` + Prefix string `json:"prefix"` + } `json:"unit"` + Value float64 `json:"value"` + } `json:"flopRateScalar"` + FlopRateSimd struct { + Unit struct { + Base string `json:"base"` + Prefix string `json:"prefix"` + } `json:"unit"` + Value float64 `json:"value"` + } `json:"flopRateSimd"` + MemoryBandwidth struct { + Unit struct { + Base string `json:"base"` + Prefix string `json:"prefix"` + } `json:"unit"` + Value float64 `json:"value"` + } `json:"memoryBandwidth"` + Topology struct { + Node []int `json:"node"` + Socket [][]int `json:"socket"` + MemoryDomain [][]int `json:"memoryDomain"` + Core [][]int `json:"core"` + Accelerators []struct { + ID string `json:"id"` + Type string `json:"type"` + Model string `json:"model"` + } `json:"accelerators"` + } `json:"topology"` +} + +type ClusterConfig struct { + Name string `json:"name"` + MetricConfig []MetricConfig `json:"metricConfig"` + SubClusters []SubCluster `json:"subClusters"` +} + +type Meta struct { + Plugin struct { + Type string `json:"type"` + Name string `json:"name"` + } `json:"plugin"` + Slurm struct { + Version struct { + Major int `json:"major"` + Micro int `json:"micro"` + Minor int `json:"minor"` + } `json:"version"` + Release string `json:"release"` + } `json:"Slurm"` +} + +type Job struct { + Account string `json:"account"` + AccrueTime int `json:"accrue_time"` + AdminComment string `json:"admin_comment"` + ArrayJobID int `json:"array_job_id"` + ArrayTaskID interface{} `json:"array_task_id"` + ArrayMaxTasks int `json:"array_max_tasks"` + ArrayTaskString string `json:"array_task_string"` + AssociationID int `json:"association_id"` + BatchFeatures string `json:"batch_features"` + BatchFlag bool `json:"batch_flag"` + BatchHost string `json:"batch_host"` + Flags []string `json:"flags"` + BurstBuffer string `json:"burst_buffer"` + BurstBufferState string `json:"burst_buffer_state"` + Cluster string `json:"cluster"` + ClusterFeatures string `json:"cluster_features"` + Command string `json:"command"` + Comment string `json:"comment"` + Container string `json:"container"` + Contiguous bool `json:"contiguous"` + CoreSpec interface{} `json:"core_spec"` + ThreadSpec interface{} `json:"thread_spec"` + CoresPerSocket interface{} `json:"cores_per_socket"` + BillableTres interface{} `json:"billable_tres"` + CPUPerTask interface{} `json:"cpus_per_task"` + CPUFrequencyMinimum interface{} `json:"cpu_frequency_minimum"` + CPUFrequencyMaximum interface{} `json:"cpu_frequency_maximum"` + CPUFrequencyGovernor interface{} `json:"cpu_frequency_governor"` + CPUPerTres string `json:"cpus_per_tres"` + Deadline int `json:"deadline"` + DelayBoot int `json:"delay_boot"` + Dependency string `json:"dependency"` + DerivedExitCode int `json:"derived_exit_code"` + EligibleTime int `json:"eligible_time"` + EndTime int `json:"end_time"` + ExcludedNodes string `json:"excluded_nodes"` + ExitCode int `json:"exit_code"` + Features string `json:"features"` + FederationOrigin string `json:"federation_origin"` + FederationSiblingsActive string `json:"federation_siblings_active"` + FederationSiblingsViable string `json:"federation_siblings_viable"` + GresDetail []string `json:"gres_detail"` + GroupID int `json:"group_id"` + GroupName string `json:"group_name"` + JobID int `json:"job_id"` + JobState string `json:"job_state"` + LastSchedEvaluation int `json:"last_sched_evaluation"` + Licenses string `json:"licenses"` + MaxCPUs int `json:"max_cpus"` + MaxNodes int `json:"max_nodes"` + MCSLabel string `json:"mcs_label"` + MemoryPerTres string `json:"memory_per_tres"` + Name string `json:"name"` + Nodes string `json:"nodes"` + Nice interface{} `json:"nice"` + TasksPerCore interface{} `json:"tasks_per_core"` + TasksPerNode int `json:"tasks_per_node"` + TasksPerSocket interface{} `json:"tasks_per_socket"` + TasksPerBoard int `json:"tasks_per_board"` + CPUs int `json:"cpus"` + NodeCount int `json:"node_count"` + Tasks int `json:"tasks"` + HETJobID int `json:"het_job_id"` + HETJobIDSet string `json:"het_job_id_set"` + HETJobOffset int `json:"het_job_offset"` + Partition string `json:"partition"` + MemoryPerNode interface{} `json:"memory_per_node"` + MemoryPerCPU int `json:"memory_per_cpu"` + MinimumCPUsPerNode int `json:"minimum_cpus_per_node"` + MinimumTmpDiskPerNode int `json:"minimum_tmp_disk_per_node"` + PreemptTime int `json:"preempt_time"` + PreSUSTime int `json:"pre_sus_time"` + Priority int `json:"priority"` + Profile interface{} `json:"profile"` + QOS string `json:"qos"` + Reboot bool `json:"reboot"` + RequiredNodes string `json:"required_nodes"` + Requeue bool `json:"requeue"` + ResizeTime int `json:"resize_time"` + RestartCnt int `json:"restart_cnt"` + ResvName string `json:"resv_name"` + Shared interface{} `json:"shared"` + ShowFlags []string `json:"show_flags"` + SocketsPerBoard int `json:"sockets_per_board"` + SocketsPerNode interface{} `json:"sockets_per_node"` + StartTime int `json:"start_time"` + StateDescription string `json:"state_description"` + StateReason string `json:"state_reason"` + StandardError string `json:"standard_error"` + StandardInput string `json:"standard_input"` + StandardOutput string `json:"standard_output"` + SubmitTime int `json:"submit_time"` + SuspendTime int `json:"suspend_time"` + SystemComment string `json:"system_comment"` + TimeLimit int `json:"time_limit"` + TimeMinimum int `json:"time_minimum"` + ThreadsPerCore interface{} `json:"threads_per_core"` + TresBind string `json:"tres_bind"` + TresFreq string `json:"tres_freq"` + TresPerJob string `json:"tres_per_job"` + TresPerNode string `json:"tres_per_node"` + TresPerSocket string `json:"tres_per_socket"` + TresPerTask string `json:"tres_per_task"` + TresReqStr string `json:"tres_req_str"` + TresAllocStr string `json:"tres_alloc_str"` + UserID int `json:"user_id"` + UserName string `json:"user_name"` + Wckey string `json:"wckey"` + CurrentWorkingDirectory string `json:"current_working_directory"` +} + +type SlurmData struct { + Meta Meta `json:"meta"` + Errors []interface{} `json:"errors"` + Jobs []Job `json:"jobs"` +} + +func DecodeClusterConfig(filename string) (ClusterConfig, error) { + var clusterConfig ClusterConfig + + file, err := os.Open(filename) + if err != nil { + log.Errorf("Cluster config file not found. No cores/GPU ids available.") + return clusterConfig, err + } + defer file.Close() + + decoder := json.NewDecoder(file) + err = decoder.Decode(&clusterConfig) + if err != nil { + log.Errorf("Error decoding cluster config file: %v", err) + } + + log.Printf("Name: %s\n", clusterConfig.Name) + log.Printf("MetricConfig:\n") + for _, metric := range clusterConfig.MetricConfig { + log.Printf(" Name: %s\n", metric.Name) + log.Printf(" Unit Base: %s\n", metric.Unit.Base) + log.Printf(" Scope: %s\n", metric.Scope) + log.Printf(" Aggregation: %s\n", metric.Aggregation) + log.Printf(" Timestep: %d\n", metric.Timestep) + log.Printf(" Peak: %f\n", metric.Peak) + log.Printf(" Normal: %f\n", metric.Normal) + log.Printf(" Caution: %f\n", metric.Caution) + log.Printf(" Alert: %f\n", metric.Alert) + } + log.Printf("SubClusters:\n") + for _, subCluster := range clusterConfig.SubClusters { + log.Printf(" Name: %s\n", subCluster.Name) + log.Printf(" Nodes: %s\n", subCluster.Nodes) + log.Printf(" Processor Type: %s\n", subCluster.ProcessorType) + log.Printf(" Sockets Per Node: %d\n", subCluster.SocketsPerNode) + log.Printf(" Cores Per Socket: %d\n", subCluster.CoresPerSocket) + log.Printf(" Threads Per Core: %d\n", subCluster.ThreadsPerCore) + log.Printf(" Flop Rate Scalar Unit Base: %s\n", subCluster.FlopRateScalar.Unit.Base) + log.Printf(" Flop Rate Scalar Unit Prefix: %s\n", subCluster.FlopRateScalar.Unit.Prefix) + log.Printf(" Flop Rate Scalar Value: %f\n", subCluster.FlopRateScalar.Value) + log.Printf(" Flop Rate Simd Unit Base: %s\n", subCluster.FlopRateSimd.Unit.Base) + log.Printf(" Flop Rate Simd Unit Prefix: %s\n", subCluster.FlopRateSimd.Unit.Prefix) + log.Printf(" Flop Rate Simd Value: %f\n", subCluster.FlopRateSimd.Value) + log.Printf(" Memory Bandwidth Unit Base: %s\n", subCluster.MemoryBandwidth.Unit.Base) + log.Printf(" Memory Bandwidth Unit Prefix: %s\n", subCluster.MemoryBandwidth.Unit.Prefix) + log.Printf(" Memory Bandwidth Value: %f\n", subCluster.MemoryBandwidth.Value) + log.Printf(" Topology Node: %v\n", subCluster.Topology.Node) + log.Printf(" Topology Socket: %v\n", subCluster.Topology.Socket) + log.Printf(" Topology Memory Domain: %v\n", subCluster.Topology.MemoryDomain) + log.Printf(" Topology Core: %v\n", subCluster.Topology.Core) + log.Printf(" Topology Accelerators:\n") + for _, accelerator := range subCluster.Topology.Accelerators { + log.Printf(" ID: %s\n", accelerator.ID) + log.Printf(" Type: %s\n", accelerator.Type) + log.Printf(" Model: %s\n", accelerator.Model) + } + } + + return clusterConfig, nil +} + +func UnmarshalSlurmPayload(jsonPayload string) (SlurmData, error) { + var slurmData SlurmData + err := json.Unmarshal([]byte(jsonPayload), &slurmData) + if err != nil { + return slurmData, fmt.Errorf("failed to unmarshal JSON data: %v", err) + } + return slurmData, nil +} diff --git a/internal/scheduler/slurmRest.go b/internal/scheduler/slurmRest.go index 1dea1f5..5c7c954 100644 --- a/internal/scheduler/slurmRest.go +++ b/internal/scheduler/slurmRest.go @@ -5,7 +5,6 @@ package scheduler import ( - "database/sql" "encoding/json" "fmt" "net/http" @@ -16,6 +15,11 @@ import ( "strconv" "time" + "fmt" + "regexp" + "strconv" + "strings" + "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -28,7 +32,7 @@ type SlurmRestSchedulerConfig struct { JobRepository *repository.JobRepository - clusterConfig map[string]interface{} + clusterConfig ClusterConfig } var client *http.Client @@ -48,19 +52,19 @@ func queryDB(qtime int64, clusterName string) ([]interface{}, error) { // Create a new HTTP GET request req, err := http.NewRequest("GET", apiEndpoint, nil) if err != nil { - log.Errorf("Error creating request:", err) + log.Errorf("Error creating request: %v", err) } // Send the request resp, err := client.Do(req) if err != nil { - log.Errorf("Error sending request:", err) + log.Errorf("Error sending request: %v", err) } defer resp.Body.Close() // Check the response status code if resp.StatusCode != http.StatusOK { - log.Errorf("API request failed with status:", resp.Status) + log.Errorf("API request failed with status: %v", resp.Status) } // Read the response body @@ -69,15 +73,15 @@ func queryDB(qtime int64, clusterName string) ([]interface{}, error) { var dbOutput []byte _, err = resp.Body.Read(dbOutput) if err != nil { - log.Errorf("Error reading response body:", err) + log.Errorf("Error reading response body: %v", err) } - log.Errorf("API response:", string(dbOutput)) + log.Errorf("API response: %v", string(dbOutput)) dataJobs := make(map[string]interface{}) err = json.Unmarshal(dbOutput, &dataJobs) if err != nil { - log.Errorf("Error parsing JSON response:", err) + log.Errorf("Error parsing JSON response: %v", err) os.Exit(1) } @@ -93,34 +97,53 @@ func queryDB(qtime int64, clusterName string) ([]interface{}, error) { func queryAllJobs() (openapi.V0038JobsResponse, error) { var ctlOutput []byte - apiEndpoint := "/slurm/v0.0.38/jobs" + apiEndpoint := "http://:8080/slurm/v0.0.38/jobs" // Create a new HTTP GET request with query parameters req, err := http.NewRequest("GET", apiEndpoint, nil) if err != nil { - log.Errorf("Error creating request:", err) + log.Errorf("Error creating request: %v", err) } // Send the request resp, err := client.Do(req) if err != nil { - log.Errorf("Error sending request:", err) + log.Errorf("Error sending request: %v", err) } defer resp.Body.Close() // Check the response status code if resp.StatusCode != http.StatusOK { - log.Errorf("API request failed with status:", resp.Status) + log.Errorf("API request failed with status: %v", resp.Status) } _, err = resp.Body.Read(ctlOutput) + log.Printf("Received JSON Data: %v", ctlOutput) if err != nil { - log.Errorf("Error reading response body:", err) + log.Errorf("Error reading response body: %v", err) } var jobsResponse openapi.V0038JobsResponse err = json.Unmarshal(ctlOutput, &jobsResponse) if err != nil { - log.Errorf("Error parsing JSON response:", err) + log.Errorf("Error parsing JSON response: %v", err) + return jobsResponse, err + } + + return jobsResponse, nil +} + +func queryAllJobsLocal() (openapi.V0038JobsResponse, error) { + // Read the JSON file + jobsData, err := os.ReadFile("slurm_0038.json") + + if err != nil { + fmt.Println("Error reading JSON file:", err) + } + + var jobsResponse openapi.V0038JobsResponse + err = json.Unmarshal(jobsData, &jobsResponse) + if err != nil { + log.Errorf("Error parsing JSON response: %v", err) return jobsResponse, err } @@ -179,39 +202,23 @@ func exitWithError(err error, output []byte) { os.Exit(1) } -func loadClusterConfig(filename string) (map[string]interface{}, error) { - clusterConfigData := make(map[string]interface{}) - - file, err := os.Open(filename) - if err != nil { - log.Errorf("Cluster config file not found. No cores/GPU ids available.") - return clusterConfigData, err - } - defer file.Close() - - decoder := json.NewDecoder(file) - err = decoder.Decode(&clusterConfigData) - if err != nil { - log.Errorf("Error decoding cluster config file:", err) - } - - return clusterConfigData, err -} - -func (cfg *SlurmRestSchedulerConfig) Init(rawConfig json.RawMessage) error { +func (cfg *SlurmRestSchedulerConfig) Init() error { var err error - cfg.clusterConfig, err = loadClusterConfig("cluster-fritz.json") - for k, v := range cfg.clusterConfig { - switch c := v.(type) { - case string: - fmt.Printf("Item %q is a string, containing %q\n", k, c) - case float64: - fmt.Printf("Looks like item %q is a number, specifically %f\n", k, c) - default: - fmt.Printf("Not sure what type item %q is, but I think it might be %T\n", k, c) - } - } + cfg.clusterConfig, err = DecodeClusterConfig("cluster-alex.json") + + // for k, v := range cfg.clusterConfig { + // fmt.Printf("Entry %q with value %x loaded\n", k, v) + // // switch c := v.(type) { + // // case string: + // // fmt.Printf("Item %q is a string, containing %q\n", k, c) + // // case float64: + // // fmt.Printf("Looks like item %q is a number, specifically %f\n", k, c) + // // default: + // // fmt.Printf("Not sure what type item %q is, but I think it might be %T\n", k, c) + // // } + // } + // fmt.Printf("Cluster Name: %q\n", cfg.clusterConfig["name"]) // Create an HTTP client client = &http.Client{} @@ -253,15 +260,49 @@ func (cfg *SlurmRestSchedulerConfig) checkAndHandleStopJob(job *schema.Job, req cfg.JobRepository.TriggerArchiving(job) } +func ConstructNodeAcceleratorMap(input string, accelerator string) map[string]string { + numberMap := make(map[string]string) + + // Split the input by commas + groups := strings.Split(input, ",") + + for _, group := range groups { + // Use regular expressions to match numbers and ranges + numberRangeRegex := regexp.MustCompile(`a\[(\d+)-(\d+)\]`) + numberRegex := regexp.MustCompile(`a(\d+)`) + + if numberRangeRegex.MatchString(group) { + // Extract nodes from ranges + matches := numberRangeRegex.FindStringSubmatch(group) + if len(matches) == 3 { + start, _ := strconv.Atoi(matches[1]) + end, _ := strconv.Atoi(matches[2]) + for i := start; i <= end; i++ { + numberMap[matches[0]+fmt.Sprintf("%04d", i)] = accelerator + } + } + } else if numberRegex.MatchString(group) { + // Extract individual node + matches := numberRegex.FindStringSubmatch(group) + if len(matches) == 2 { + numberMap[group] = accelerator + } + } + } + + return numberMap +} + func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V0038JobsResponse) { // Iterate over the Jobs slice for _, job := range jobsResponse.Jobs { // Process each job - fmt.Printf("Job ID: %s\n", job.JobId) + fmt.Printf("Job ID: %d\n", *job.JobId) fmt.Printf("Job Name: %s\n", *job.Name) fmt.Printf("Job State: %s\n", *job.JobState) fmt.Println("Job StartTime:", *job.StartTime) + fmt.Println("Job Cluster:", *job.Cluster) // aquire lock to avoid race condition between API calls // var unlockOnce sync.Once @@ -269,128 +310,147 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00 // defer unlockOnce.Do(cfg.RepositoryMutex.Unlock) // is "running" one of JSON state? - if *job.JobState == "running" { + if *job.JobState == "RUNNING" { - jobs, err := cfg.JobRepository.FindRunningJobs(*job.Cluster) + // jobs, err := cfg.JobRepository.FindRunningJobs(*job.Cluster) + // if err != nil { + // log.Fatalf("Failed to find running jobs: %v", err) + // } + + // for id, job := range jobs { + // fmt.Printf("Job ID: %d, Job: %+v\n", id, job) + // } + + // if err != nil || err != sql.ErrNoRows { + // log.Errorf("checking for duplicate failed: %s", err.Error()) + // return + // } else if err == nil { + // if len(jobs) == 0 { + var exclusive int32 + if job.Shared == nil { + exclusive = 1 + } else { + exclusive = 0 + } + + jobResourcesInBytes, err := json.Marshal(*job.JobResources) if err != nil { - log.Fatalf("Failed to find running jobs: %v", err) + log.Fatalf("JobResources JSON marshaling failed: %s", err) } - for id, job := range jobs { - fmt.Printf("Job ID: %d, Job: %+v\n", id, job) + var resources []*schema.Resource + + // Define a regular expression to match "gpu=x" + regex := regexp.MustCompile(`gpu=(\d+)`) + + // Find all matches in the input string + matches := regex.FindAllStringSubmatch(*job.TresAllocStr, -1) + + // Initialize a variable to store the total number of GPUs + var totalGPUs int32 + // Iterate through the matches + match := matches[0] + if len(match) == 2 { + gpuCount, _ := strconv.Atoi(match[1]) + totalGPUs += int32(gpuCount) } - if err != nil || err != sql.ErrNoRows { - log.Errorf("checking for duplicate failed: %s", err.Error()) - return - } else if err == nil { - if len(jobs) == 0 { - var exclusive int32 - if job.Shared == nil { - exclusive = 1 - } else { - exclusive = 0 - } + for _, node := range job.JobResources.AllocatedNodes { + var res schema.Resource + res.Hostname = *node.Nodename - jobResourcesInBytes, err := json.Marshal(*job.JobResources) - if err != nil { - log.Fatalf("JSON marshaling failed: %s", err) - } + log.Debugf("Node %s V0038NodeAllocationSockets.Cores map size: %d\n", *node.Nodename, len(node.Sockets.Cores)) - var resources []*schema.Resource - - // Define a regular expression to match "gpu=x" - regex := regexp.MustCompile(`gpu=(\d+)`) - - // Find all matches in the input string - matches := regex.FindAllStringSubmatch(*job.TresAllocStr, -1) - - // Initialize a variable to store the total number of GPUs - var totalGPUs int32 - // Iterate through the matches - match := matches[0] - if len(match) == 2 { - gpuCount, _ := strconv.Atoi(match[1]) - totalGPUs += int32(gpuCount) - } - - for _, node := range job.JobResources.AllocatedNodes { - var res schema.Resource - res.Hostname = *node.Nodename - for k, v := range node.Sockets.Cores { - fmt.Printf("core id[%s] value[%s]\n", k, v) - threadID, _ := strconv.Atoi(k) - res.HWThreads = append(res.HWThreads, threadID) - } - // cpu=512,mem=1875G,node=4,billing=512,gres\/gpu=32,gres\/gpu:a40=32 - // For core/GPU id mapping, need to query from cluster config file - res.Accelerators = append(res.Accelerators, *job.TresAllocStr) - resources = append(resources, &res) - } - - var metaData map[string]string - metaData["jobName"] = *job.Name - metaData["slurmInfo"] = printSlurmInfo(job) - - switch slurmPath := cfg.clusterConfig["slurm_path"].(type) { - case string: - commandCtlScriptTpl := fmt.Sprintf("%sscontrol -M %%s write batch_script %%s -", slurmPath) - queryJobScript := fmt.Sprintf(commandCtlScriptTpl, job.Cluster, job.JobId) - metaData["jobScript"] = queryJobScript - default: - // Type assertion failed - fmt.Println("Conversion of slurm_path to string failed") - } - - metaDataInBytes, err := json.Marshal(metaData) - - var defaultJob schema.BaseJob = schema.BaseJob{ - JobID: int64(*job.JobId), - User: *job.UserName, - Project: *job.Account, - Cluster: *job.Cluster, - Partition: *job.Partition, - // check nil - ArrayJobId: int64(*job.ArrayJobId), - NumNodes: *job.NodeCount, - NumHWThreads: *job.Cpus, - NumAcc: totalGPUs, - Exclusive: exclusive, - // MonitoringStatus: job.MonitoringStatus, - // SMT: *job.TasksPerCore, - State: schema.JobState(*job.JobState), - // ignore this for start job - // Duration: int32(time.Now().Unix() - *job.StartTime), // or SubmitTime? - Walltime: time.Now().Unix(), // max duration requested by the job - // Tags: job.Tags, - // ignore this! - RawResources: jobResourcesInBytes, - // "job_resources": "allocated_nodes" "sockets": - // very important; has to be right - Resources: resources, - RawMetaData: metaDataInBytes, - // optional metadata with'jobScript 'jobName': 'slurmInfo': - MetaData: metaData, - // ConcurrentJobs: job.ConcurrentJobs, - } - req := &schema.JobMeta{ - BaseJob: defaultJob, - StartTime: *job.StartTime, - Statistics: make(map[string]schema.JobStatistics), - } - // req := new(schema.JobMeta) - id, err := cfg.JobRepository.Start(req) - log.Debugf("Added %v", id) - } else { - for _, job := range jobs { - log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID) - } + if node.Cpus == nil || node.Memory == nil { + log.Fatalf("Either node.Cpus or node.Memory is nil\n") } + + for k, v := range node.Sockets.Cores { + fmt.Printf("core id[%s] value[%s]\n", k, v) + threadID, _ := strconv.Atoi(k) + res.HWThreads = append(res.HWThreads, threadID) + } + + // cpu=512,mem=1875G,node=4,billing=512,gres\/gpu=32,gres\/gpu:a40=32 + // For core/GPU id mapping, need to query from cluster config file + res.Accelerators = append(res.Accelerators, *job.Comment) + resources = append(resources, &res) } + + metaData := make(map[string]string) + metaData["jobName"] = *job.Name + metaData["slurmInfo"] = printSlurmInfo(job) + + // switch slurmPath := cfg.clusterConfig["slurm_path"].(type) { + // case string: + // commandCtlScriptTpl := fmt.Sprintf("%sscontrol -M %%s write batch_script %%s -", slurmPath) + // queryJobScript := fmt.Sprintf(commandCtlScriptTpl, job.Cluster, job.JobId) + // metaData["jobScript"] = queryJobScript + // default: + // // Type assertion failed + // fmt.Println("Conversion of slurm_path to string failed", cfg.clusterConfig["slurm_path"]) + // } + + metaDataInBytes, err := json.Marshal(metaData) + if err != nil { + log.Fatalf("metaData JSON marshaling failed: %s", err) + } + + var defaultJob schema.BaseJob = schema.BaseJob{ + JobID: int64(*job.JobId), + User: *job.UserName, + Project: *job.Account, + Cluster: *job.Cluster, + Partition: *job.Partition, + // check nil + ArrayJobId: int64(*job.ArrayJobId), + NumNodes: *job.NodeCount, + NumHWThreads: *job.Cpus, + NumAcc: totalGPUs, + Exclusive: exclusive, + // MonitoringStatus: job.MonitoringStatus, + // SMT: *job.TasksPerCore, + State: schema.JobState(*job.JobState), + // ignore this for start job + // Duration: int32(time.Now().Unix() - *job.StartTime), // or SubmitTime? + Walltime: time.Now().Unix(), // max duration requested by the job + // Tags: job.Tags, + // ignore this! + RawResources: jobResourcesInBytes, + // "job_resources": "allocated_nodes" "sockets": + // very important; has to be right + Resources: resources, + RawMetaData: metaDataInBytes, + // optional metadata with'jobScript 'jobName': 'slurmInfo': + MetaData: metaData, + // ConcurrentJobs: job.ConcurrentJobs, + } + log.Debugf("Generated BaseJob with Resources=%v", defaultJob.Resources[0]) + + req := &schema.JobMeta{ + BaseJob: defaultJob, + StartTime: *job.StartTime, + Statistics: make(map[string]schema.JobStatistics), + } + log.Debugf("Generated JobMeta %v", req.BaseJob.JobID) + + // req := new(schema.JobMeta) + // id, err := cfg.JobRepository.Start(req) + // log.Debugf("Added %v", id) + // } else { + // for _, job := range jobs { + // log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID) + // } + // } + // } } else { // Check if completed job with combination of (job_id, cluster_id, start_time) already exists: var jobID int64 jobID = int64(*job.JobId) + log.Debugf("jobID: %v Cluster: %v StartTime: %v", jobID, *job.Cluster, *job.StartTime) + // commented out as it will cause panic + // note down params invoked + existingJob, err := cfg.JobRepository.Find(&jobID, job.Cluster, job.StartTime) if err == nil { @@ -416,7 +476,7 @@ func (cfg *SlurmRestSchedulerConfig) HandleJobsResponse(jobsResponse openapi.V00 func (cfg *SlurmRestSchedulerConfig) Sync() { // Fetch an instance of V0037JobsResponse - jobsResponse, err := queryAllJobs() + jobsResponse, err := queryAllJobsLocal() if err != nil { log.Fatal(err.Error()) } diff --git a/tools/nats-manager/main.go b/tools/nats-manager/main.go index 90d0d04..11b0aa7 100644 --- a/tools/nats-manager/main.go +++ b/tools/nats-manager/main.go @@ -110,10 +110,10 @@ func injectPayload() { // Start the HTTP server on port 8080 fmt.Println("Listening on :8080...") - http.ListenAndServe(":8080", nil) + go http.ListenAndServe(":8080", nil) } -func main() { +func loadSlurmNatsScheduler() { cfgData := []byte(`{"target": "localhost"}`) var sch scheduler.SlurmNatsScheduler @@ -122,5 +122,16 @@ func main() { // go injectPayload() +} + +func main() { + + var sch scheduler.SlurmRestSchedulerConfig + sch.Init() + + // injectPayload() + + sch.Sync() + os.Exit(0) }