
511 lines
15 KiB
Raw Normal View History

package api
import (
2022-01-17 13:27:40 +01:00
2022-02-07 14:56:46 +01:00
2022-02-14 17:31:51 +01:00
2022-01-07 09:39:00 +01:00
2022-01-07 09:39:00 +01:00
2022-02-07 09:57:06 +01:00
2022-01-27 09:40:59 +01:00
2022-01-27 10:35:26 +01:00
2022-01-27 09:40:59 +01:00
2022-02-06 09:48:31 +01:00
2022-01-27 09:40:59 +01:00
type RestApi struct {
2022-02-07 07:09:47 +01:00
JobRepository *repository.JobRepository
Resolver *graph.Resolver
MachineStateDir string
OngoingArchivings sync.WaitGroup
func (api *RestApi) MountRoutes(r *mux.Router) {
r = r.PathPrefix("/api").Subrouter()
r.HandleFunc("/jobs/start_job/", api.startJob).Methods(http.MethodPost, http.MethodPut)
r.HandleFunc("/jobs/stop_job/", api.stopJob).Methods(http.MethodPost, http.MethodPut)
r.HandleFunc("/jobs/stop_job/{id}", api.stopJob).Methods(http.MethodPost, http.MethodPut)
2022-02-25 11:04:34 +01:00
r.HandleFunc("/jobs/import/", api.importJob).Methods(http.MethodPost, http.MethodPut)
2022-01-07 09:39:00 +01:00
2022-01-17 13:27:40 +01:00
r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
// r.HandleFunc("/jobs/{id}", api.getJob).Methods(http.MethodGet)
r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch)
2022-01-27 09:29:53 +01:00
r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet)
2022-01-17 13:27:40 +01:00
if api.MachineStateDir != "" {
r.HandleFunc("/machine_state/{cluster}/{host}", api.getMachineState).Methods(http.MethodGet)
r.HandleFunc("/machine_state/{cluster}/{host}", api.putMachineState).Methods(http.MethodPut, http.MethodPost)
2022-02-06 09:48:31 +01:00
type StartJobApiResponse struct {
DBID int64 `json:"id"`
type StopJobApiRequest struct {
// JobId, ClusterId and StartTime are optional.
// They are only used if no database id was provided.
2022-01-17 13:27:40 +01:00
JobId *int64 `json:"jobId"`
2021-12-20 10:48:58 +01:00
Cluster *string `json:"cluster"`
StartTime *int64 `json:"startTime"`
// Payload
2021-12-17 15:49:22 +01:00
StopTime int64 `json:"stopTime"`
State schema.JobState `json:"jobState"`
2022-02-14 17:31:51 +01:00
type ErrorResponse struct {
Status string `json:"status"`
Error string `json:"error"`
func handleError(err error, statusCode int, rw http.ResponseWriter) {
log.Printf("REST API error: %s", err.Error())
rw.Header().Add("Content-Type", "application/json")
Status: http.StatusText(statusCode),
Error: err.Error(),
func decode(r io.Reader, val interface{}) error {
dec := json.NewDecoder(r)
return dec.Decode(val)
type TagJobApiRequest []*struct {
Name string `json:"name"`
Type string `json:"type"`
2022-01-17 13:27:40 +01:00
// Return a list of jobs
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
filter := &model.JobFilter{}
2022-02-22 14:07:49 +01:00
page := &model.PageRequest{ItemsPerPage: -1, Page: 1}
order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc}
2022-01-17 13:27:40 +01:00
for key, vals := range r.URL.Query() {
switch key {
case "state":
for _, s := range vals {
state := schema.JobState(s)
if !state.Valid() {
http.Error(rw, "invalid query parameter value: state", http.StatusBadRequest)
filter.State = append(filter.State, state)
case "cluster":
filter.Cluster = &model.StringInput{Eq: &vals[0]}
case "start-time":
st := strings.Split(vals[0], "-")
if len(st) != 2 {
http.Error(rw, "invalid query parameter value: startTime", http.StatusBadRequest)
from, err := strconv.ParseInt(st[0], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
to, err := strconv.ParseInt(st[1], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
ufrom, uto := time.Unix(from, 0), time.Unix(to, 0)
filter.StartTime = &model.TimeRange{From: &ufrom, To: &uto}
case "page":
x, err := strconv.Atoi(vals[0])
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
page.Page = x
case "items-per-page":
x, err := strconv.Atoi(vals[0])
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
page.ItemsPerPage = x
2022-01-17 13:27:40 +01:00
http.Error(rw, "invalid query parameter: "+key, http.StatusBadRequest)
jobs, err := api.JobRepository.QueryJobs(r.Context(), []*model.JobFilter{filter}, page, order)
2022-01-17 13:27:40 +01:00
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
results := make([]*schema.JobMeta, 0, len(jobs))
for _, job := range jobs {
res := &schema.JobMeta{
ID: &job.ID,
BaseJob: job.BaseJob,
StartTime: job.StartTime.Unix(),
2022-01-17 13:27:40 +01:00
res.Tags, err = api.JobRepository.GetTags(&job.ID)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
2022-01-17 13:27:40 +01:00
if res.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful {
res.Statistics, err = metricdata.GetStatistics(job)
if err != nil {
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
results = append(results, res)
log.Debugf("/api/jobs: %d jobs returned", len(results))
bw := bufio.NewWriter(rw)
defer bw.Flush()
if err := json.NewEncoder(bw).Encode(map[string]interface{}{
"jobs": results,
}); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
2022-01-17 13:27:40 +01:00
// Add a tag to a job
func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
iid, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
job, err := api.JobRepository.FindById(iid)
if err != nil {
http.Error(rw, err.Error(), http.StatusNotFound)
job.Tags, err = api.JobRepository.GetTags(&job.ID)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
var req TagJobApiRequest
if err := decode(r.Body, &req); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
for _, tag := range req {
2022-02-08 12:49:28 +01:00
tagId, err := api.JobRepository.AddTagOrCreate(job.ID, tag.Type, tag.Name)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
2021-12-17 15:49:22 +01:00
job.Tags = append(job.Tags, &schema.Tag{
ID: tagId,
Type: tag.Type,
Name: tag.Name,
rw.Header().Add("Content-Type", "application/json")
2022-01-17 13:27:40 +01:00
// A new job started. The body should be in the `meta.json` format, but some fields required
// there are optional here (e.g. `jobState` defaults to "running").
func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
2022-02-14 17:40:47 +01:00
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
2021-12-17 15:49:22 +01:00
req := schema.JobMeta{BaseJob: schema.JobDefaults}
if err := decode(r.Body, &req); err != nil {
2022-02-14 17:40:47 +01:00
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
2022-02-25 10:50:43 +01:00
if req.State == "" {
req.State = schema.JobStateRunning
2022-02-25 10:50:43 +01:00
if err := repository.SanityChecks(&req.BaseJob); err != nil {
handleError(err, http.StatusBadRequest, rw)
// Check if combination of (job_id, cluster_id, start_time) already exists:
job, err := api.JobRepository.Find(&req.JobID, &req.Cluster, &req.StartTime)
2022-02-07 14:56:46 +01:00
if err != nil && err != sql.ErrNoRows {
2022-02-14 17:40:47 +01:00
handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw)
2022-02-07 14:56:46 +01:00
if err != sql.ErrNoRows {
2022-02-14 17:40:47 +01:00
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID), http.StatusUnprocessableEntity, rw)
req.RawResources, err = json.Marshal(req.Resources)
2021-12-17 15:49:22 +01:00
if err != nil {
2022-02-14 17:40:47 +01:00
handleError(fmt.Errorf("basically impossible: %w", err), http.StatusBadRequest, rw)
2022-01-27 10:35:26 +01:00
2021-12-17 15:49:22 +01:00
2022-02-08 12:49:28 +01:00
id, err := api.JobRepository.Start(&req)
if err != nil {
2022-02-14 17:40:47 +01:00
handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
2022-02-08 12:49:28 +01:00
for _, tag := range req.Tags {
if _, err := api.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
2022-02-14 17:40:47 +01:00
handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw)
2022-02-08 12:49:28 +01:00
2022-01-27 10:35:26 +01:00
log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
rw.Header().Add("Content-Type", "application/json")
2022-02-06 09:48:31 +01:00
DBID: id,
2022-01-17 13:27:40 +01:00
// A job has stopped and should be archived.
func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
2022-02-14 17:31:51 +01:00
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
// Parse request body
req := StopJobApiRequest{}
if err := decode(r.Body, &req); err != nil {
2022-02-14 17:31:51 +01:00
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
// Fetch job (that will be stopped) from db
id, ok := mux.Vars(r)["id"]
2022-02-07 07:09:47 +01:00
var job *schema.Job
var err error
if ok {
2022-02-08 11:10:05 +01:00
id, e := strconv.ParseInt(id, 10, 64)
if e != nil {
2022-02-14 17:31:51 +01:00
handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw)
2022-02-08 11:10:05 +01:00
2022-02-07 09:57:06 +01:00
job, err = api.JobRepository.FindById(id)
} else {
if req.JobId == nil {
handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw)
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
2021-12-17 15:49:22 +01:00
if err != nil {
2022-02-14 17:31:51 +01:00
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
// Sanity checks
2021-12-17 15:49:22 +01:00
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
2022-02-14 17:31:51 +01:00
handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw)
2021-12-17 15:49:22 +01:00
if req.State != "" && !req.State.Valid() {
2022-02-14 17:31:51 +01:00
handleError(fmt.Errorf("invalid job state: %#v", req.State), http.StatusBadRequest, rw)
2021-12-17 15:49:22 +01:00
} else {
req.State = schema.JobStateCompleted
// Mark job as stopped in the database (update state and duration)
job.Duration = int32(req.StopTime - job.StartTime.Unix())
job.State = req.State
if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
handleError(fmt.Errorf("marking job as stopped failed: %w", err), http.StatusInternalServerError, rw)
2022-01-27 10:35:26 +01:00
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
// Send a response (with status OK). This means that erros that happen from here on forward
// can *NOT* be communicated to the client. If reading from a MetricDataRepository or
// writing to the filesystem fails, the client will not know.
rw.Header().Add("Content-Type", "application/json")
// Monitoring is disabled...
if job.MonitoringStatus == schema.MonitoringStatusDisabled {
2022-02-15 13:31:54 +01:00
// We need to start a new goroutine as this functions needs to return
// for the response to be flushed to the client.
api.OngoingArchivings.Add(1) // So that a shutdown does not interrupt this goroutine.
go func() {
defer api.OngoingArchivings.Done()
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and create meta.json/data.json files
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
api.JobRepository.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
// Update the jobs database entry one last time:
if err := api.JobRepository.Archive(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
2022-02-15 13:31:54 +01:00
log.Printf("archiving job (dbid: %d) successful", job.ID)
2022-01-07 09:39:00 +01:00
2022-02-25 11:04:34 +01:00
func (api *RestApi) importJob(rw http.ResponseWriter, r *http.Request) {
if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
handleError(fmt.Errorf("missing role: %#v", auth.RoleApi), http.StatusForbidden, rw)
var body struct {
Meta *schema.JobMeta `json:"meta"`
Data *schema.JobData `json:"data"`
if err := decode(r.Body, &body); err != nil {
handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusBadRequest, rw)
if err := api.JobRepository.ImportJob(body.Meta, body.Data); err != nil {
handleError(fmt.Errorf("import failed: %s", err.Error()), http.StatusUnprocessableEntity, rw)
rw.Write([]byte(`{ "status": "OK" }`))
2022-01-27 09:29:53 +01:00
func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
metrics := r.URL.Query()["metric"]
var scopes []schema.MetricScope
for _, scope := range r.URL.Query()["scope"] {
var s schema.MetricScope
if err := s.UnmarshalGQL(scope); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
scopes = append(scopes, s)
rw.Header().Add("Content-Type", "application/json")
type Respone struct {
Data *struct {
JobMetrics []*model.JobMetricWithName `json:"jobMetrics"`
} `json:"data"`
Error *struct {
Message string `json:"message"`
} `json:"error"`
data, err := api.Resolver.Query().JobMetrics(r.Context(), id, metrics, scopes)
if err != nil {
2022-01-27 10:35:26 +01:00
2022-01-27 09:29:53 +01:00
Error: &struct {
Message string "json:\"message\""
}{Message: err.Error()},
2022-01-27 10:35:26 +01:00
2022-01-27 09:29:53 +01:00
2022-01-27 10:35:26 +01:00
2022-01-27 09:29:53 +01:00
Data: &struct {
JobMetrics []*model.JobMetricWithName "json:\"jobMetrics\""
}{JobMetrics: data},
2022-01-27 10:35:26 +01:00
2022-01-27 09:29:53 +01:00
2022-01-07 09:39:00 +01:00
func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
if api.MachineStateDir == "" {
http.Error(rw, "not enabled", http.StatusNotFound)
vars := mux.Vars(r)
cluster := vars["cluster"]
host := vars["host"]
dir := filepath.Join(api.MachineStateDir, cluster)
if err := os.MkdirAll(dir, 0755); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
filename := filepath.Join(dir, fmt.Sprintf("%s.json", host))
f, err := os.Create(filename)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
defer f.Close()
if _, err := io.Copy(f, r.Body); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
func (api *RestApi) getMachineState(rw http.ResponseWriter, r *http.Request) {
if api.MachineStateDir == "" {
http.Error(rw, "not enabled", http.StatusNotFound)
vars := mux.Vars(r)
filename := filepath.Join(api.MachineStateDir, vars["cluster"], fmt.Sprintf("%s.json", vars["host"]))
// Sets the content-type and 'Last-Modified' Header and so on automatically
http.ServeFile(rw, r, filename)