From 1d8e7e072f485e38986c9062c75c3732d13853c3 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 5 Jun 2025 13:23:36 +0200 Subject: [PATCH] Refactor rest api --- internal/api/cluster.go | 70 +++ internal/api/job.go | 987 +++++++++++++++++++++++++++++++++ internal/api/node.go | 30 + internal/api/rest.go | 1172 +-------------------------------------- internal/api/user.go | 159 ++++++ 5 files changed, 1249 insertions(+), 1169 deletions(-) create mode 100644 internal/api/cluster.go create mode 100644 internal/api/job.go create mode 100644 internal/api/node.go create mode 100644 internal/api/user.go diff --git a/internal/api/cluster.go b/internal/api/cluster.go new file mode 100644 index 0000000..0529480 --- /dev/null +++ b/internal/api/cluster.go @@ -0,0 +1,70 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package api + +import ( + "bufio" + "encoding/json" + "fmt" + "net/http" + + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +// GetClustersApiResponse model +type GetClustersApiResponse struct { + Clusters []*schema.Cluster `json:"clusters"` // Array of clusters +} + +// getClusters godoc +// @summary Lists all cluster configs +// @tags Cluster query +// @description Get a list of all cluster configs. Specific cluster can be requested using query parameter. +// @produce json +// @param cluster query string false "Job Cluster" +// @success 200 {object} api.GetClustersApiResponse "Array of clusters" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/clusters/ [get] +func (api *RestApi) getClusters(rw http.ResponseWriter, r *http.Request) { + if user := repository.GetUserFromContext(r.Context()); user != nil && + !user.HasRole(schema.RoleApi) { + + handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw) + return + } + + rw.Header().Add("Content-Type", "application/json") + bw := bufio.NewWriter(rw) + defer bw.Flush() + + var clusters []*schema.Cluster + + if r.URL.Query().Has("cluster") { + name := r.URL.Query().Get("cluster") + cluster := archive.GetCluster(name) + if cluster == nil { + handleError(fmt.Errorf("unknown cluster: %s", name), http.StatusBadRequest, rw) + return + } + clusters = append(clusters, cluster) + } else { + clusters = archive.Clusters + } + + payload := GetClustersApiResponse{ + Clusters: clusters, + } + + if err := json.NewEncoder(bw).Encode(payload); err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + } +} diff --git a/internal/api/job.go b/internal/api/job.go new file mode 100644 index 0000000..1af6c38 --- /dev/null +++ b/internal/api/job.go @@ -0,0 +1,987 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package api + +import ( + "bufio" + "database/sql" + "encoding/json" + "errors" + "fmt" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/archiver" + "github.com/ClusterCockpit/cc-backend/internal/graph" + "github.com/ClusterCockpit/cc-backend/internal/graph/model" + "github.com/ClusterCockpit/cc-backend/internal/importer" + "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" + "github.com/gorilla/mux" +) + +// DefaultApiResponse model +type DefaultJobApiResponse struct { + Message string `json:"msg"` +} + +// StopJobApiRequest model +type StopJobApiRequest struct { + JobId *int64 `json:"jobId" example:"123000"` + Cluster *string `json:"cluster" example:"fritz"` + StartTime *int64 `json:"startTime" example:"1649723812"` + State schema.JobState `json:"jobState" validate:"required" example:"completed"` + StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"` +} + +// DeleteJobApiRequest model +type DeleteJobApiRequest struct { + JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job + Cluster *string `json:"cluster" example:"fritz"` // Cluster of job + StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch +} + +// GetJobsApiResponse model +type GetJobsApiResponse struct { + Jobs []*schema.Job `json:"jobs"` // Array of jobs + Items int `json:"items"` // Number of jobs returned + Page int `json:"page"` // Page id returned +} + +// ApiTag model +type ApiTag struct { + // Tag Type + Type string `json:"type" example:"Debug"` + Name string `json:"name" example:"Testjob"` // Tag Name + Scope string `json:"scope" example:"global"` // Tag Scope for Frontend Display +} + +// ApiMeta model +type EditMetaRequest struct { + Key string `json:"key" example:"jobScript"` + Value string `json:"value" example:"bash script"` +} + +type TagJobApiRequest []*ApiTag + +type GetJobApiRequest []string + +type GetJobApiResponse struct { + Meta *schema.Job + Data []*JobMetricWithName +} + +type GetCompleteJobApiResponse struct { + Meta *schema.Job + Data schema.JobData +} + +type JobMetricWithName struct { + Metric *schema.JobMetric `json:"metric"` + Name string `json:"name"` + Scope schema.MetricScope `json:"scope"` +} + +// getJobs godoc +// @summary Lists all jobs +// @tags Job query +// @description Get a list of all jobs. Filters can be applied using query parameters. +// @description Number of results can be limited by page. Results are sorted by descending startTime. +// @produce json +// @param state query string false "Job State" Enums(running, completed, failed, cancelled, stopped, timeout) +// @param cluster query string false "Job Cluster" +// @param start-time query string false "Syntax: '$from-$to', as unix epoch timestamps in seconds" +// @param items-per-page query int false "Items per page (Default: 25)" +// @param page query int false "Page Number (Default: 1)" +// @param with-metadata query bool false "Include metadata (e.g. jobScript) in response" +// @success 200 {object} api.GetJobsApiResponse "Job array and page info" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/ [get] +func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { + withMetadata := false + filter := &model.JobFilter{} + page := &model.PageRequest{ItemsPerPage: 25, Page: 1} + order := &model.OrderByInput{Field: "startTime", Type: "col", Order: model.SortDirectionEnumDesc} + + for key, vals := range r.URL.Query() { + switch key { + case "state": + for _, s := range vals { + state := schema.JobState(s) + if !state.Valid() { + handleError(fmt.Errorf("invalid query parameter value: state"), + http.StatusBadRequest, rw) + return + } + filter.State = append(filter.State, state) + } + case "cluster": + filter.Cluster = &model.StringInput{Eq: &vals[0]} + case "start-time": + st := strings.Split(vals[0], "-") + if len(st) != 2 { + handleError(fmt.Errorf("invalid query parameter value: startTime"), + http.StatusBadRequest, rw) + return + } + from, err := strconv.ParseInt(st[0], 10, 64) + if err != nil { + handleError(err, http.StatusBadRequest, rw) + return + } + to, err := strconv.ParseInt(st[1], 10, 64) + if err != nil { + handleError(err, http.StatusBadRequest, rw) + return + } + ufrom, uto := time.Unix(from, 0), time.Unix(to, 0) + filter.StartTime = &schema.TimeRange{From: &ufrom, To: &uto} + case "page": + x, err := strconv.Atoi(vals[0]) + if err != nil { + handleError(err, http.StatusBadRequest, rw) + return + } + page.Page = x + case "items-per-page": + x, err := strconv.Atoi(vals[0]) + if err != nil { + handleError(err, http.StatusBadRequest, rw) + return + } + page.ItemsPerPage = x + case "with-metadata": + withMetadata = true + default: + handleError(fmt.Errorf("invalid query parameter: %s", key), + http.StatusBadRequest, rw) + return + } + } + + jobs, err := api.JobRepository.QueryJobs(r.Context(), []*model.JobFilter{filter}, page, order) + if err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + } + + results := make([]*schema.Job, 0, len(jobs)) + for _, job := range jobs { + if withMetadata { + if _, err = api.JobRepository.FetchMetadata(job); err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + } + } + + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) + if err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + } + + if job.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful { + job.Statistics, err = archive.GetStatistics(job) + if err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + } + } + + results = append(results, job) + } + + log.Debugf("/api/jobs: %d jobs returned", len(results)) + rw.Header().Add("Content-Type", "application/json") + bw := bufio.NewWriter(rw) + defer bw.Flush() + + payload := GetJobsApiResponse{ + Jobs: results, + Items: page.ItemsPerPage, + Page: page.Page, + } + + if err := json.NewEncoder(bw).Encode(payload); err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + } +} + +// getCompleteJobById godoc +// @summary Get job meta and optional all metric data +// @tags Job query +// @description Job to get is specified by database ID +// @description Returns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'. +// @produce json +// @param id path int true "Database ID of Job" +// @param all-metrics query bool false "Include all available metrics" +// @success 200 {object} api.GetJobApiResponse "Job resource" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 404 {object} api.ErrorResponse "Resource not found" +// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/{id} [get] +func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) { + // Fetch job from db + id, ok := mux.Vars(r)["id"] + var job *schema.Job + var err error + if ok { + id, e := strconv.ParseInt(id, 10, 64) + if e != nil { + handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw) + return + } + + job, err = api.JobRepository.FindById(r.Context(), id) // Get Job from Repo by ID + } else { + handleError(fmt.Errorf("the parameter 'id' is required"), http.StatusBadRequest, rw) + return + } + if err != nil { + handleError(fmt.Errorf("finding job with db id %s failed: %w", id, err), http.StatusUnprocessableEntity, rw) + return + } + + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) + if err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + + } + if _, err = api.JobRepository.FetchMetadata(job); err != nil { + + handleError(err, http.StatusInternalServerError, rw) + return + } + + var scopes []schema.MetricScope + + if job.NumNodes == 1 { + scopes = []schema.MetricScope{"core"} + } else { + scopes = []schema.MetricScope{"node"} + } + + var data schema.JobData + + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + resolution := 0 + + for _, mc := range metricConfigs { + resolution = max(resolution, mc.Timestep) + } + + if r.URL.Query().Get("all-metrics") == "true" { + data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution) + if err != nil { + log.Warnf("REST: error while loading all-metrics job data for JobID %d on %s", job.JobID, job.Cluster) + return + } + } + + log.Debugf("/api/job/%s: get job %d", id, job.JobID) + rw.Header().Add("Content-Type", "application/json") + bw := bufio.NewWriter(rw) + defer bw.Flush() + + payload := GetCompleteJobApiResponse{ + Meta: job, + Data: data, + } + + if err := json.NewEncoder(bw).Encode(payload); err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + } +} + +// getJobById godoc +// @summary Get job meta and configurable metric data +// @tags Job query +// @description Job to get is specified by database ID +// @description Returns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'. +// @accept json +// @produce json +// @param id path int true "Database ID of Job" +// @param request body api.GetJobApiRequest true "Array of metric names" +// @success 200 {object} api.GetJobApiResponse "Job resource" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 404 {object} api.ErrorResponse "Resource not found" +// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/{id} [post] +func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { + // Fetch job from db + id, ok := mux.Vars(r)["id"] + var job *schema.Job + var err error + if ok { + id, e := strconv.ParseInt(id, 10, 64) + if e != nil { + handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw) + return + } + + job, err = api.JobRepository.FindById(r.Context(), id) + } else { + handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw) + return + } + if err != nil { + handleError(fmt.Errorf("finding job with db id %s failed: %w", id, err), http.StatusUnprocessableEntity, rw) + return + } + + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) + if err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + + } + if _, err = api.JobRepository.FetchMetadata(job); err != nil { + + handleError(err, http.StatusInternalServerError, rw) + return + } + + var metrics GetJobApiRequest + if err = decode(r.Body, &metrics); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + var scopes []schema.MetricScope + + if job.NumNodes == 1 { + scopes = []schema.MetricScope{"core"} + } else { + scopes = []schema.MetricScope{"node"} + } + + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + resolution := 0 + + for _, mc := range metricConfigs { + resolution = max(resolution, mc.Timestep) + } + + data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution) + if err != nil { + log.Warnf("REST: error while loading job data for JobID %d on %s", job.JobID, job.Cluster) + return + } + + res := []*JobMetricWithName{} + for name, md := range data { + for scope, metric := range md { + res = append(res, &JobMetricWithName{ + Name: name, + Scope: scope, + Metric: metric, + }) + } + } + + log.Debugf("/api/job/%s: get job %d", id, job.JobID) + rw.Header().Add("Content-Type", "application/json") + bw := bufio.NewWriter(rw) + defer bw.Flush() + + payload := GetJobApiResponse{ + Meta: job, + Data: res, + } + + if err := json.NewEncoder(bw).Encode(payload); err != nil { + handleError(err, http.StatusInternalServerError, rw) + return + } +} + +// editMeta godoc +// @summary Edit meta-data json +// @tags Job add and modify +// @description Edit key value pairs in job metadata json +// @description If a key already exists its content will be overwritten +// @accept json +// @produce json +// @param id path int true "Job Database ID" +// @param request body api.EditMetaRequest true "Kay value pair to add" +// @success 200 {object} schema.Job "Updated job resource" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 404 {object} api.ErrorResponse "Job does not exist" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/edit_meta/{id} [post] +func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) { + id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + job, err := api.JobRepository.FindById(r.Context(), id) + if err != nil { + http.Error(rw, err.Error(), http.StatusNotFound) + return + } + + var req EditMetaRequest + if err := decode(r.Body, &req); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + if err := api.JobRepository.UpdateMetadata(job, req.Key, req.Value); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + json.NewEncoder(rw).Encode(job) +} + +// tagJob godoc +// @summary Adds one or more tags to a job +// @tags Job add and modify +// @description Adds tag(s) to a job specified by DB ID. Name and Type of Tag(s) can be chosen freely. +// @description Tag Scope for frontend visibility will default to "global" if none entered, other options: "admin" or specific username. +// @description If tagged job is already finished: Tag will be written directly to respective archive files. +// @accept json +// @produce json +// @param id path int true "Job Database ID" +// @param request body api.TagJobApiRequest true "Array of tag-objects to add" +// @success 200 {object} schema.Job "Updated job resource" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 404 {object} api.ErrorResponse "Job or tag does not exist" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/tag_job/{id} [post] +func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { + id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + job, err := api.JobRepository.FindById(r.Context(), id) + if err != nil { + http.Error(rw, err.Error(), http.StatusNotFound) + return + } + + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + var req TagJobApiRequest + if err := decode(r.Body, &req); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + for _, tag := range req { + tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), *job.ID, tag.Type, tag.Name, tag.Scope) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + job.Tags = append(job.Tags, &schema.Tag{ + ID: tagId, + Type: tag.Type, + Name: tag.Name, + Scope: tag.Scope, + }) + } + + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + json.NewEncoder(rw).Encode(job) +} + +// removeTagJob godoc +// @summary Removes one or more tags from a job +// @tags Job add and modify +// @description Removes tag(s) from a job specified by DB ID. Name and Type of Tag(s) must match. +// @description Tag Scope is required for matching, options: "global", "admin". Private tags can not be deleted via API. +// @description If tagged job is already finished: Tag will be removed from respective archive files. +// @accept json +// @produce json +// @param id path int true "Job Database ID" +// @param request body api.TagJobApiRequest true "Array of tag-objects to remove" +// @success 200 {object} schema.Job "Updated job resource" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 404 {object} api.ErrorResponse "Job or tag does not exist" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /jobs/tag_job/{id} [delete] +func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) { + id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + job, err := api.JobRepository.FindById(r.Context(), id) + if err != nil { + http.Error(rw, err.Error(), http.StatusNotFound) + return + } + + job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + var req TagJobApiRequest + if err := decode(r.Body, &req); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + for _, rtag := range req { + // Only Global and Admin Tags + if rtag.Scope != "global" && rtag.Scope != "admin" { + log.Warnf("Cannot delete private tag for job %d: Skip", job.JobID) + continue + } + + remainingTags, err := api.JobRepository.RemoveJobTagByRequest(repository.GetUserFromContext(r.Context()), *job.ID, rtag.Type, rtag.Name, rtag.Scope) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + job.Tags = remainingTags + } + + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + json.NewEncoder(rw).Encode(job) +} + +// removeTags godoc +// @summary Removes all tags and job-relations for type:name tuple +// @tags Tag remove +// @description Removes tags by type and name. Name and Type of Tag(s) must match. +// @description Tag Scope is required for matching, options: "global", "admin". Private tags can not be deleted via API. +// @description Tag wills be removed from respective archive files. +// @accept json +// @produce plain +// @param request body api.TagJobApiRequest true "Array of tag-objects to remove" +// @success 200 {string} string "Success Response" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 404 {object} api.ErrorResponse "Job or tag does not exist" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /tags/ [delete] +func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) { + var req TagJobApiRequest + if err := decode(r.Body, &req); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + targetCount := len(req) + currentCount := 0 + for _, rtag := range req { + // Only Global and Admin Tags + if rtag.Scope != "global" && rtag.Scope != "admin" { + log.Warn("Cannot delete private tag: Skip") + continue + } + + err := api.JobRepository.RemoveTagByRequest(rtag.Type, rtag.Name, rtag.Scope) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } else { + currentCount++ + } + } + + rw.WriteHeader(http.StatusOK) + fmt.Fprintf(rw, "Deleted Tags from DB: %d successfull of %d requested\n", currentCount, targetCount) +} + +// startJob godoc +// @summary Adds a new job as "running" +// @tags Job add and modify +// @description Job specified in request body will be saved to database as "running" with new DB ID. +// @description Job specifications follow the 'JobMeta' scheme, API will fail to execute if requirements are not met. +// @accept json +// @produce json +// @param request body schema.JobMeta true "Job to add" +// @success 201 {object} api.DefaultJobApiResponse "Job added successfully" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: The combination of jobId, clusterId and startTime does already exist" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/start_job/ [post] +func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { + req := schema.Job{ + Exclusive: 1, + MonitoringStatus: schema.MonitoringStatusRunningOrArchiving, + } + if err := decode(r.Body, &req); err != nil { + handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) + return + } + + log.Printf("REST: %s\n", req.GoString()) + req.State = schema.JobStateRunning + + if err := importer.SanityChecks(&req); err != nil { + handleError(err, http.StatusBadRequest, rw) + return + } + + // aquire lock to avoid race condition between API calls + var unlockOnce sync.Once + api.RepositoryMutex.Lock() + defer unlockOnce.Do(api.RepositoryMutex.Unlock) + + // Check if combination of (job_id, cluster_id, start_time) already exists: + jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil) + if err != nil && err != sql.ErrNoRows { + handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw) + return + } else if err == nil { + for _, job := range jobs { + if (req.StartTime - job.StartTime) < 86400 { + handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw) + return + } + } + } + + id, err := api.JobRepository.Start(&req) + if err != nil { + handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw) + return + } + // unlock here, adding Tags can be async + unlockOnce.Do(api.RepositoryMutex.Unlock) + + for _, tag := range req.Tags { + if _, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), id, tag.Type, tag.Name, tag.Scope); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw) + return + } + } + + log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime) + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusCreated) + json.NewEncoder(rw).Encode(DefaultJobApiResponse{ + Message: "success", + }) +} + +// stopJobByRequest godoc +// @summary Marks job as completed and triggers archiving +// @tags Job add and modify +// @description Job to stop is specified by request body. All fields are required in this case. +// @description Returns full job resource information according to 'JobMeta' scheme. +// @produce json +// @param request body api.StopJobApiRequest true "All fields required" +// @success 200 {object} schema.JobMeta "Success message" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 404 {object} api.ErrorResponse "Resource not found" +// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: job has already been stopped" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/stop_job/ [post] +func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { + // Parse request body + req := StopJobApiRequest{} + if err := decode(r.Body, &req); err != nil { + handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) + return + } + + // Fetch job (that will be stopped) from db + var job *schema.Job + var err error + if req.JobId == nil { + handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw) + return + } + + // log.Printf("loading db job for stopJobByRequest... : stopJobApiRequest=%v", req) + job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime) + if err != nil { + job, err = api.JobRepository.FindCached(req.JobId, req.Cluster, req.StartTime) + // FIXME: Previous error is hidden + if err != nil { + handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) + return + } + } + + api.checkAndHandleStopJob(rw, job, req) +} + +// deleteJobById godoc +// @summary Remove a job from the sql database +// @tags Job remove +// @description Job to remove is specified by database ID. This will not remove the job from the job archive. +// @produce json +// @param id path int true "Database ID of Job" +// @success 200 {object} api.DefaultJobApiResponse "Success message" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 404 {object} api.ErrorResponse "Resource not found" +// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/delete_job/{id} [delete] +func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) { + // Fetch job (that will be stopped) from db + id, ok := mux.Vars(r)["id"] + var err error + if ok { + id, e := strconv.ParseInt(id, 10, 64) + if e != nil { + handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw) + return + } + + err = api.JobRepository.DeleteJobById(id) + } else { + handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw) + return + } + if err != nil { + handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw) + return + } + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + json.NewEncoder(rw).Encode(DefaultJobApiResponse{ + Message: fmt.Sprintf("Successfully deleted job %s", id), + }) +} + +// deleteJobByRequest godoc +// @summary Remove a job from the sql database +// @tags Job remove +// @description Job to delete is specified by request body. All fields are required in this case. +// @accept json +// @produce json +// @param request body api.DeleteJobApiRequest true "All fields required" +// @success 200 {object} api.DefaultJobApiResponse "Success message" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 404 {object} api.ErrorResponse "Resource not found" +// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/delete_job/ [delete] +func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) { + // Parse request body + req := DeleteJobApiRequest{} + if err := decode(r.Body, &req); err != nil { + handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) + return + } + + // Fetch job (that will be deleted) from db + var job *schema.Job + var err error + if req.JobId == nil { + handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw) + return + } + + job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime) + if err != nil { + handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) + return + } + + err = api.JobRepository.DeleteJobById(*job.ID) + if err != nil { + handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw) + return + } + + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + json.NewEncoder(rw).Encode(DefaultJobApiResponse{ + Message: fmt.Sprintf("Successfully deleted job %d", job.ID), + }) +} + +// deleteJobBefore godoc +// @summary Remove a job from the sql database +// @tags Job remove +// @description Remove all jobs with start time before timestamp. The jobs will not be removed from the job archive. +// @produce json +// @param ts path int true "Unix epoch timestamp" +// @success 200 {object} api.DefaultJobApiResponse "Success message" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 403 {object} api.ErrorResponse "Forbidden" +// @failure 404 {object} api.ErrorResponse "Resource not found" +// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /api/jobs/delete_job_before/{ts} [delete] +func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { + var cnt int + // Fetch job (that will be stopped) from db + id, ok := mux.Vars(r)["ts"] + var err error + if ok { + ts, e := strconv.ParseInt(id, 10, 64) + if e != nil { + handleError(fmt.Errorf("integer expected in path for ts: %w", e), http.StatusBadRequest, rw) + return + } + + cnt, err = api.JobRepository.DeleteJobsBefore(ts) + } else { + handleError(errors.New("the parameter 'ts' is required"), http.StatusBadRequest, rw) + return + } + if err != nil { + handleError(fmt.Errorf("deleting jobs failed: %w", err), http.StatusUnprocessableEntity, rw) + return + } + + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + json.NewEncoder(rw).Encode(DefaultJobApiResponse{ + Message: fmt.Sprintf("Successfully deleted %d jobs", cnt), + }) +} + +func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobApiRequest) { + // Sanity checks + if job.State != schema.JobStateRunning { + handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw) + return + } + + if job == nil || job.StartTime > req.StopTime { + handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw) + return + } + + if req.State != "" && !req.State.Valid() { + handleError(fmt.Errorf("jobId %d (id %d) on %s : invalid requested job state: %#v", job.JobID, job.ID, job.Cluster, req.State), http.StatusBadRequest, rw) + return + } else if req.State == "" { + req.State = schema.JobStateCompleted + } + + // Mark job as stopped in the database (update state and duration) + job.Duration = int32(req.StopTime - job.StartTime) + job.State = req.State + api.JobRepository.Mutex.Lock() + if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { + if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { + api.JobRepository.Mutex.Unlock() + handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) + return + } + } + api.JobRepository.Mutex.Unlock() + + log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) + + // Send a response (with status OK). This means that erros that happen from here on forward + // can *NOT* be communicated to the client. If reading from a MetricDataRepository or + // writing to the filesystem fails, the client will not know. + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + json.NewEncoder(rw).Encode(job) + + // Monitoring is disabled... + if job.MonitoringStatus == schema.MonitoringStatusDisabled { + return + } + + // Trigger async archiving + archiver.TriggerArchiving(job) +} + +func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + metrics := r.URL.Query()["metric"] + var scopes []schema.MetricScope + for _, scope := range r.URL.Query()["scope"] { + var s schema.MetricScope + if err := s.UnmarshalGQL(scope); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + scopes = append(scopes, s) + } + + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + + type Respone struct { + Data *struct { + JobMetrics []*model.JobMetricWithName `json:"jobMetrics"` + } `json:"data"` + Error *struct { + Message string `json:"message"` + } `json:"error"` + } + + resolver := graph.GetResolverInstance() + data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes, nil) + if err != nil { + json.NewEncoder(rw).Encode(Respone{ + Error: &struct { + Message string "json:\"message\"" + }{Message: err.Error()}, + }) + return + } + + json.NewEncoder(rw).Encode(Respone{ + Data: &struct { + JobMetrics []*model.JobMetricWithName "json:\"jobMetrics\"" + }{JobMetrics: data}, + }) +} diff --git a/internal/api/node.go b/internal/api/node.go new file mode 100644 index 0000000..ab34b16 --- /dev/null +++ b/internal/api/node.go @@ -0,0 +1,30 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package api + +import ( + "fmt" + "net/http" +) + +type Node struct { + Name string `json:"hostname"` + States []string `json:"states"` +} + +// updateNodeStatesRequest model +type UpdateNodeStatesRequest struct { + Nodes []Node `json:"nodes"` + Cluster string `json:"cluster" example:"fritz"` +} + +func (api *RestApi) updateNodeStates(rw http.ResponseWriter, r *http.Request) { + // Parse request body + req := UpdateNodeStatesRequest{} + if err := decode(r.Body, &req); err != nil { + handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) + return + } +} diff --git a/internal/api/rest.go b/internal/api/rest.go index 31a5979..54472d8 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -5,30 +5,18 @@ package api import ( - "bufio" - "database/sql" "encoding/json" - "errors" "fmt" "io" "net/http" "os" "path/filepath" - "strconv" - "strings" "sync" - "time" - "github.com/ClusterCockpit/cc-backend/internal/archiver" "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/graph" - "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/internal/importer" - "github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/util" - "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/gorilla/mux" @@ -73,6 +61,8 @@ func (api *RestApi) MountApiRoutes(r *mux.Router) { r.HandleFunc("/users/", api.getUsers).Methods(http.MethodGet) // Cluster List r.HandleFunc("/clusters/", api.getClusters).Methods(http.MethodGet) + // Slurm node state + r.HandleFunc("/nodestate/", api.updateNodeStates).Methods(http.MethodPost, http.MethodPut) // Job Handler r.HandleFunc("/jobs/start_job/", api.startJob).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut) @@ -120,46 +110,13 @@ func (api *RestApi) MountConfigApiRoutes(r *mux.Router) { func (api *RestApi) MountFrontendApiRoutes(r *mux.Router) { r.StrictSlash(true) - // Settings Frontrend Uses SessionAuth + // Settings Frontend Uses SessionAuth if api.Authentication != nil { r.HandleFunc("/jwt/", api.getJWT).Methods(http.MethodGet) r.HandleFunc("/configuration/", api.updateConfiguration).Methods(http.MethodPost) } } -// DefaultApiResponse model -type DefaultJobApiResponse struct { - Message string `json:"msg"` -} - -// StopJobApiRequest model -type StopJobApiRequest struct { - JobId *int64 `json:"jobId" example:"123000"` - Cluster *string `json:"cluster" example:"fritz"` - StartTime *int64 `json:"startTime" example:"1649723812"` - State schema.JobState `json:"jobState" validate:"required" example:"completed"` - StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"` -} - -// DeleteJobApiRequest model -type DeleteJobApiRequest struct { - JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job - Cluster *string `json:"cluster" example:"fritz"` // Cluster of job - StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch -} - -// GetJobsApiResponse model -type GetJobsApiResponse struct { - Jobs []*schema.Job `json:"jobs"` // Array of jobs - Items int `json:"items"` // Number of jobs returned - Page int `json:"page"` // Page id returned -} - -// GetClustersApiResponse model -type GetClustersApiResponse struct { - Clusters []*schema.Cluster `json:"clusters"` // Array of clusters -} - // ErrorResponse model type ErrorResponse struct { // Statustext of Errorcode @@ -167,48 +124,6 @@ type ErrorResponse struct { Error string `json:"error"` // Error Message } -// ApiTag model -type ApiTag struct { - // Tag Type - Type string `json:"type" example:"Debug"` - Name string `json:"name" example:"Testjob"` // Tag Name - Scope string `json:"scope" example:"global"` // Tag Scope for Frontend Display -} - -// ApiMeta model -type EditMetaRequest struct { - Key string `json:"key" example:"jobScript"` - Value string `json:"value" example:"bash script"` -} - -type TagJobApiRequest []*ApiTag - -type GetJobApiRequest []string - -type GetJobApiResponse struct { - Meta *schema.Job - Data []*JobMetricWithName -} - -type GetCompleteJobApiResponse struct { - Meta *schema.Job - Data schema.JobData -} - -type JobMetricWithName struct { - Metric *schema.JobMetric `json:"metric"` - Name string `json:"name"` - Scope schema.MetricScope `json:"scope"` -} - -type ApiReturnedUser struct { - Username string `json:"username"` - Name string `json:"name"` - Roles []string `json:"roles"` - Email string `json:"email"` - Projects []string `json:"projects"` -} - func handleError(err error, statusCode int, rw http.ResponseWriter) { log.Warnf("REST ERROR : %s", err.Error()) rw.Header().Add("Content-Type", "application/json") @@ -225,1087 +140,6 @@ func decode(r io.Reader, val any) error { return dec.Decode(val) } -// getClusters godoc -// @summary Lists all cluster configs -// @tags Cluster query -// @description Get a list of all cluster configs. Specific cluster can be requested using query parameter. -// @produce json -// @param cluster query string false "Job Cluster" -// @success 200 {object} api.GetClustersApiResponse "Array of clusters" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /api/clusters/ [get] -func (api *RestApi) getClusters(rw http.ResponseWriter, r *http.Request) { - if user := repository.GetUserFromContext(r.Context()); user != nil && - !user.HasRole(schema.RoleApi) { - - handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw) - return - } - - rw.Header().Add("Content-Type", "application/json") - bw := bufio.NewWriter(rw) - defer bw.Flush() - - var clusters []*schema.Cluster - - if r.URL.Query().Has("cluster") { - name := r.URL.Query().Get("cluster") - cluster := archive.GetCluster(name) - if cluster == nil { - handleError(fmt.Errorf("unknown cluster: %s", name), http.StatusBadRequest, rw) - return - } - clusters = append(clusters, cluster) - } else { - clusters = archive.Clusters - } - - payload := GetClustersApiResponse{ - Clusters: clusters, - } - - if err := json.NewEncoder(bw).Encode(payload); err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } -} - -// getJobs godoc -// @summary Lists all jobs -// @tags Job query -// @description Get a list of all jobs. Filters can be applied using query parameters. -// @description Number of results can be limited by page. Results are sorted by descending startTime. -// @produce json -// @param state query string false "Job State" Enums(running, completed, failed, cancelled, stopped, timeout) -// @param cluster query string false "Job Cluster" -// @param start-time query string false "Syntax: '$from-$to', as unix epoch timestamps in seconds" -// @param items-per-page query int false "Items per page (Default: 25)" -// @param page query int false "Page Number (Default: 1)" -// @param with-metadata query bool false "Include metadata (e.g. jobScript) in response" -// @success 200 {object} api.GetJobsApiResponse "Job array and page info" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /api/jobs/ [get] -func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { - withMetadata := false - filter := &model.JobFilter{} - page := &model.PageRequest{ItemsPerPage: 25, Page: 1} - order := &model.OrderByInput{Field: "startTime", Type: "col", Order: model.SortDirectionEnumDesc} - - for key, vals := range r.URL.Query() { - switch key { - case "state": - for _, s := range vals { - state := schema.JobState(s) - if !state.Valid() { - handleError(fmt.Errorf("invalid query parameter value: state"), - http.StatusBadRequest, rw) - return - } - filter.State = append(filter.State, state) - } - case "cluster": - filter.Cluster = &model.StringInput{Eq: &vals[0]} - case "start-time": - st := strings.Split(vals[0], "-") - if len(st) != 2 { - handleError(fmt.Errorf("invalid query parameter value: startTime"), - http.StatusBadRequest, rw) - return - } - from, err := strconv.ParseInt(st[0], 10, 64) - if err != nil { - handleError(err, http.StatusBadRequest, rw) - return - } - to, err := strconv.ParseInt(st[1], 10, 64) - if err != nil { - handleError(err, http.StatusBadRequest, rw) - return - } - ufrom, uto := time.Unix(from, 0), time.Unix(to, 0) - filter.StartTime = &schema.TimeRange{From: &ufrom, To: &uto} - case "page": - x, err := strconv.Atoi(vals[0]) - if err != nil { - handleError(err, http.StatusBadRequest, rw) - return - } - page.Page = x - case "items-per-page": - x, err := strconv.Atoi(vals[0]) - if err != nil { - handleError(err, http.StatusBadRequest, rw) - return - } - page.ItemsPerPage = x - case "with-metadata": - withMetadata = true - default: - handleError(fmt.Errorf("invalid query parameter: %s", key), - http.StatusBadRequest, rw) - return - } - } - - jobs, err := api.JobRepository.QueryJobs(r.Context(), []*model.JobFilter{filter}, page, order) - if err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } - - results := make([]*schema.Job, 0, len(jobs)) - for _, job := range jobs { - if withMetadata { - if _, err = api.JobRepository.FetchMetadata(job); err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } - } - - job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) - if err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } - - if job.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful { - job.Statistics, err = archive.GetStatistics(job) - if err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } - } - - results = append(results, job) - } - - log.Debugf("/api/jobs: %d jobs returned", len(results)) - rw.Header().Add("Content-Type", "application/json") - bw := bufio.NewWriter(rw) - defer bw.Flush() - - payload := GetJobsApiResponse{ - Jobs: results, - Items: page.ItemsPerPage, - Page: page.Page, - } - - if err := json.NewEncoder(bw).Encode(payload); err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } -} - -// getCompleteJobById godoc -// @summary Get job meta and optional all metric data -// @tags Job query -// @description Job to get is specified by database ID -// @description Returns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'. -// @produce json -// @param id path int true "Database ID of Job" -// @param all-metrics query bool false "Include all available metrics" -// @success 200 {object} api.GetJobApiResponse "Job resource" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 404 {object} api.ErrorResponse "Resource not found" -// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /api/jobs/{id} [get] -func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) { - // Fetch job from db - id, ok := mux.Vars(r)["id"] - var job *schema.Job - var err error - if ok { - id, e := strconv.ParseInt(id, 10, 64) - if e != nil { - handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw) - return - } - - job, err = api.JobRepository.FindById(r.Context(), id) // Get Job from Repo by ID - } else { - handleError(fmt.Errorf("the parameter 'id' is required"), http.StatusBadRequest, rw) - return - } - if err != nil { - handleError(fmt.Errorf("finding job with db id %s failed: %w", id, err), http.StatusUnprocessableEntity, rw) - return - } - - job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) - if err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - - } - if _, err = api.JobRepository.FetchMetadata(job); err != nil { - - handleError(err, http.StatusInternalServerError, rw) - return - } - - var scopes []schema.MetricScope - - if job.NumNodes == 1 { - scopes = []schema.MetricScope{"core"} - } else { - scopes = []schema.MetricScope{"node"} - } - - var data schema.JobData - - metricConfigs := archive.GetCluster(job.Cluster).MetricConfig - resolution := 0 - - for _, mc := range metricConfigs { - resolution = max(resolution, mc.Timestep) - } - - if r.URL.Query().Get("all-metrics") == "true" { - data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution) - if err != nil { - log.Warnf("REST: error while loading all-metrics job data for JobID %d on %s", job.JobID, job.Cluster) - return - } - } - - log.Debugf("/api/job/%s: get job %d", id, job.JobID) - rw.Header().Add("Content-Type", "application/json") - bw := bufio.NewWriter(rw) - defer bw.Flush() - - payload := GetCompleteJobApiResponse{ - Meta: job, - Data: data, - } - - if err := json.NewEncoder(bw).Encode(payload); err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } -} - -// getJobById godoc -// @summary Get job meta and configurable metric data -// @tags Job query -// @description Job to get is specified by database ID -// @description Returns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'. -// @accept json -// @produce json -// @param id path int true "Database ID of Job" -// @param request body api.GetJobApiRequest true "Array of metric names" -// @success 200 {object} api.GetJobApiResponse "Job resource" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 404 {object} api.ErrorResponse "Resource not found" -// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /api/jobs/{id} [post] -func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { - // Fetch job from db - id, ok := mux.Vars(r)["id"] - var job *schema.Job - var err error - if ok { - id, e := strconv.ParseInt(id, 10, 64) - if e != nil { - handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw) - return - } - - job, err = api.JobRepository.FindById(r.Context(), id) - } else { - handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw) - return - } - if err != nil { - handleError(fmt.Errorf("finding job with db id %s failed: %w", id, err), http.StatusUnprocessableEntity, rw) - return - } - - job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) - if err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - - } - if _, err = api.JobRepository.FetchMetadata(job); err != nil { - - handleError(err, http.StatusInternalServerError, rw) - return - } - - var metrics GetJobApiRequest - if err = decode(r.Body, &metrics); err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - var scopes []schema.MetricScope - - if job.NumNodes == 1 { - scopes = []schema.MetricScope{"core"} - } else { - scopes = []schema.MetricScope{"node"} - } - - metricConfigs := archive.GetCluster(job.Cluster).MetricConfig - resolution := 0 - - for _, mc := range metricConfigs { - resolution = max(resolution, mc.Timestep) - } - - data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution) - if err != nil { - log.Warnf("REST: error while loading job data for JobID %d on %s", job.JobID, job.Cluster) - return - } - - res := []*JobMetricWithName{} - for name, md := range data { - for scope, metric := range md { - res = append(res, &JobMetricWithName{ - Name: name, - Scope: scope, - Metric: metric, - }) - } - } - - log.Debugf("/api/job/%s: get job %d", id, job.JobID) - rw.Header().Add("Content-Type", "application/json") - bw := bufio.NewWriter(rw) - defer bw.Flush() - - payload := GetJobApiResponse{ - Meta: job, - Data: res, - } - - if err := json.NewEncoder(bw).Encode(payload); err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } -} - -// editMeta godoc -// @summary Edit meta-data json -// @tags Job add and modify -// @description Edit key value pairs in job metadata json -// @description If a key already exists its content will be overwritten -// @accept json -// @produce json -// @param id path int true "Job Database ID" -// @param request body api.EditMetaRequest true "Kay value pair to add" -// @success 200 {object} schema.Job "Updated job resource" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 404 {object} api.ErrorResponse "Job does not exist" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /api/jobs/edit_meta/{id} [post] -func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) { - id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - job, err := api.JobRepository.FindById(r.Context(), id) - if err != nil { - http.Error(rw, err.Error(), http.StatusNotFound) - return - } - - var req EditMetaRequest - if err := decode(r.Body, &req); err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - if err := api.JobRepository.UpdateMetadata(job, req.Key, req.Value); err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusOK) - json.NewEncoder(rw).Encode(job) -} - -// tagJob godoc -// @summary Adds one or more tags to a job -// @tags Job add and modify -// @description Adds tag(s) to a job specified by DB ID. Name and Type of Tag(s) can be chosen freely. -// @description Tag Scope for frontend visibility will default to "global" if none entered, other options: "admin" or specific username. -// @description If tagged job is already finished: Tag will be written directly to respective archive files. -// @accept json -// @produce json -// @param id path int true "Job Database ID" -// @param request body api.TagJobApiRequest true "Array of tag-objects to add" -// @success 200 {object} schema.Job "Updated job resource" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 404 {object} api.ErrorResponse "Job or tag does not exist" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /api/jobs/tag_job/{id} [post] -func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { - id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - job, err := api.JobRepository.FindById(r.Context(), id) - if err != nil { - http.Error(rw, err.Error(), http.StatusNotFound) - return - } - - job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - - var req TagJobApiRequest - if err := decode(r.Body, &req); err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - for _, tag := range req { - tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), *job.ID, tag.Type, tag.Name, tag.Scope) - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - - job.Tags = append(job.Tags, &schema.Tag{ - ID: tagId, - Type: tag.Type, - Name: tag.Name, - Scope: tag.Scope, - }) - } - - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusOK) - json.NewEncoder(rw).Encode(job) -} - -// removeTagJob godoc -// @summary Removes one or more tags from a job -// @tags Job add and modify -// @description Removes tag(s) from a job specified by DB ID. Name and Type of Tag(s) must match. -// @description Tag Scope is required for matching, options: "global", "admin". Private tags can not be deleted via API. -// @description If tagged job is already finished: Tag will be removed from respective archive files. -// @accept json -// @produce json -// @param id path int true "Job Database ID" -// @param request body api.TagJobApiRequest true "Array of tag-objects to remove" -// @success 200 {object} schema.Job "Updated job resource" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 404 {object} api.ErrorResponse "Job or tag does not exist" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /jobs/tag_job/{id} [delete] -func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) { - id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - job, err := api.JobRepository.FindById(r.Context(), id) - if err != nil { - http.Error(rw, err.Error(), http.StatusNotFound) - return - } - - job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID) - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - - var req TagJobApiRequest - if err := decode(r.Body, &req); err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - for _, rtag := range req { - // Only Global and Admin Tags - if rtag.Scope != "global" && rtag.Scope != "admin" { - log.Warnf("Cannot delete private tag for job %d: Skip", job.JobID) - continue - } - - remainingTags, err := api.JobRepository.RemoveJobTagByRequest(repository.GetUserFromContext(r.Context()), *job.ID, rtag.Type, rtag.Name, rtag.Scope) - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - - job.Tags = remainingTags - } - - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusOK) - json.NewEncoder(rw).Encode(job) -} - -// removeTags godoc -// @summary Removes all tags and job-relations for type:name tuple -// @tags Tag remove -// @description Removes tags by type and name. Name and Type of Tag(s) must match. -// @description Tag Scope is required for matching, options: "global", "admin". Private tags can not be deleted via API. -// @description Tag wills be removed from respective archive files. -// @accept json -// @produce plain -// @param request body api.TagJobApiRequest true "Array of tag-objects to remove" -// @success 200 {string} string "Success Response" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 404 {object} api.ErrorResponse "Job or tag does not exist" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /tags/ [delete] -func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) { - var req TagJobApiRequest - if err := decode(r.Body, &req); err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - targetCount := len(req) - currentCount := 0 - for _, rtag := range req { - // Only Global and Admin Tags - if rtag.Scope != "global" && rtag.Scope != "admin" { - log.Warn("Cannot delete private tag: Skip") - continue - } - - err := api.JobRepository.RemoveTagByRequest(rtag.Type, rtag.Name, rtag.Scope) - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } else { - currentCount++ - } - } - - rw.WriteHeader(http.StatusOK) - fmt.Fprintf(rw, "Deleted Tags from DB: %d successfull of %d requested\n", currentCount, targetCount) -} - -// startJob godoc -// @summary Adds a new job as "running" -// @tags Job add and modify -// @description Job specified in request body will be saved to database as "running" with new DB ID. -// @description Job specifications follow the 'JobMeta' scheme, API will fail to execute if requirements are not met. -// @accept json -// @produce json -// @param request body schema.JobMeta true "Job to add" -// @success 201 {object} api.DefaultJobApiResponse "Job added successfully" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: The combination of jobId, clusterId and startTime does already exist" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /api/jobs/start_job/ [post] -func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { - req := schema.Job{ - Exclusive: 1, - MonitoringStatus: schema.MonitoringStatusRunningOrArchiving, - } - if err := decode(r.Body, &req); err != nil { - handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) - return - } - - log.Printf("REST: %s\n", req.GoString()) - req.State = schema.JobStateRunning - - if err := importer.SanityChecks(&req); err != nil { - handleError(err, http.StatusBadRequest, rw) - return - } - - // aquire lock to avoid race condition between API calls - var unlockOnce sync.Once - api.RepositoryMutex.Lock() - defer unlockOnce.Do(api.RepositoryMutex.Unlock) - - // Check if combination of (job_id, cluster_id, start_time) already exists: - jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil) - if err != nil && err != sql.ErrNoRows { - handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw) - return - } else if err == nil { - for _, job := range jobs { - if (req.StartTime - job.StartTime) < 86400 { - handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw) - return - } - } - } - - id, err := api.JobRepository.Start(&req) - if err != nil { - handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw) - return - } - // unlock here, adding Tags can be async - unlockOnce.Do(api.RepositoryMutex.Unlock) - - for _, tag := range req.Tags { - if _, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), id, tag.Type, tag.Name, tag.Scope); err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw) - return - } - } - - log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime) - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusCreated) - json.NewEncoder(rw).Encode(DefaultJobApiResponse{ - Message: "success", - }) -} - -// stopJobByRequest godoc -// @summary Marks job as completed and triggers archiving -// @tags Job add and modify -// @description Job to stop is specified by request body. All fields are required in this case. -// @description Returns full job resource information according to 'JobMeta' scheme. -// @produce json -// @param request body api.StopJobApiRequest true "All fields required" -// @success 200 {object} schema.JobMeta "Success message" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 404 {object} api.ErrorResponse "Resource not found" -// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: job has already been stopped" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /api/jobs/stop_job/ [post] -func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { - // Parse request body - req := StopJobApiRequest{} - if err := decode(r.Body, &req); err != nil { - handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) - return - } - - // Fetch job (that will be stopped) from db - var job *schema.Job - var err error - if req.JobId == nil { - handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw) - return - } - - // log.Printf("loading db job for stopJobByRequest... : stopJobApiRequest=%v", req) - job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime) - if err != nil { - job, err = api.JobRepository.FindCached(req.JobId, req.Cluster, req.StartTime) - // FIXME: Previous error is hidden - if err != nil { - handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) - return - } - } - - api.checkAndHandleStopJob(rw, job, req) -} - -// deleteJobById godoc -// @summary Remove a job from the sql database -// @tags Job remove -// @description Job to remove is specified by database ID. This will not remove the job from the job archive. -// @produce json -// @param id path int true "Database ID of Job" -// @success 200 {object} api.DefaultJobApiResponse "Success message" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 404 {object} api.ErrorResponse "Resource not found" -// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /api/jobs/delete_job/{id} [delete] -func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) { - // Fetch job (that will be stopped) from db - id, ok := mux.Vars(r)["id"] - var err error - if ok { - id, e := strconv.ParseInt(id, 10, 64) - if e != nil { - handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw) - return - } - - err = api.JobRepository.DeleteJobById(id) - } else { - handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw) - return - } - if err != nil { - handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw) - return - } - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusOK) - json.NewEncoder(rw).Encode(DefaultJobApiResponse{ - Message: fmt.Sprintf("Successfully deleted job %s", id), - }) -} - -// deleteJobByRequest godoc -// @summary Remove a job from the sql database -// @tags Job remove -// @description Job to delete is specified by request body. All fields are required in this case. -// @accept json -// @produce json -// @param request body api.DeleteJobApiRequest true "All fields required" -// @success 200 {object} api.DefaultJobApiResponse "Success message" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 404 {object} api.ErrorResponse "Resource not found" -// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /api/jobs/delete_job/ [delete] -func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) { - // Parse request body - req := DeleteJobApiRequest{} - if err := decode(r.Body, &req); err != nil { - handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) - return - } - - // Fetch job (that will be deleted) from db - var job *schema.Job - var err error - if req.JobId == nil { - handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw) - return - } - - job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime) - if err != nil { - handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) - return - } - - err = api.JobRepository.DeleteJobById(*job.ID) - if err != nil { - handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw) - return - } - - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusOK) - json.NewEncoder(rw).Encode(DefaultJobApiResponse{ - Message: fmt.Sprintf("Successfully deleted job %d", job.ID), - }) -} - -// deleteJobBefore godoc -// @summary Remove a job from the sql database -// @tags Job remove -// @description Remove all jobs with start time before timestamp. The jobs will not be removed from the job archive. -// @produce json -// @param ts path int true "Unix epoch timestamp" -// @success 200 {object} api.DefaultJobApiResponse "Success message" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 404 {object} api.ErrorResponse "Resource not found" -// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /api/jobs/delete_job_before/{ts} [delete] -func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { - var cnt int - // Fetch job (that will be stopped) from db - id, ok := mux.Vars(r)["ts"] - var err error - if ok { - ts, e := strconv.ParseInt(id, 10, 64) - if e != nil { - handleError(fmt.Errorf("integer expected in path for ts: %w", e), http.StatusBadRequest, rw) - return - } - - cnt, err = api.JobRepository.DeleteJobsBefore(ts) - } else { - handleError(errors.New("the parameter 'ts' is required"), http.StatusBadRequest, rw) - return - } - if err != nil { - handleError(fmt.Errorf("deleting jobs failed: %w", err), http.StatusUnprocessableEntity, rw) - return - } - - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusOK) - json.NewEncoder(rw).Encode(DefaultJobApiResponse{ - Message: fmt.Sprintf("Successfully deleted %d jobs", cnt), - }) -} - -func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobApiRequest) { - // Sanity checks - if job.State != schema.JobStateRunning { - handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw) - return - } - - if job == nil || job.StartTime > req.StopTime { - handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw) - return - } - - if req.State != "" && !req.State.Valid() { - handleError(fmt.Errorf("jobId %d (id %d) on %s : invalid requested job state: %#v", job.JobID, job.ID, job.Cluster, req.State), http.StatusBadRequest, rw) - return - } else if req.State == "" { - req.State = schema.JobStateCompleted - } - - // Mark job as stopped in the database (update state and duration) - job.Duration = int32(req.StopTime - job.StartTime) - job.State = req.State - api.JobRepository.Mutex.Lock() - if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { - if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { - api.JobRepository.Mutex.Unlock() - handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) - return - } - } - api.JobRepository.Mutex.Unlock() - - log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) - - // Send a response (with status OK). This means that erros that happen from here on forward - // can *NOT* be communicated to the client. If reading from a MetricDataRepository or - // writing to the filesystem fails, the client will not know. - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusOK) - json.NewEncoder(rw).Encode(job) - - // Monitoring is disabled... - if job.MonitoringStatus == schema.MonitoringStatusDisabled { - return - } - - // Trigger async archiving - archiver.TriggerArchiving(job) -} - -func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - metrics := r.URL.Query()["metric"] - var scopes []schema.MetricScope - for _, scope := range r.URL.Query()["scope"] { - var s schema.MetricScope - if err := s.UnmarshalGQL(scope); err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - scopes = append(scopes, s) - } - - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusOK) - - type Respone struct { - Data *struct { - JobMetrics []*model.JobMetricWithName `json:"jobMetrics"` - } `json:"data"` - Error *struct { - Message string `json:"message"` - } `json:"error"` - } - - resolver := graph.GetResolverInstance() - data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes, nil) - if err != nil { - json.NewEncoder(rw).Encode(Respone{ - Error: &struct { - Message string "json:\"message\"" - }{Message: err.Error()}, - }) - return - } - - json.NewEncoder(rw).Encode(Respone{ - Data: &struct { - JobMetrics []*model.JobMetricWithName "json:\"jobMetrics\"" - }{JobMetrics: data}, - }) -} - -func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) { - // SecuredCheck() only worked with TokenAuth: Removed - - rw.Header().Set("Content-Type", "text/plain") - me := repository.GetUserFromContext(r.Context()) - if !me.HasRole(schema.RoleAdmin) { - http.Error(rw, "Only admins are allowed to create new users", http.StatusForbidden) - return - } - - username, password, role, name, email, project := r.FormValue("username"), - r.FormValue("password"), r.FormValue("role"), r.FormValue("name"), - r.FormValue("email"), r.FormValue("project") - - if len(password) == 0 && role != schema.GetRoleString(schema.RoleApi) { - http.Error(rw, "Only API users are allowed to have a blank password (login will be impossible)", http.StatusBadRequest) - return - } - - if len(project) != 0 && role != schema.GetRoleString(schema.RoleManager) { - http.Error(rw, "only managers require a project (can be changed later)", - http.StatusBadRequest) - return - } else if len(project) == 0 && role == schema.GetRoleString(schema.RoleManager) { - http.Error(rw, "managers require a project to manage (can be changed later)", - http.StatusBadRequest) - return - } - - if err := repository.GetUserRepository().AddUser(&schema.User{ - Username: username, - Name: name, - Password: password, - Email: email, - Projects: []string{project}, - Roles: []string{role}, - }); err != nil { - http.Error(rw, err.Error(), http.StatusUnprocessableEntity) - return - } - - fmt.Fprintf(rw, "User %v successfully created!\n", username) -} - -func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) { - // SecuredCheck() only worked with TokenAuth: Removed - - if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { - http.Error(rw, "Only admins are allowed to delete a user", http.StatusForbidden) - return - } - - username := r.FormValue("username") - if err := repository.GetUserRepository().DelUser(username); err != nil { - http.Error(rw, err.Error(), http.StatusUnprocessableEntity) - return - } - - rw.WriteHeader(http.StatusOK) -} - -// getUsers godoc -// @summary Returns a list of users -// @tags User -// @description Returns a JSON-encoded list of users. -// @description Required query-parameter defines if all users or only users with additional special roles are returned. -// @produce json -// @param not-just-user query bool true "If returned list should contain all users or only users with additional special roles" -// @success 200 {array} api.ApiReturnedUser "List of users returned successfully" -// @failure 400 {string} string "Bad Request" -// @failure 401 {string} string "Unauthorized" -// @failure 403 {string} string "Forbidden" -// @failure 500 {string} string "Internal Server Error" -// @security ApiKeyAuth -// @router /api/users/ [get] -func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) { - // SecuredCheck() only worked with TokenAuth: Removed - - if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { - http.Error(rw, "Only admins are allowed to fetch a list of users", http.StatusForbidden) - return - } - - users, err := repository.GetUserRepository().ListUsers(r.URL.Query().Get("not-just-user") == "true") - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - - json.NewEncoder(rw).Encode(users) -} - -func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) { - // SecuredCheck() only worked with TokenAuth: Removed - - if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { - http.Error(rw, "Only admins are allowed to update a user", http.StatusForbidden) - return - } - - // Get Values - newrole := r.FormValue("add-role") - delrole := r.FormValue("remove-role") - newproj := r.FormValue("add-project") - delproj := r.FormValue("remove-project") - - // TODO: Handle anything but roles... - if newrole != "" { - if err := repository.GetUserRepository().AddRole(r.Context(), mux.Vars(r)["id"], newrole); err != nil { - http.Error(rw, err.Error(), http.StatusUnprocessableEntity) - return - } - rw.Write([]byte("Add Role Success")) - } else if delrole != "" { - if err := repository.GetUserRepository().RemoveRole(r.Context(), mux.Vars(r)["id"], delrole); err != nil { - http.Error(rw, err.Error(), http.StatusUnprocessableEntity) - return - } - rw.Write([]byte("Remove Role Success")) - } else if newproj != "" { - if err := repository.GetUserRepository().AddProject(r.Context(), mux.Vars(r)["id"], newproj); err != nil { - http.Error(rw, err.Error(), http.StatusUnprocessableEntity) - return - } - rw.Write([]byte("Add Project Success")) - } else if delproj != "" { - if err := repository.GetUserRepository().RemoveProject(r.Context(), mux.Vars(r)["id"], delproj); err != nil { - http.Error(rw, err.Error(), http.StatusUnprocessableEntity) - return - } - rw.Write([]byte("Remove Project Success")) - } else { - http.Error(rw, "Not Add or Del [role|project]?", http.StatusInternalServerError) - } -} - func (api *RestApi) editNotice(rw http.ResponseWriter, r *http.Request) { // SecuredCheck() only worked with TokenAuth: Removed diff --git a/internal/api/user.go b/internal/api/user.go new file mode 100644 index 0000000..3ba9c87 --- /dev/null +++ b/internal/api/user.go @@ -0,0 +1,159 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package api + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/schema" + "github.com/gorilla/mux" +) + +type ApiReturnedUser struct { + Username string `json:"username"` + Name string `json:"name"` + Roles []string `json:"roles"` + Email string `json:"email"` + Projects []string `json:"projects"` +} + +// getUsers godoc +// @summary Returns a list of users +// @tags User +// @description Returns a JSON-encoded list of users. +// @description Required query-parameter defines if all users or only users with additional special roles are returned. +// @produce json +// @param not-just-user query bool true "If returned list should contain all users or only users with additional special roles" +// @success 200 {array} api.ApiReturnedUser "List of users returned successfully" +// @failure 400 {string} string "Bad Request" +// @failure 401 {string} string "Unauthorized" +// @failure 403 {string} string "Forbidden" +// @failure 500 {string} string "Internal Server Error" +// @security ApiKeyAuth +// @router /api/users/ [get] +func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) { + // SecuredCheck() only worked with TokenAuth: Removed + + if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { + http.Error(rw, "Only admins are allowed to fetch a list of users", http.StatusForbidden) + return + } + + users, err := repository.GetUserRepository().ListUsers(r.URL.Query().Get("not-just-user") == "true") + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + json.NewEncoder(rw).Encode(users) +} + +func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) { + // SecuredCheck() only worked with TokenAuth: Removed + + if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { + http.Error(rw, "Only admins are allowed to update a user", http.StatusForbidden) + return + } + + // Get Values + newrole := r.FormValue("add-role") + delrole := r.FormValue("remove-role") + newproj := r.FormValue("add-project") + delproj := r.FormValue("remove-project") + + // TODO: Handle anything but roles... + if newrole != "" { + if err := repository.GetUserRepository().AddRole(r.Context(), mux.Vars(r)["id"], newrole); err != nil { + http.Error(rw, err.Error(), http.StatusUnprocessableEntity) + return + } + rw.Write([]byte("Add Role Success")) + } else if delrole != "" { + if err := repository.GetUserRepository().RemoveRole(r.Context(), mux.Vars(r)["id"], delrole); err != nil { + http.Error(rw, err.Error(), http.StatusUnprocessableEntity) + return + } + rw.Write([]byte("Remove Role Success")) + } else if newproj != "" { + if err := repository.GetUserRepository().AddProject(r.Context(), mux.Vars(r)["id"], newproj); err != nil { + http.Error(rw, err.Error(), http.StatusUnprocessableEntity) + return + } + rw.Write([]byte("Add Project Success")) + } else if delproj != "" { + if err := repository.GetUserRepository().RemoveProject(r.Context(), mux.Vars(r)["id"], delproj); err != nil { + http.Error(rw, err.Error(), http.StatusUnprocessableEntity) + return + } + rw.Write([]byte("Remove Project Success")) + } else { + http.Error(rw, "Not Add or Del [role|project]?", http.StatusInternalServerError) + } +} + +func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) { + // SecuredCheck() only worked with TokenAuth: Removed + + rw.Header().Set("Content-Type", "text/plain") + me := repository.GetUserFromContext(r.Context()) + if !me.HasRole(schema.RoleAdmin) { + http.Error(rw, "Only admins are allowed to create new users", http.StatusForbidden) + return + } + + username, password, role, name, email, project := r.FormValue("username"), + r.FormValue("password"), r.FormValue("role"), r.FormValue("name"), + r.FormValue("email"), r.FormValue("project") + + if len(password) == 0 && role != schema.GetRoleString(schema.RoleApi) { + http.Error(rw, "Only API users are allowed to have a blank password (login will be impossible)", http.StatusBadRequest) + return + } + + if len(project) != 0 && role != schema.GetRoleString(schema.RoleManager) { + http.Error(rw, "only managers require a project (can be changed later)", + http.StatusBadRequest) + return + } else if len(project) == 0 && role == schema.GetRoleString(schema.RoleManager) { + http.Error(rw, "managers require a project to manage (can be changed later)", + http.StatusBadRequest) + return + } + + if err := repository.GetUserRepository().AddUser(&schema.User{ + Username: username, + Name: name, + Password: password, + Email: email, + Projects: []string{project}, + Roles: []string{role}, + }); err != nil { + http.Error(rw, err.Error(), http.StatusUnprocessableEntity) + return + } + + fmt.Fprintf(rw, "User %v successfully created!\n", username) +} + +func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) { + // SecuredCheck() only worked with TokenAuth: Removed + + if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) { + http.Error(rw, "Only admins are allowed to delete a user", http.StatusForbidden) + return + } + + username := r.FormValue("username") + if err := repository.GetUserRepository().DelUser(username); err != nil { + http.Error(rw, err.Error(), http.StatusUnprocessableEntity) + return + } + + rw.WriteHeader(http.StatusOK) +}