diff --git a/internal/scheduler/slurmRest.go b/internal/scheduler/slurmRest.go
index 2b2ef3b..5dc0459 100644
--- a/internal/scheduler/slurmRest.go
+++ b/internal/scheduler/slurmRest.go
@@ -14,9 +14,11 @@ import (
 	"os"
 	"os/exec"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
 
 // A Response struct to map the Entire Response
@@ -37,7 +39,7 @@ var client *http.Client
 
 func queryDB(qtime int64, clusterName string) ([]interface{}, error) {
 
-	apiEndpoint := "/slurmdb/v0.0.39/jobs"
+	apiEndpoint := "/slurmdb/v0.0.38/jobs"
 
 	// Construct the query parameters
 	queryParams := url.Values{}
@@ -95,7 +97,7 @@ func queryDB(qtime int64, clusterName string) ([]interface{}, error) {
 func queryAllJobs() ([]interface{}, error) {
 	var ctlOutput []byte
 
-	apiEndpoint := "/slurm/v0.0.39/jobs"
+	apiEndpoint := "/slurm/v0.0.38/jobs"
 	// Create a new HTTP GET request with query parameters
 	req, err := http.NewRequest("GET", apiEndpoint, nil)
 	if err != nil {
@@ -250,17 +252,96 @@ func (sd *SlurmRestScheduler) Sync() {
 	// 	fmt.Printf("Job %s - %s\n", job.GetJobId(), job.GetJobState())
 	// }
 
-	latestJobs, err := queryAllJobs()
+	jobsResponse, err := queryAllJobs()
 	if err != nil {
 		log.Fatal(err.Error())
 	}
-	for _, job := range latestJobs {
-		// check if each job in latestJobs has existed combination of (job_id, cluster_id, start_time) in JobRepository
-		jobs, err := sd.JobRepository.FindAll(&job.JobID, &job.Cluster, nil)
-		if err != nil && err != sql.ErrNoRows {
-			log.Errorf("checking for duplicate failed: %s", err.Error())
-		} else if err == nil {
-			// should update the JobRepository at this point
+
+	// Fetch an example instance of V0037JobsResponse
+	// jobsResponse := V0037JobsResponse{}
+
+	// is acquiring a lock to avoid race conditions between API calls needed?
+
+	// acquire lock to avoid race conditions between API calls; lock once
+	// before the loop, since a deferred unlock inside the loop would only run
+	// when Sync returns and the second iteration would deadlock
+	var unlockOnce sync.Once
+	sd.RepositoryMutex.Lock()
+	defer unlockOnce.Do(sd.RepositoryMutex.Unlock)
+
+	// Iterate over the Jobs slice
+	for _, job := range jobsResponse.Jobs {
+		// Process each job
+		fmt.Printf("Job ID: %v\n", job.JobID)
+		fmt.Printf("Job Name: %s\n", *job.Name)
+		fmt.Printf("Job State: %s\n", *job.JobState)
+		fmt.Println("Job StartTime:", *job.StartTime)
+
+		// is "running" one of the JSON job states?
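+		// Running jobs: add a repository entry if the (job id, cluster, start
+		// time) combination is not present yet; any other state: look up the
+		// existing entry and mark the job as stopped.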
+		if *job.JobState == "running" {
+
+			// Check if combination of (job_id, cluster_id, start_time) already exists:
+			jobs, err := sd.JobRepository.FindAll(job.JobID, &job.Cluster, job.StartTime)
+
+			if err != nil && err != sql.ErrNoRows {
+				log.Errorf("checking for duplicate failed: %s", err.Error())
+				return
+			} else if err == nil {
+				if len(jobs) == 0 {
+					var defaultJob schema.BaseJob = schema.BaseJob{
+						JobID:            job.JobID,
+						User:             job.User,
+						Project:          job.Project,
+						Cluster:          job.Cluster,
+						SubCluster:       job.SubCluster,
+						Partition:        job.Partition,
+						ArrayJobId:       job.ArrayJobId,
+						NumNodes:         job.NumNodes,
+						NumHWThreads:     job.NumHWThreads,
+						NumAcc:           job.NumAcc,
+						Exclusive:        job.Exclusive,
+						MonitoringStatus: job.MonitoringStatus,
+						SMT:              job.SMT,
+						State:            job.State,
+						Duration:         job.Duration,
+						Walltime:         job.Walltime,
+						Tags:             job.Tags,
+						RawResources:     job.RawResources,
+						Resources:        job.Resources,
+						RawMetaData:      job.RawMetaData,
+						MetaData:         job.MetaData,
+						ConcurrentJobs:   job.ConcurrentJobs,
+					}
+					req := &schema.JobMeta{
+						BaseJob:    defaultJob,
+						StartTime:  job.StartTime,
+						Statistics: make(map[string]schema.JobStatistics),
+					}
+					// req := new(schema.JobMeta)
+					if _, err := sd.JobRepository.Start(req); err != nil {
+						log.Errorf("starting job failed: %s", err.Error())
+					}
+				} else {
+					for _, job := range jobs {
+						log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID)
+					}
+				}
+			}
+		} else {
+			// Check if completed job with combination of (job_id, cluster_id, start_time) already exists:
+			existingJob, err := sd.JobRepository.Find(job.JobID, &job.Cluster, job.StartTime)
+
+			if err == nil {
+				existingJob.BaseJob.Duration = job.EndTime - job.StartTime
+				existingJob.BaseJob.State = job.State
+				existingJob.BaseJob.Walltime = job.StartTime
+				req := &StopJobRequest{
+					Cluster:   job.Cluster,
+					JobId:     job.JobID,
+					State:     job.State,
+					StartTime: existingJob.StartTime,
+					StopTime:  job.StartTime,
+				}
+				// req := new(schema.JobMeta)
+				sd.JobRepository.checkAndHandleStopJob(job, req)
+			}
+		}
 	}
diff --git a/tools/nats-manager/main.go b/tools/nats-manager/main.go
index d27bbab..ddf1c43 100644
--- a/tools/nats-manager/main.go
+++ b/tools/nats-manager/main.go
@@ -7,7 +7,9 @@ package main
 import (
 	"flag"
 	"fmt"
+	"io/ioutil"
 	"log"
+	"net/http"
 	"os"
 
 	"github.com/ClusterCockpit/cc-backend/internal/scheduler"
@@ -72,11 +74,54 @@ func setupPublisher() {
 	os.Exit(0)
 }
 
+func injectPayload() {
+	// Read the JSON fixture files; check each read separately so the first
+	// error is not silently overwritten by the second
+	jobsData, err := ioutil.ReadFile("slurm_0038.json")
+	if err != nil {
+		fmt.Println("Error reading JSON file:", err)
+		return
+	}
+	dbData, err := ioutil.ReadFile("slurmdb_0038-large.json")
+	if err != nil {
+		fmt.Println("Error reading JSON file:", err)
+		return
+	}
+
+	// Create an HTTP handler function
+	http.HandleFunc("/slurm/v0.0.38/jobs", func(w http.ResponseWriter, r *http.Request) {
+		// Set the response content type to JSON
+		w.Header().Set("Content-Type", "application/json")
+
+		// Write the raw JSON data to the response writer
+		_, err := w.Write(jobsData)
+		if err != nil {
+			http.Error(w, "Error writing jobsData payload", http.StatusInternalServerError)
+			return
+		}
+	})
+
+	http.HandleFunc("/slurmdb/v0.0.38/jobs", func(w http.ResponseWriter, r *http.Request) {
+		// Set the response content type to JSON
+		w.Header().Set("Content-Type", "application/json")
+
+		// Write the raw JSON data to the response writer
+		_, err := w.Write(dbData)
+		if err != nil {
+			http.Error(w, "Error writing dbData payload", http.StatusInternalServerError)
+			return
+		}
+	})
+
+	// Start the HTTP server on port 8080
+	fmt.Println("Listening on :8080...")
+	log.Fatal(http.ListenAndServe(":8080", nil))
+}
+
 func main() {
[]byte(`{"target": "localhost"}`) var sch scheduler.SlurmNatsScheduler // sch.URL = "nats://127.0.0.1:1223" sch.Init(cfgData) + + // go injectPayload() + os.Exit(0) }