From e81e56ea1d1ee22e21a8a8ba8d8a68a26f7b64a4 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 8 Mar 2024 08:51:05 +0100 Subject: [PATCH] Add rest endpoint edit_meta including helper routines Fixes #219 --- api/swagger.json | 99 +++++++++++++++++++++++++++++++++++ api/swagger.yaml | 68 +++++++++++++++++++++++++ internal/api/docs.go | 102 ++++++++++++++++++++++++++++++++++++- internal/api/rest.go | 63 +++++++++++++++++++++-- internal/repository/job.go | 33 ++++++------ pkg/archive/archive.go | 34 ++++++++++--- 6 files changed, 371 insertions(+), 28 deletions(-) diff --git a/api/swagger.json b/api/swagger.json index 6c3bc5c..0142aa7 100644 --- a/api/swagger.json +++ b/api/swagger.json @@ -327,6 +327,76 @@ } } }, + "/jobs/edit_meta/{id}": { + "post": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "Job add and modify" + ], + "summary": "Edit meta-data json", + "parameters": [ + { + "type": "integer", + "description": "Job Database ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "Kay value pair to add", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.EditMetaRequest" + } + } + ], + "responses": { + "200": { + "description": "Updated job resource", + "schema": { + "$ref": "#/definitions/schema.Job" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "401": { + "description": "Unauthorized", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "404": { + "description": "Job does not exist", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + } + } + } + }, "/jobs/start_job/": { "post": { "security": [ @@ -1114,6 +1184,19 @@ } } }, + "api.EditMetaRequest": { + "type": "object", + "properties": { + "key": { + "type": "string", + "example": "jobScript" + }, + "value": { + "type": "string", + "example": "bash script" + } + } + }, "api.ErrorResponse": { "type": "object", "properties": { @@ -1252,6 +1335,10 @@ "minimum": 0, "example": 1 }, + "flopsAnyAvg": { + "description": "FlopsAnyAvg as Float64", + "type": "number" + }, "id": { "description": "The unique identifier of a job in the database", "type": "integer" @@ -1278,6 +1365,18 @@ ], "example": "completed" }, + "loadAvg": { + "description": "LoadAvg as Float64", + "type": "number" + }, + "memBwAvg": { + "description": "MemBwAvg as Float64", + "type": "number" + }, + "memUsedMax": { + "description": "MemUsedMax as Float64", + "type": "number" + }, "metaData": { "description": "Additional information about the job", "type": "object", diff --git a/api/swagger.yaml b/api/swagger.yaml index cf3b3e3..add432a 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -50,6 +50,15 @@ definitions: msg: type: string type: object + api.EditMetaRequest: + properties: + key: + example: jobScript + type: string + value: + example: bash script + type: string + type: object api.ErrorResponse: properties: error: @@ -150,6 +159,9 @@ definitions: maximum: 2 minimum: 0 type: integer + flopsAnyAvg: + description: FlopsAnyAvg as Float64 + type: number id: description: The unique identifier of a job in the database type: integer @@ -169,6 +181,15 @@ definitions: - timeout - out_of_memory example: completed + loadAvg: + description: LoadAvg as Float64 + type: number + memBwAvg: + description: MemBwAvg as Float64 + type: number + memUsedMax: + description: MemUsedMax as Float64 + type: number metaData: additionalProperties: type: string @@ -810,6 +831,53 @@ paths: summary: Remove a job from the sql database tags: - Job remove + /jobs/edit_meta/{id}: + post: + consumes: + - application/json + description: |- + Edit key value pairs in job metadata json + If a key already exists its content will be overwritten + parameters: + - description: Job Database ID + in: path + name: id + required: true + type: integer + - description: Kay value pair to add + in: body + name: request + required: true + schema: + $ref: '#/definitions/api.EditMetaRequest' + produces: + - application/json + responses: + "200": + description: Updated job resource + schema: + $ref: '#/definitions/schema.Job' + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.ErrorResponse' + "401": + description: Unauthorized + schema: + $ref: '#/definitions/api.ErrorResponse' + "404": + description: Job does not exist + schema: + $ref: '#/definitions/api.ErrorResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/api.ErrorResponse' + security: + - ApiKeyAuth: [] + summary: Edit meta-data json + tags: + - Job add and modify /jobs/start_job/: post: consumes: diff --git a/internal/api/docs.go b/internal/api/docs.go index bf70cdb..c0a34e7 100644 --- a/internal/api/docs.go +++ b/internal/api/docs.go @@ -1,5 +1,4 @@ -// Code generated by swaggo/swag. DO NOT EDIT. - +// Package api Code generated by swaggo/swag. DO NOT EDIT package api import "github.com/swaggo/swag" @@ -334,6 +333,76 @@ const docTemplate = `{ } } }, + "/jobs/edit_meta/{id}": { + "post": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "Job add and modify" + ], + "summary": "Edit meta-data json", + "parameters": [ + { + "type": "integer", + "description": "Job Database ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "Kay value pair to add", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.EditMetaRequest" + } + } + ], + "responses": { + "200": { + "description": "Updated job resource", + "schema": { + "$ref": "#/definitions/schema.Job" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "401": { + "description": "Unauthorized", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "404": { + "description": "Job does not exist", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.ErrorResponse" + } + } + } + } + }, "/jobs/start_job/": { "post": { "security": [ @@ -1121,6 +1190,19 @@ const docTemplate = `{ } } }, + "api.EditMetaRequest": { + "type": "object", + "properties": { + "key": { + "type": "string", + "example": "jobScript" + }, + "value": { + "type": "string", + "example": "bash script" + } + } + }, "api.ErrorResponse": { "type": "object", "properties": { @@ -1259,6 +1341,10 @@ const docTemplate = `{ "minimum": 0, "example": 1 }, + "flopsAnyAvg": { + "description": "FlopsAnyAvg as Float64", + "type": "number" + }, "id": { "description": "The unique identifier of a job in the database", "type": "integer" @@ -1285,6 +1371,18 @@ const docTemplate = `{ ], "example": "completed" }, + "loadAvg": { + "description": "LoadAvg as Float64", + "type": "number" + }, + "memBwAvg": { + "description": "MemBwAvg as Float64", + "type": "number" + }, + "memUsedMax": { + "description": "MemUsedMax as Float64", + "type": "number" + }, "metaData": { "description": "Additional information about the job", "type": "object", diff --git a/internal/api/rest.go b/internal/api/rest.go index 11682ee..564bd1c 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -71,6 +71,7 @@ func (api *RestApi) MountRoutes(r *mux.Router) { r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) r.HandleFunc("/jobs/{id}", api.getJobById).Methods(http.MethodPost) r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) + r.HandleFunc("/jobs/edit_meta/{id}", api.editMeta).Methods(http.MethodPost, http.MethodPatch) r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet) r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobById).Methods(http.MethodDelete) @@ -146,6 +147,12 @@ type ApiTag struct { Name string `json:"name" example:"Testjob"` // Tag Name } +// ApiMeta model +type EditMetaRequest struct { + Key string `json:"key" example:"jobScript"` + Value string `json:"value" example:"bash script"` +} + type TagJobApiRequest []*ApiTag type GetJobApiRequest []string @@ -243,7 +250,6 @@ func securedCheck(r *http.Request) error { // @security ApiKeyAuth // @router /jobs/ [get] func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { - if user := repository.GetUserFromContext(r.Context()); user != nil && !user.HasRole(schema.RoleApi) { @@ -464,6 +470,57 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { } } +// editMeta godoc +// @summary Edit meta-data json +// @tags Job add and modify +// @description Edit key value pairs in job metadata json +// @description If a key already exists its content will be overwritten +// @accept json +// @produce json +// @param id path int true "Job Database ID" +// @param request body api.EditMetaRequest true "Kay value pair to add" +// @success 200 {object} schema.Job "Updated job resource" +// @failure 400 {object} api.ErrorResponse "Bad Request" +// @failure 401 {object} api.ErrorResponse "Unauthorized" +// @failure 404 {object} api.ErrorResponse "Job does not exist" +// @failure 500 {object} api.ErrorResponse "Internal Server Error" +// @security ApiKeyAuth +// @router /jobs/edit_meta/{id} [post] +func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) { + if user := repository.GetUserFromContext(r.Context()); user != nil && + !user.HasRole(schema.RoleApi) { + handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw) + return + } + + iid, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + job, err := api.JobRepository.FindById(iid) + if err != nil { + http.Error(rw, err.Error(), http.StatusNotFound) + return + } + + var req EditMetaRequest + if err := decode(r.Body, &req); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + if err := api.JobRepository.UpdateMetadata(job, req.Key, req.Value); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + json.NewEncoder(rw).Encode(job) +} + // tagJob godoc // @summary Adds one or more tags to a job // @tags Job add and modify @@ -873,7 +930,6 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { } func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobApiRequest) { - // Sanity checks if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning { handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw) @@ -1015,7 +1071,8 @@ func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) { Password: password, Email: email, Projects: []string{project}, - Roles: []string{role}}); err != nil { + Roles: []string{role}, + }); err != nil { http.Error(rw, err.Error(), http.StatusUnprocessableEntity) return } diff --git a/internal/repository/job.go b/internal/repository/job.go index e1a997a..ec20d98 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -16,6 +16,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/metricdata" + "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -212,7 +213,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er } r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) - return nil + return archive.UpdateMetadata(job, job.MetaData) } // Find executes a SQL query to find a specific batch job. @@ -223,8 +224,8 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er func (r *JobRepository) Find( jobId *int64, cluster *string, - startTime *int64) (*schema.Job, error) { - + startTime *int64, +) (*schema.Job, error) { start := time.Now() q := sq.Select(jobColumns...).From("job"). Where("job.job_id = ?", *jobId) @@ -248,8 +249,8 @@ func (r *JobRepository) Find( func (r *JobRepository) FindAll( jobId *int64, cluster *string, - startTime *int64) ([]*schema.Job, error) { - + startTime *int64, +) ([]*schema.Job, error) { start := time.Now() q := sq.Select(jobColumns...).From("job"). Where("job.job_id = ?", *jobId) @@ -292,7 +293,8 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) { func (r *JobRepository) FindConcurrentJobs( ctx context.Context, - job *schema.Job) (*model.JobLinkResultList, error) { + job *schema.Job, +) (*model.JobLinkResultList, error) { if job == nil { return nil, nil } @@ -420,8 +422,8 @@ func (r *JobRepository) Stop( jobId int64, duration int32, state schema.JobState, - monitoringStatus int32) (err error) { - + monitoringStatus int32, +) (err error) { stmt := sq.Update("job"). Set("job_state", state). Set("duration", duration). @@ -435,7 +437,7 @@ func (r *JobRepository) Stop( func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) { var cnt int qs := fmt.Sprintf("SELECT count(*) FROM job WHERE job.start_time < %d", startTime) - err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement + err := r.DB.Get(&cnt, qs) // ignore error as it will also occur in delete statement _, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime) if err != nil { log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err) @@ -468,8 +470,8 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32 func (r *JobRepository) MarkArchived( jobId int64, monitoringStatus int32, - metricStats map[string]schema.JobStatistics) error { - + metricStats map[string]schema.JobStatistics, +) error { stmt := sq.Update("job"). Set("monitoring_status", monitoringStatus). Where("job.id = ?", jobId) @@ -578,8 +580,10 @@ func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm } } -var ErrNotFound = errors.New("no such jobname, project or user") -var ErrForbidden = errors.New("not authorized") +var ( + ErrNotFound = errors.New("no such jobname, project or user") + ErrForbidden = errors.New("not authorized") +) func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) { compareStr := " = ?" @@ -663,7 +667,6 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) { // AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host. // Hosts with zero jobs running on them will not show up! func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) { - start := time.Now() subclusters := make(map[string]map[string]int) rows, err := sq.Select("resources", "subcluster").From("job"). @@ -706,7 +709,6 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in } func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { - start := time.Now() res, err := sq.Update("job"). Set("monitoring_status", schema.MonitoringStatusArchivingFailed). @@ -735,7 +737,6 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { } func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) { - var query sq.SelectBuilder if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd { diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index beeb24d..7e7b6e8 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -52,9 +52,11 @@ type JobContainer struct { Data *schema.JobData } -var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) -var ar ArchiveBackend -var useArchive bool +var ( + cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) + ar ArchiveBackend + useArchive bool +) func Init(rawConfig json.RawMessage, disableArchive bool) error { useArchive = !disableArchive @@ -95,8 +97,8 @@ func GetHandle() ArchiveBackend { func LoadAveragesFromArchive( job *schema.Job, metrics []string, - data [][]schema.Float) error { - + data [][]schema.Float, +) error { metaFile, err := ar.LoadJobMeta(job) if err != nil { log.Warn("Error while loading job metadata from archiveBackend") @@ -115,7 +117,6 @@ func LoadAveragesFromArchive( } func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) { - metaFile, err := ar.LoadJobMeta(job) if err != nil { log.Warn("Error while loading job metadata from archiveBackend") @@ -125,10 +126,29 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) { return metaFile.Statistics, nil } +// If the job is archived, find its `meta.json` file and override the Metadata +// in that JSON file. If the job is not archived, nothing is done. +func UpdateMetadata(job *schema.Job, metadata map[string]string) error { + if job.State == schema.JobStateRunning || !useArchive { + return nil + } + + jobMeta, err := ar.LoadJobMeta(job) + if err != nil { + log.Warn("Error while loading job metadata from archiveBackend") + return err + } + + for k, v := range metadata { + jobMeta.MetaData[k] = v + } + + return ar.StoreJobMeta(jobMeta) +} + // If the job is archived, find its `meta.json` file and override the tags list // in that JSON file. If the job is not archived, nothing is done. func UpdateTags(job *schema.Job, tags []*schema.Tag) error { - if job.State == schema.JobStateRunning || !useArchive { return nil }