Added code handling OpenAPI structs

This commit is contained in:
Bole Ma 2023-09-05 23:26:34 +02:00
parent a43d03457b
commit feeb0231e7
2 changed files with 137 additions and 11 deletions

View File

@ -14,9 +14,11 @@ import (
"os"
"os/exec"
"strconv"
"sync"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
// A Response struct to map the Entire Response
@ -37,7 +39,7 @@ var client *http.Client
func queryDB(qtime int64, clusterName string) ([]interface{}, error) {
apiEndpoint := "/slurmdb/v0.0.39/jobs"
apiEndpoint := "/slurmdb/v0.0.38/jobs"
// Construct the query parameters
queryParams := url.Values{}
@ -95,7 +97,7 @@ func queryDB(qtime int64, clusterName string) ([]interface{}, error) {
func queryAllJobs() ([]interface{}, error) {
var ctlOutput []byte
apiEndpoint := "/slurm/v0.0.39/jobs"
apiEndpoint := "/slurm/v0.0.38/jobs"
// Create a new HTTP GET request with query parameters
req, err := http.NewRequest("GET", apiEndpoint, nil)
if err != nil {
@ -177,7 +179,7 @@ func printSlurmInfo(job map[string]interface{}) string {
job["account"], job["qos"],
job["requeue"], job["restart_cnt"], job["batch_flag"],
job["time_limit"],
time.Unix(int64(job["submit_time"].(float64)), 0).Format(time.RFC3339),
time.Unix(int64(job["submit_time"].(float64)), 0).Format(time.RFC3338),
job["partition"],
job["nodes"],
job["node_count"], cpus, tasks, cpusPerTask,
@ -250,17 +252,96 @@ func (sd *SlurmRestScheduler) Sync() {
// fmt.Printf("Job %s - %s\n", job.GetJobId(), job.GetJobState())
// }
latestJobs, err := queryAllJobs()
jobsResponse, err := queryAllJobs()
if err != nil {
log.Fatal(err.Error())
}
for _, job := range latestJobs {
// check if each job in latestJobs has existed combination of (job_id, cluster_id, start_time) in JobRepository
jobs, err := sd.JobRepository.FindAll(&job.JobID, &job.Cluster, nil)
if err != nil && err != sql.ErrNoRows {
log.Errorf("checking for duplicate failed: %s", err.Error())
} else if err == nil {
// should update the JobRepository at this point
// Fetch an example instance of V0037JobsResponse
// jobsResponse := V0037JobsResponse{}
// Iterate over the Jobs slice
for _, job := range jobsResponse.Jobs {
// Process each job
fmt.Printf("Job ID: %s\n", job.JobID)
fmt.Printf("Job Name: %s\n", *job.Name)
fmt.Printf("Job State: %s\n", *job.JobState)
fmt.Println("Job StartTime:", *job.StartTime)
// is aquire lock to avoid race condition between API calls needed?
// aquire lock to avoid race condition between API calls
var unlockOnce sync.Once
sd.RepositoryMutex.Lock()
defer unlockOnce.Do(sd.RepositoryMutex.Unlock)
// is "running" one of JSON state?
if *job.JobState == "running" {
// Check if combination of (job_id, cluster_id, start_time) already exists:
jobs, err := sd.JobRepository.FindAll(job.JobID, &job.Cluster, job.StartTime)
if err != nil || err != sql.ErrNoRows {
log.Errorf("checking for duplicate failed: %s", err.Error())
return
} else if err == nil {
if len(jobs) == 0 {
var defaultJob schema.BaseJob = schema.BaseJob{
JobID: job.JobID,
User: job.User,
Project: job.Project,
Cluster: job.Cluster,
SubCluster: job.SubCluster,
Partition: job.Partition,
ArrayJobId: job.ArrayJobId,
NumNodes: job.NumNodes,
NumHWThreads: job.NumHWThreads,
NumAcc: job.NumAcc,
Exclusive: job.Exclusive,
MonitoringStatus: job.MonitoringStatus,
SMT: job.SMT,
State: job.State,
Duration: job.Duration,
Walltime: job.Walltime,
Tags: job.Tags,
RawResources: job.RawResources,
Resources: job.Resources,
RawMetaData: job.RawMetaData,
MetaData: job.MetaData,
ConcurrentJobs: job.ConcurrentJobs,
}
req := &schema.JobMeta{
BaseJob: defaultJob,
StartTime: job.StartTime,
Statistics: make(map[string]schema.JobStatistics),
}
// req := new(schema.JobMeta)
id, err := sd.JobRepository.Start(req)
} else {
for _, job := range jobs {
log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID)
}
}
}
} else {
// Check if completed job with combination of (job_id, cluster_id, start_time) already exists:
existingJob, err := sd.JobRepository.Find(job.JobID, &job.Cluster, job.StartTime)
if err == nil {
existingJob.BaseJob.Duration = job.EndTime - job.StartTime
existingJob.BaseJob.State = job.State
existingJob.BaseJob.Walltime = job.StartTime
req := &StopJobRequest{
Cluster: job.Cluster,
JobId: job.JobId,
State: job.State,
StartTime: existingJob.StartTime,
StopTime: job.StartTime,
}
// req := new(schema.JobMeta)
id, err := sd.JobRepository.checkAndHandleStopJob(job, req)
}
}
}

View File

@ -7,7 +7,9 @@ package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"github.com/ClusterCockpit/cc-backend/internal/scheduler"
@ -72,11 +74,54 @@ func setupPublisher() {
os.Exit(0)
}
func injectPayload() {
// Read the JSON file
jobsData, err := ioutil.ReadFile("slurm_0038.json")
dbData, err := ioutil.ReadFile("slurmdb_0038-large.json")
if err != nil {
fmt.Println("Error reading JSON file:", err)
return
}
// Create an HTTP handler function
http.HandleFunc("/slurm/v0.0.38/jobs", func(w http.ResponseWriter, r *http.Request) {
// Set the response content type to JSON
w.Header().Set("Content-Type", "application/json")
// Write the raw JSON data to the response writer
_, err := w.Write(jobsData)
if err != nil {
http.Error(w, "Error writing jobsData payload", http.StatusInternalServerError)
return
}
})
http.HandleFunc("/slurmdb/v0.0.38/jobs", func(w http.ResponseWriter, r *http.Request) {
// Set the response content type to JSON
w.Header().Set("Content-Type", "application/json")
// Write the raw JSON data to the response writer
_, err := w.Write(dbData)
if err != nil {
http.Error(w, "Error writing dbData payload", http.StatusInternalServerError)
return
}
})
// Start the HTTP server on port 8080
fmt.Println("Listening on :8080...")
http.ListenAndServe(":8080", nil)
}
func main() {
cfgData := []byte(`{"target": "localhost"}`)
var sch scheduler.SlurmNatsScheduler
// sch.URL = "nats://127.0.0.1:1223"
sch.Init(cfgData)
// go injectPayload()
os.Exit(0)
}