First commit for swagger api documentation, not yet fully functional

- Problems with JWT auth (eg startJob fails with missing role)
- Manually fixed auto-parsing errors (missing api.TagJobApiRequest, wrong API @Description)
This commit is contained in:
Christoph Kluge
2022-09-15 12:37:44 +02:00
parent 8845bba90d
commit 8e90c954ff
7 changed files with 1700 additions and 30 deletions

View File

@@ -31,6 +31,22 @@ import (
"github.com/gorilla/mux"
)
// @title ClusterCockpit REST API
// @version 0.1.0
// @description API for batch job control.
// @termsOfService TODO
// @contact.name HPC-Support
// @contact.url TODO
// @contact.email hpc-support@fau.de
// @license.name MIT License
// @license.url https://opensource.org/licenses/MIT
// @host clustercockpit.localhost:8082
// @BasePath /api
// @securityDefinitions.apikey ApiKeyAuth
// @in header
// @name X-Auth-Token
// @description JWT based authentification for general API endpoint use.
type RestApi struct {
JobRepository *repository.JobRepository
Resolver *graph.Resolver
@@ -44,8 +60,8 @@ func (api *RestApi) MountRoutes(r *mux.Router) {
r.StrictSlash(true)
r.HandleFunc("/jobs/start_job/", api.startJob).Methods(http.MethodPost, http.MethodPut)
r.HandleFunc("/jobs/stop_job/", api.stopJob).Methods(http.MethodPost, http.MethodPut)
r.HandleFunc("/jobs/stop_job/{id}", api.stopJob).Methods(http.MethodPost, http.MethodPut)
r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut)
r.HandleFunc("/jobs/stop_job/{id}", api.stopJobById).Methods(http.MethodPost, http.MethodPut)
// r.HandleFunc("/jobs/import/", api.importJob).Methods(http.MethodPost, http.MethodPut)
r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
@@ -68,25 +84,40 @@ func (api *RestApi) MountRoutes(r *mux.Router) {
}
}
// StartJobApiResponse model
// @Description Successful job start response with database id of new job.
type StartJobApiResponse struct {
// Database ID of new job
DBID int64 `json:"id"`
}
// StopJobApiRequest model
// @Description Request to stop running job using stop time and state.
// @Description Optional fields: JobId, ClusterId and StartTime.
// @Description They are only used if no database id was provided.
type StopJobApiRequest struct {
// JobId, ClusterId and StartTime are optional.
// They are only used if no database id was provided.
JobId *int64 `json:"jobId"`
Cluster *string `json:"cluster"`
StartTime *int64 `json:"startTime"`
// Payload
// Stop Time as Epoch
StopTime int64 `json:"stopTime"`
State schema.JobState `json:"jobState"`
State schema.JobState `json:"jobState"` // Final job state
JobId *int64 `json:"jobId"` // Job ID of job (Optional)
Cluster *string `json:"cluster"` // Cluster of job (Optional)
StartTime *int64 `json:"startTime"` // Start Time of job (Optional)
}
// ErrorResponse model
// @Description Error Response when using API.
type ErrorResponse struct {
// Statustext of Errorcode
Status string `json:"status"`
Error string `json:"error"`
Error string `json:"error"` // Error Message
}
// TagJobApiRequest model
// @Description Array of tag-objects for request payload
type TagJobApiRequest []*struct {
// Tag Name
Name string `json:"name"`
Type string `json:"type"` // Tag Type
}
func handleError(err error, statusCode int, rw http.ResponseWriter) {
@@ -105,12 +136,24 @@ func decode(r io.Reader, val interface{}) error {
return dec.Decode(val)
}
type TagJobApiRequest []*struct {
Name string `json:"name"`
Type string `json:"type"`
}
// Return a list of jobs
// getJobs godoc
// @Summary List all jobs
// @Description Get a list of all jobs. Filters can be applied using query parameters.
// @Tags jobs
// @Accept json
// @Produce json
// @Param state query string false "Job State" Enums(running, completed, failed, canceled, stopped, timeout)
// @Param cluster query string false "Job Cluster"
// @Param start-time query string false "Syntax: <from>-<to>, where <from> and <to> are unix timestamps in seconds"
// @Param page query int false "Page Number"
// @Param items-per-page query int false "Items per page"
// @Param with-metadata query bool false "Include metadata in response"
// @Success 200 {array} schema.Job "Array of jobs"
// @Failure 400 {object} api.ErrorResponse "Bad Request"
// @Security ApiKeyAuth
// @Router /jobs/ [get]
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
withMetadata := false
filter := &model.JobFilter{}
@@ -220,7 +263,19 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
}
}
// Add a tag to a job
// tagJob godoc
// @Summary Add one or more tags to a job
// @Description Add one or more tags as array in request body to job specified by DB ID.
// @Tags jobs
// @Accept json
// @Produce json
// @Param id path int true "Job Database ID"
// @Param request body api.TagJobApiRequest true "Array of tag-objects to add"
// @Success 200 {object} schema.Job "Job resource"
// @Failure 400 {object} api.ErrorResponse "Bad Request"
// @Failure 404 {object} api.ErrorResponse "Job or tag does not exist"
// @Security ApiKeyAuth
// @Router /jobs/tag_job/{id} [post]
func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
iid, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
if err != nil {
@@ -265,8 +320,19 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
json.NewEncoder(rw).Encode(job)
}
// A new job started. The body should be in the `meta.json` format, but some fields required
// there are optional here (e.g. `jobState` defaults to "running").
// startJob godoc
// @Summary Add a newly started job
// @Description A new job started. The body should be in the `meta.json` format
// @Description but some fields required there are optional here (e.g. `jobState` defaults to "running").
// @Tags jobs
// @Accept json
// @Produce json
// @Param request body schema.Job true "Job to add"
// @Success 201 {object} api.StartJobApiResponse "Job added successfully"
// @Failure 400 {object} api.ErrorResponse "Bad Request"
// @Failure 422 {object} api.ErrorResponse "The combination of jobId, clusterId and startTime does already exist"
// @Security ApiKeyAuth
// @Router /jobs/start_job/ [post]
func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
@@ -321,14 +387,27 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
})
}
// A job has stopped and should be archived.
func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
// stopJobById godoc
// @Summary Mark job as stopped and trigger archiving
// @Description Job to stop is specified by database ID.
// @Description Only stopTime and final state are required in request body.
// @Tags jobs
// @Accept json
// @Produce json
// @Param id path int true "Database ID of Job"
// @Param request body api.StopJobApiRequest true "Required fields: [stopTime, state]"
// @Success 201 {object} schema.Job "Job resource"
// @Failure 400 {object} api.ErrorResponse "Bad Request"
// @Failure 404 {object} api.ErrorResponse "Resource not found"
// @Security ApiKeyAuth
// @Router /jobs/stop_job/{id} [post]
func (api *RestApi) stopJobById(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
return
}
// Parse request body
// Parse request body: Only StopTime and State
req := StopJobApiRequest{}
if err := decode(r.Body, &req); err != nil {
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
@@ -348,13 +427,114 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
job, err = api.JobRepository.FindById(id)
} else {
if req.JobId == nil {
handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw)
handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw)
return
}
if err != nil {
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
return
}
// Sanity checks
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw)
return
}
if req.State != "" && !req.State.Valid() {
handleError(fmt.Errorf("invalid job state: %#v", req.State), http.StatusBadRequest, rw)
return
} else {
req.State = schema.JobStateCompleted
}
// Mark job as stopped in the database (update state and duration)
job.Duration = int32(req.StopTime - job.StartTime.Unix())
job.State = req.State
if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
handleError(fmt.Errorf("marking job as stopped failed: %w", err), http.StatusInternalServerError, rw)
return
}
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
// Send a response (with status OK). This means that erros that happen from here on forward
// can *NOT* be communicated to the client. If reading from a MetricDataRepository or
// writing to the filesystem fails, the client will not know.
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(job)
// Monitoring is disabled...
if job.MonitoringStatus == schema.MonitoringStatusDisabled {
return
}
// We need to start a new goroutine as this functions needs to return
// for the response to be flushed to the client.
api.OngoingArchivings.Add(1) // So that a shutdown does not interrupt this goroutine.
go func() {
defer api.OngoingArchivings.Done()
if _, err := api.JobRepository.FetchMetadata(job); err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
api.JobRepository.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
return
}
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and create meta.json/data.json files
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
api.JobRepository.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
return
}
// Update the jobs database entry one last time:
if err := api.JobRepository.Archive(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
return
}
log.Printf("archiving job (dbid: %d) successful", job.ID)
}()
}
// stopJobByRequest godoc
// @Summary Mark job as stopped and trigger archiving
// @Description Job to stop is specified by request body.
// @Description All fields are required in request body.
// @Tags jobs
// @Accept json
// @Produce json
// @Param request body api.StopJobApiRequest true "All fields required"
// @Success 201 {object} schema.Job "Job resource"
// @Failure 400 {object} api.ErrorResponse "Bad Request"
// @Failure 404 {object} api.ErrorResponse "Resource not found"
// @Security ApiKeyAuth
// @Router /jobs/stop_job/ [post]
func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
return
}
// Parse request body
req := StopJobApiRequest{}
if err := decode(r.Body, &req); err != nil {
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
return
}
// Fetch job (that will be stopped) from db
var job *schema.Job
var err error
if req.JobId == nil {
handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw)
return
}
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
if err != nil {
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
return
@@ -447,6 +627,7 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
// rw.Write([]byte(`{ "status": "OK" }`))
// }
func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
metrics := r.URL.Query()["metric"]
@@ -489,6 +670,9 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
})
}
func (api *RestApi) getJWT(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Type", "text/plain")
username := r.FormValue("username")
@@ -603,6 +787,10 @@ func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request)
rw.Write([]byte("success"))
}
func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
if api.MachineStateDir == "" {
http.Error(rw, "not enabled", http.StatusNotFound)