diff --git a/internal/scheduler/slurmRest.go b/internal/scheduler/slurmRest.go index 131f126..2b2ef3b 100644 --- a/internal/scheduler/slurmRest.go +++ b/internal/scheduler/slurmRest.go @@ -4,7 +4,26 @@ // license that can be found in the LICENSE file. package scheduler -import "encoding/json" +import ( + "database/sql" + "encoding/json" + "fmt" + "math" + "net/http" + "net/url" + "os" + "os/exec" + "strconv" + "time" + + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +// A Response struct to map the Entire Response +type Response struct { + Name string `json:"name"` + Jobs []Job `json:"job_entries"` +} type SlurmRestSchedulerConfig struct { URL string `json:"url"` @@ -14,26 +33,235 @@ type SlurmRestScheduler struct { url string } -func (sd *SlurmRestScheduler) Init(rawConfig json.RawMessage) error { - // cfg := slurmrest.NewConfiguration() - // cfg.HTTPClient = &http.Client{Timeout: time.Second * 3600} - // cfg.Scheme = "http" - // cfg.Host = "localhost" +var client *http.Client - // client := slurmrest.NewAPIClient(cfg) - return nil +func queryDB(qtime int64, clusterName string) ([]interface{}, error) { + + apiEndpoint := "/slurmdb/v0.0.39/jobs" + + // Construct the query parameters + queryParams := url.Values{} + queryParams.Set("users", "user1,user2") + queryParams.Set("submit_time", "2023-01-01T00:00:00") + + // Add the query parameters to the API endpoint + apiEndpoint += "?" + queryParams.Encode() + + // Create a new HTTP GET request + req, err := http.NewRequest("GET", apiEndpoint, nil) + if err != nil { + log.Errorf("Error creating request:", err) + } + + // Send the request + resp, err := client.Do(req) + if err != nil { + log.Errorf("Error sending request:", err) + } + defer resp.Body.Close() + + // Check the response status code + if resp.StatusCode != http.StatusOK { + log.Errorf("API request failed with status:", resp.Status) + } + + // Read the response body + // Here you can parse the response body as needed + // For simplicity, let's just print the response body + var dbOutput []byte + _, err = resp.Body.Read(dbOutput) + if err != nil { + log.Errorf("Error reading response body:", err) + } + + log.Errorf("API response:", string(dbOutput)) + + dataJobs := make(map[string]interface{}) + err = json.Unmarshal(dbOutput, &dataJobs) + if err != nil { + log.Errorf("Error parsing JSON response:", err) + os.Exit(1) + } + + if _, ok := dataJobs["jobs"]; !ok { + log.Errorf("ERROR: jobs not found - response incomplete") + os.Exit(1) + } + + jobs, _ := dataJobs["jobs"].([]interface{}) + return jobs, nil +} + +func queryAllJobs() ([]interface{}, error) { + var ctlOutput []byte + + apiEndpoint := "/slurm/v0.0.39/jobs" + // Create a new HTTP GET request with query parameters + req, err := http.NewRequest("GET", apiEndpoint, nil) + if err != nil { + log.Errorf("Error creating request:", err) + } + + // Send the request + resp, err := client.Do(req) + if err != nil { + log.Errorf("Error sending request:", err) + } + defer resp.Body.Close() + + // Check the response status code + if resp.StatusCode != http.StatusOK { + log.Errorf("API request failed with status:", resp.Status) + } + + _, err = resp.Body.Read(ctlOutput) + if err != nil { + log.Errorf("Error reading response body:", err) + } + + dataJob := make(map[string]interface{}) + err = json.Unmarshal(ctlOutput, &dataJob) + if err != nil { + log.Errorf("Error parsing JSON response:", err) + os.Exit(1) + } + + if _, ok := dataJob["jobs"]; !ok { + log.Errorf("ERROR: jobs not found - response incomplete") + os.Exit(1) + } + + jobs, _ := dataJob["jobs"].([]interface{}) + return jobs, nil +} + +func printSlurmInfo(job map[string]interface{}) string { + cpusPerTask := "1" + tasksStr, ok := job["tasks"].(string) + if !ok { + tasksInt, _ := job["tasks"].(int) + tasksStr = strconv.Itoa(tasksInt) + } + + cpusStr, ok := job["cpus"].(string) + if !ok { + cpusInt, _ := job["cpus"].(int) + cpusStr = strconv.Itoa(cpusInt) + } + + tasks, _ := strconv.Atoi(tasksStr) + cpus, _ := strconv.Atoi(cpusStr) + if tasks > 0 { + cpusPerTask = strconv.Itoa(int(math.Round(float64(cpus) / float64(tasks)))) + } + + text := fmt.Sprintf(` + JobId=%v JobName=%v + UserId=%v(%v) GroupId=%v + Account=%v QOS=%v + Requeue=%v Restarts=%v BatchFlag=%v + TimeLimit=%v + SubmitTime=%v + Partition=%v + NodeList=%v + NumNodes=%v NumCPUs=%v NumTasks=%v CPUs/Task=%v + NTasksPerNode:Socket:Core=%v:%v:%v + TRES_req=%v + TRES_alloc=%v + Command=%v + WorkDir=%v + StdErr=%v + StdOut=%v`, + job["job_id"], job["name"], + job["user_name"], job["user_id"], job["group_id"], + job["account"], job["qos"], + job["requeue"], job["restart_cnt"], job["batch_flag"], + job["time_limit"], + time.Unix(int64(job["submit_time"].(float64)), 0).Format(time.RFC3339), + job["partition"], + job["nodes"], + job["node_count"], cpus, tasks, cpusPerTask, + job["tasks_per_node"], job["tasks_per_socket"], job["tasks_per_core"], + job["tres_req_str"], + job["tres_alloc_str"], + job["command"], + job["current_working_directory"], + job["standard_error"], + job["standard_output"], + ) + + return text +} + +func exitWithError(err error, output []byte) { + if exitError, ok := err.(*exec.ExitError); ok { + if exitError.ExitCode() == 28 { + fmt.Fprintf(os.Stderr, "ERROR: API call failed with timeout; check slurmrestd.\nOutput:\n%s\n", output) + } else { + fmt.Fprintf(os.Stderr, "ERROR: API call failed with code %d;\nOutput:\n%s\n", exitError.ExitCode(), output) + } + } else { + log.Errorf("ERROR:", err) + } + os.Exit(1) +} + +func loadClusterConfig(filename string) (map[string]interface{}, error) { + clusterConfigData := make(map[string]interface{}) + + file, err := os.Open(filename) + if err != nil { + log.Errorf("Cluster config file not found. No cores/GPU ids available.") + return clusterConfigData, err + } + defer file.Close() + + decoder := json.NewDecoder(file) + err = decoder.Decode(&clusterConfigData) + if err != nil { + log.Errorf("Error decoding cluster config file:", err) + } + + return clusterConfigData, err +} + +func (sd *SlurmRestScheduler) Init(rawConfig json.RawMessage) error { + clusterConfigData, err := loadClusterConfig("cluster-fritz.json") + + for k, v := range clusterConfigData { + switch c := v.(type) { + case string: + fmt.Printf("Item %q is a string, containing %q\n", k, c) + case float64: + fmt.Printf("Looks like item %q is a number, specifically %f\n", k, c) + default: + fmt.Printf("Not sure what type item %q is, but I think it might be %T\n", k, c) + } + } + + // Create an HTTP client + client = &http.Client{} + + return err } func (sd *SlurmRestScheduler) Sync() { - // jreq := client.SlurmApi.SlurmctldGetJobs(context.Background()) - // jobs, resp, err := client.SlurmApi.SlurmctldGetJobsExecute(jreq) - // if err != nil { - // log.Fatalf("FAIL: %s", err) - // } else if resp.StatusCode != 200 { - // log.Fatalf("Invalid status code: %d\n", resp.StatusCode) - // } - // for _, job := range jobs.GetJobs() { // fmt.Printf("Job %s - %s\n", job.GetJobId(), job.GetJobState()) // } + + latestJobs, err := queryAllJobs() + if err != nil { + log.Fatal(err.Error()) + } + for _, job := range latestJobs { + // check if each job in latestJobs has existed combination of (job_id, cluster_id, start_time) in JobRepository + jobs, err := sd.JobRepository.FindAll(&job.JobID, &job.Cluster, nil) + if err != nil && err != sql.ErrNoRows { + log.Errorf("checking for duplicate failed: %s", err.Error()) + } else if err == nil { + // should update the JobRepository at this point + } + } + }