mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2024-12-25 21:09:05 +01:00
Merge branch 'master' of github.com:ClusterCockpit/cc-backend
This commit is contained in:
commit
0cf78e36ef
51
api/rest.go
51
api/rest.go
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@ -68,6 +69,21 @@ type StopJobApiRequest struct {
|
||||
State schema.JobState `json:"jobState"`
|
||||
}
|
||||
|
||||
type ErrorResponse struct {
|
||||
Status string `json:"status"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
func handleError(err error, statusCode int, rw http.ResponseWriter) {
|
||||
log.Printf("REST API error: %s", err.Error())
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(statusCode)
|
||||
json.NewEncoder(rw).Encode(ErrorResponse{
|
||||
Status: http.StatusText(statusCode),
|
||||
Error: err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
type TagJobApiRequest []*struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
@ -175,37 +191,36 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
|
||||
// there are optional here (e.g. `jobState` defaults to "running").
|
||||
func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
||||
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||
log.Warnf("user '%s' used /api/jobs/start_job/ without having the API role", user.Username)
|
||||
http.Error(rw, "missing 'api' role", http.StatusForbidden)
|
||||
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
||||
return
|
||||
}
|
||||
|
||||
req := schema.JobMeta{BaseJob: schema.JobDefaults}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
if config.GetClusterConfig(req.Cluster) == nil || config.GetPartition(req.Cluster, req.Partition) == nil {
|
||||
http.Error(rw, fmt.Sprintf("cluster %#v or partition %#v does not exist", req.Cluster, req.Partition), http.StatusBadRequest)
|
||||
handleError(fmt.Errorf("cluster or partition does not exist: %#v/%#v", req.Cluster, req.Partition), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: Do more such checks, be smarter with them.
|
||||
if len(req.Resources) == 0 || len(req.User) == 0 || req.NumNodes == 0 {
|
||||
http.Error(rw, "required fields are missing", http.StatusBadRequest)
|
||||
handleError(errors.New("the fields 'resources', 'user' and 'numNodes' are required"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if combination of (job_id, cluster_id, start_time) already exists:
|
||||
job, err := api.JobRepository.Find(req.JobID, req.Cluster, req.StartTime)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
|
||||
if err != sql.ErrNoRows {
|
||||
http.Error(rw, fmt.Sprintf("a job with that job_id, cluster_id and start_time already exists (database id: %d)", job.ID), http.StatusUnprocessableEntity)
|
||||
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
|
||||
@ -215,20 +230,20 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
||||
|
||||
req.RawResources, err = json.Marshal(req.Resources)
|
||||
if err != nil {
|
||||
http.Error(rw, "while parsing resources: "+err.Error(), http.StatusBadRequest)
|
||||
handleError(fmt.Errorf("basically impossible: %w", err), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
id, err := api.JobRepository.Start(&req)
|
||||
if err != nil {
|
||||
log.Errorf("insert into job table failed: %s", err.Error())
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
|
||||
for _, tag := range req.Tags {
|
||||
if _, err := api.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -244,13 +259,13 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
||||
// A job has stopped and should be archived.
|
||||
func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
|
||||
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||
http.Error(rw, "Missing 'api' role", http.StatusForbidden)
|
||||
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
|
||||
return
|
||||
}
|
||||
|
||||
req := StopJobApiRequest{}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
@ -260,14 +275,14 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
|
||||
if ok {
|
||||
id, e := strconv.ParseInt(id, 10, 64)
|
||||
if e != nil {
|
||||
http.Error(rw, e.Error(), http.StatusBadRequest)
|
||||
handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
job, err = api.JobRepository.FindById(id)
|
||||
} else {
|
||||
if req.JobId == nil || req.Cluster == nil || req.StartTime == nil {
|
||||
http.Error(rw, "'jobId', 'cluster' and 'startTime' are required", http.StatusBadRequest)
|
||||
handleError(errors.New("the fields 'jobId', 'cluster' and 'startTime' are required"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
@ -275,17 +290,17 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
|
||||
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
|
||||
http.Error(rw, "stop_time must be larger than start_time and only running jobs can be stopped", http.StatusBadRequest)
|
||||
handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
if req.State != "" && !req.State.Valid() {
|
||||
http.Error(rw, fmt.Sprintf("invalid job state: '%s'", req.State), http.StatusBadRequest)
|
||||
handleError(fmt.Errorf("invalid job state: %#v", req.State), http.StatusBadRequest, rw)
|
||||
return
|
||||
} else {
|
||||
req.State = schema.JobStateCompleted
|
||||
@ -320,7 +335,7 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
|
||||
} else {
|
||||
err := doArchiving(job, r.Context())
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
handleError(fmt.Errorf("archiving failed: %w", err), http.StatusInternalServerError, rw)
|
||||
} else {
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
|
Loading…
Reference in New Issue
Block a user