From 1a9f67fa283cad5edd31db8c36859df535ccbf9d Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 17 Jan 2022 13:27:40 +0100 Subject: [PATCH] new /api/jobs/ REST endpoint --- api/openapi.yaml | 372 +++++++++++++++++++++++++---------------------- api/rest.go | 67 ++++++++- 2 files changed, 263 insertions(+), 176 deletions(-) diff --git a/api/openapi.yaml b/api/openapi.yaml index 38e7f0a..1bd154d 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -1,171 +1,203 @@ -# -# ClusterCockpit's API spec can be exported via: -# docker exec -it cc-php php bin/console api:openapi:export --yaml -# -# This spec is written by hand and hopefully up to date with the API. -# - -openapi: 3.0.3 -info: - title: 'ClusterCockpit REST API' - description: 'API for batch job control' - version: 0.0.2 -servers: - - url: / - description: '' -paths: - '/api/jobs/{id}': - get: - operationId: 'getJob' - summary: 'Get job resource' - parameters: - - name: id - in: path - required: true - schema: { type: integer } - description: 'Database ID (Resource Identifier)' - responses: - 200: - description: 'Job resource' - content: - 'application/json': - schema: - $ref: '#/components/schemas/Job' - 404: - description: 'Resource not found' - '/api/jobs/tag_job/{id}': - post: - operationId: 'tagJob' - summary: 'Add a tag to a job' - parameters: - - name: id - in: path - required: true - schema: { type: integer } - description: 'Job ID' - requestBody: - description: 'Array of tags to add' - required: true - content: - 'application/json': - schema: - type: array - items: - $ref: '#/components/schemas/Tag' - responses: - 200: - description: 'Job resource' - content: - 'application/json': - schema: - $ref: '#/components/schemas/Job' - 404: - description: 'Job or tag does not exist' - 400: - description: 'Bad request' - '/api/jobs/start_job/': - post: - operationId: 'startJob' - summary: 'Add a newly started job' - requestBody: - required: true - content: - 'application/json': - schema: - $ref: '#/components/schemas/Job' - responses: - 201: - description: 'Job successfully' - content: - 'application/json': - schema: - type: object - properties: - id: - type: integer - description: 'The database ID assigned to this job' - 400: - description: 'Bad request' - 422: - description: 'The combination of jobId, clusterId and startTime does already exist' - '/api/jobs/stop_job/': - post: - operationId: stopJobViaJobID - summary: 'Mark a job as stopped. Which job to stop is specified by the request body.' - requestBody: - required: true - content: - 'application/json': - schema: - type: object - required: [jobId, cluster, startTime, stopTime] - properties: - jobId: { type: integer } - cluster: { type: string } - startTime: { type: integer } - stopTime: { type: integer } - responses: - 200: - description: 'Job resource' - content: - 'application/json': - schema: - $ref: '#/components/schemas/Job' - 400: - description: 'Bad request' - 404: - description: 'Resource not found' - '/api/jobs/stop_job/{id}': - post: - operationId: 'stopJobViaDBID' - summary: 'Mark a job as stopped.' - parameters: - - name: id - in: path - required: true - schema: { type: integer } - description: 'Database ID (Resource Identifier)' - requestBody: - required: true - content: - 'application/json': - schema: - type: object - required: [stopTime] - properties: - stopTime: { type: integer } - responses: - 200: - description: 'Job resource' - content: - 'application/json': - schema: - $ref: '#/components/schemas/Job' - 400: - description: 'Bad request' - 404: - description: 'Resource not found' -components: - schemas: - Tag: - description: 'A job tag' - type: object - properties: - id: - type: string - description: 'Database ID' - type: - type: string - description: 'Tag type' - name: - type: string - description: 'Tag name' - Job: - $ref: https://raw.githubusercontent.com/ClusterCockpit/cc-specifications/master/schema/json/job-meta.schema.json - securitySchemes: - bearerAuth: - type: http - scheme: bearer - bearerFormat: JWT -security: +# +# ClusterCockpit's API spec can be exported via: +# docker exec -it cc-php php bin/console api:openapi:export --yaml +# +# This spec is written by hand and hopefully up to date with the API. +# + +openapi: 3.0.3 +info: + title: 'ClusterCockpit REST API' + description: 'API for batch job control' + version: 0.0.2 +servers: + - url: / + description: '' +paths: + '/api/jobs/': + get: + operationId: 'getJobs' + summary: 'List all jobs' + description: 'Get a list of all jobs in the JSON schema defined via GraphQL (main difference: start-time is RFC3339 encoded and there are no statistics). Filters can be applied using query parameters.' + parameters: + - name: state + in: query + schema: + type: string + enum: ["running", "completed", "failed", "canceled", "stopped", "timeout"] + - name: cluster + in: query + schema: { type: string } + responses: + 200: + description: 'Array of jobs' + content: + 'application/json': + schema: + type: array + items: + # Not totally true: start-time is a RFC3339 string here! + $ref: '#/components/schemas/Job' + 400: + description: 'Bad Request' + '/api/jobs/{id}': + get: + operationId: 'getJob' + summary: 'Get job resource' + parameters: + - name: id + in: path + required: true + schema: { type: integer } + description: 'Database ID (Resource Identifier)' + responses: + 200: + description: 'Job resource' + content: + 'application/json': + schema: + $ref: '#/components/schemas/Job' + 404: + description: 'Resource not found' + '/api/jobs/tag_job/{id}': + post: + operationId: 'tagJob' + summary: 'Add a tag to a job' + parameters: + - name: id + in: path + required: true + schema: { type: integer } + description: 'Job ID' + requestBody: + description: 'Array of tags to add' + required: true + content: + 'application/json': + schema: + type: array + items: + $ref: '#/components/schemas/Tag' + responses: + 200: + description: 'Job resource' + content: + 'application/json': + schema: + $ref: '#/components/schemas/Job' + 404: + description: 'Job or tag does not exist' + 400: + description: 'Bad request' + '/api/jobs/start_job/': + post: + operationId: 'startJob' + summary: 'Add a newly started job' + requestBody: + required: true + content: + 'application/json': + schema: + $ref: '#/components/schemas/Job' + responses: + 201: + description: 'Job successfully' + content: + 'application/json': + schema: + type: object + properties: + id: + type: integer + description: 'The database ID assigned to this job' + 400: + description: 'Bad request' + 422: + description: 'The combination of jobId, clusterId and startTime does already exist' + '/api/jobs/stop_job/': + post: + operationId: stopJobViaJobID + summary: 'Mark a job as stopped. Which job to stop is specified by the request body.' + requestBody: + required: true + content: + 'application/json': + schema: + type: object + required: [jobId, cluster, stopTime, jobState] + properties: + jobId: { type: integer } + cluster: { type: string } + startTime: { type: integer } + stopTime: { type: integer } + jobState: + type: string + enum: ["running", "completed", "failed", "canceled", "stopped", "timeout"] + responses: + 200: + description: 'Job resource' + content: + 'application/json': + schema: + $ref: '#/components/schemas/Job' + 400: + description: 'Bad request' + 404: + description: 'Resource not found' + '/api/jobs/stop_job/{id}': + post: + operationId: 'stopJobViaDBID' + summary: 'Mark a job as stopped.' + parameters: + - name: id + in: path + required: true + schema: { type: integer } + description: 'Database ID (Resource Identifier)' + requestBody: + required: true + content: + 'application/json': + schema: + type: object + required: [stopTime, jobState] + properties: + stopTime: { type: integer } + jobState: + type: string + enum: ["running", "completed", "failed", "canceled", "stopped", "timeout"] + responses: + 200: + description: 'Job resource' + content: + 'application/json': + schema: + $ref: '#/components/schemas/Job' + 400: + description: 'Bad request' + 404: + description: 'Resource not found' +components: + schemas: + Tag: + description: 'A job tag' + type: object + properties: + id: + type: string + description: 'Database ID' + type: + type: string + description: 'Tag type' + name: + type: string + description: 'Tag name' + Job: + $ref: https://raw.githubusercontent.com/ClusterCockpit/cc-specifications/master/schema/json/job-meta.schema.json + securitySchemes: + bearerAuth: + type: http + scheme: bearer + bearerFormat: JWT +security: - bearerAuth: [] # Applies `bearerAuth` globally \ No newline at end of file diff --git a/api/rest.go b/api/rest.go index 725aa84..9526252 100644 --- a/api/rest.go +++ b/api/rest.go @@ -1,6 +1,7 @@ package api import ( + "bufio" "context" "encoding/json" "fmt" @@ -13,6 +14,7 @@ import ( "github.com/ClusterCockpit/cc-jobarchive/config" "github.com/ClusterCockpit/cc-jobarchive/graph" + "github.com/ClusterCockpit/cc-jobarchive/graph/model" "github.com/ClusterCockpit/cc-jobarchive/metricdata" "github.com/ClusterCockpit/cc-jobarchive/schema" sq "github.com/Masterminds/squirrel" @@ -36,11 +38,14 @@ func (api *RestApi) MountRoutes(r *mux.Router) { r.HandleFunc("/jobs/stop_job/", api.stopJob).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/stop_job/{id}", api.stopJob).Methods(http.MethodPost, http.MethodPut) + r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) r.HandleFunc("/jobs/{id}", api.getJob).Methods(http.MethodGet) r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) - r.HandleFunc("/machine_state/{cluster}/{host}", api.getMachineState).Methods(http.MethodGet) - r.HandleFunc("/machine_state/{cluster}/{host}", api.putMachineState).Methods(http.MethodPut, http.MethodPost) + if api.MachineStateDir != "" { + r.HandleFunc("/machine_state/{cluster}/{host}", api.getMachineState).Methods(http.MethodGet) + r.HandleFunc("/machine_state/{cluster}/{host}", api.putMachineState).Methods(http.MethodPut, http.MethodPost) + } } type StartJobApiRespone struct { @@ -50,7 +55,7 @@ type StartJobApiRespone struct { type StopJobApiRequest struct { // JobId, ClusterId and StartTime are optional. // They are only used if no database id was provided. - JobId *string `json:"jobId"` + JobId *int64 `json:"jobId"` Cluster *string `json:"cluster"` StartTime *int64 `json:"startTime"` @@ -64,6 +69,44 @@ type TagJobApiRequest []*struct { Type string `json:"type"` } +// Return a list of jobs +func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { + filter := model.JobFilter{} + for key, vals := range r.URL.Query() { + switch key { + case "state": + for _, s := range vals { + state := schema.JobState(s) + if !state.Valid() { + http.Error(rw, "invalid query parameter value: state", http.StatusBadRequest) + return + } + filter.State = append(filter.State, state) + } + case "cluster": + filter.Cluster = &model.StringInput{Eq: &vals[0]} + default: + http.Error(rw, "invalid query parameter: "+key, http.StatusBadRequest) + return + } + } + + results, err := api.Resolver.Query().Jobs(r.Context(), []*model.JobFilter{&filter}, nil, nil) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + bw := bufio.NewWriter(rw) + defer bw.Flush() + + if err := json.NewEncoder(bw).Encode(results.Items); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } +} + +// Return a single job func (api *RestApi) getJob(rw http.ResponseWriter, r *http.Request) { id := mux.Vars(r)["id"] @@ -84,6 +127,7 @@ func (api *RestApi) getJob(rw http.ResponseWriter, r *http.Request) { json.NewEncoder(rw).Encode(job) } +// Add a tag to a job func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { id := mux.Vars(r)["id"] job, err := api.Resolver.Query().Job(r.Context(), id) @@ -130,6 +174,8 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { json.NewEncoder(rw).Encode(job) } +// A new job started. The body should be in the `meta.json` format, but some fields required +// there are optional here (e.g. `jobState` defaults to "running"). func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { req := schema.JobMeta{BaseJob: schema.JobDefaults} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { @@ -142,6 +188,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } + // TODO: Do more such checks, be smarter with them. if len(req.Resources) == 0 || len(req.User) == 0 || req.NumNodes == 0 { http.Error(rw, "required fields are missing", http.StatusBadRequest) return @@ -193,6 +240,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { }) } +// A job has stopped and should be archived. func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) { req := StopJobApiRequest{} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { @@ -207,10 +255,13 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) { if ok { sql, args, err = sq.Select(schema.JobColumns...).From("job").Where("job.id = ?", id).ToSql() } else { - sql, args, err = sq.Select(schema.JobColumns...).From("job"). + qb := sq.Select(schema.JobColumns...).From("job"). Where("job.job_id = ?", req.JobId). - Where("job.cluster = ?", req.Cluster). - Where("job.start_time = ?", req.StartTime).ToSql() + Where("job.cluster = ?", req.Cluster) + if req.StartTime != nil { + qb = qb.Where("job.start_time = ?", *req.StartTime) + } + sql, args, err = qb.ToSql() } if err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) @@ -234,6 +285,10 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) { req.State = schema.JobStateCompleted } + // This closure does the real work. It needs to be its own + // function so that it can be done in the background. + // TODO: Throttle/Have a max. number or parallel archivngs + // or use a long-running goroutine receiving jobs by a channel. doArchiving := func(job *schema.Job, ctx context.Context) error { api.OngoingArchivings.Add(1) defer api.OngoingArchivings.Done()