feat: Add rest endpoint to add/edit Metadata entry

Add rest endpoint edit_meta including helper routines
This commit is contained in:
Jan Eitzinger 2024-03-08 09:01:39 +01:00 committed by GitHub
commit ce97780741
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 371 additions and 28 deletions

View File

@ -327,6 +327,76 @@
} }
} }
}, },
"/jobs/edit_meta/{id}": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"Job add and modify"
],
"summary": "Edit meta-data json",
"parameters": [
{
"type": "integer",
"description": "Job Database ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "Kay value pair to add",
"name": "request",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.EditMetaRequest"
}
}
],
"responses": {
"200": {
"description": "Updated job resource",
"schema": {
"$ref": "#/definitions/schema.Job"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"401": {
"description": "Unauthorized",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"404": {
"description": "Job does not exist",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
}
}
}
},
"/jobs/start_job/": { "/jobs/start_job/": {
"post": { "post": {
"security": [ "security": [
@ -1114,6 +1184,19 @@
} }
} }
}, },
"api.EditMetaRequest": {
"type": "object",
"properties": {
"key": {
"type": "string",
"example": "jobScript"
},
"value": {
"type": "string",
"example": "bash script"
}
}
},
"api.ErrorResponse": { "api.ErrorResponse": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -1252,6 +1335,10 @@
"minimum": 0, "minimum": 0,
"example": 1 "example": 1
}, },
"flopsAnyAvg": {
"description": "FlopsAnyAvg as Float64",
"type": "number"
},
"id": { "id": {
"description": "The unique identifier of a job in the database", "description": "The unique identifier of a job in the database",
"type": "integer" "type": "integer"
@ -1278,6 +1365,18 @@
], ],
"example": "completed" "example": "completed"
}, },
"loadAvg": {
"description": "LoadAvg as Float64",
"type": "number"
},
"memBwAvg": {
"description": "MemBwAvg as Float64",
"type": "number"
},
"memUsedMax": {
"description": "MemUsedMax as Float64",
"type": "number"
},
"metaData": { "metaData": {
"description": "Additional information about the job", "description": "Additional information about the job",
"type": "object", "type": "object",

View File

@ -50,6 +50,15 @@ definitions:
msg: msg:
type: string type: string
type: object type: object
api.EditMetaRequest:
properties:
key:
example: jobScript
type: string
value:
example: bash script
type: string
type: object
api.ErrorResponse: api.ErrorResponse:
properties: properties:
error: error:
@ -150,6 +159,9 @@ definitions:
maximum: 2 maximum: 2
minimum: 0 minimum: 0
type: integer type: integer
flopsAnyAvg:
description: FlopsAnyAvg as Float64
type: number
id: id:
description: The unique identifier of a job in the database description: The unique identifier of a job in the database
type: integer type: integer
@ -169,6 +181,15 @@ definitions:
- timeout - timeout
- out_of_memory - out_of_memory
example: completed example: completed
loadAvg:
description: LoadAvg as Float64
type: number
memBwAvg:
description: MemBwAvg as Float64
type: number
memUsedMax:
description: MemUsedMax as Float64
type: number
metaData: metaData:
additionalProperties: additionalProperties:
type: string type: string
@ -810,6 +831,53 @@ paths:
summary: Remove a job from the sql database summary: Remove a job from the sql database
tags: tags:
- Job remove - Job remove
/jobs/edit_meta/{id}:
post:
consumes:
- application/json
description: |-
Edit key value pairs in job metadata json
If a key already exists its content will be overwritten
parameters:
- description: Job Database ID
in: path
name: id
required: true
type: integer
- description: Kay value pair to add
in: body
name: request
required: true
schema:
$ref: '#/definitions/api.EditMetaRequest'
produces:
- application/json
responses:
"200":
description: Updated job resource
schema:
$ref: '#/definitions/schema.Job'
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.ErrorResponse'
"401":
description: Unauthorized
schema:
$ref: '#/definitions/api.ErrorResponse'
"404":
description: Job does not exist
schema:
$ref: '#/definitions/api.ErrorResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/api.ErrorResponse'
security:
- ApiKeyAuth: []
summary: Edit meta-data json
tags:
- Job add and modify
/jobs/start_job/: /jobs/start_job/:
post: post:
consumes: consumes:

View File

@ -1,5 +1,4 @@
// Code generated by swaggo/swag. DO NOT EDIT. // Package api Code generated by swaggo/swag. DO NOT EDIT
package api package api
import "github.com/swaggo/swag" import "github.com/swaggo/swag"
@ -334,6 +333,76 @@ const docTemplate = `{
} }
} }
}, },
"/jobs/edit_meta/{id}": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"Job add and modify"
],
"summary": "Edit meta-data json",
"parameters": [
{
"type": "integer",
"description": "Job Database ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "Kay value pair to add",
"name": "request",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.EditMetaRequest"
}
}
],
"responses": {
"200": {
"description": "Updated job resource",
"schema": {
"$ref": "#/definitions/schema.Job"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"401": {
"description": "Unauthorized",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"404": {
"description": "Job does not exist",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
}
}
}
},
"/jobs/start_job/": { "/jobs/start_job/": {
"post": { "post": {
"security": [ "security": [
@ -1121,6 +1190,19 @@ const docTemplate = `{
} }
} }
}, },
"api.EditMetaRequest": {
"type": "object",
"properties": {
"key": {
"type": "string",
"example": "jobScript"
},
"value": {
"type": "string",
"example": "bash script"
}
}
},
"api.ErrorResponse": { "api.ErrorResponse": {
"type": "object", "type": "object",
"properties": { "properties": {
@ -1259,6 +1341,10 @@ const docTemplate = `{
"minimum": 0, "minimum": 0,
"example": 1 "example": 1
}, },
"flopsAnyAvg": {
"description": "FlopsAnyAvg as Float64",
"type": "number"
},
"id": { "id": {
"description": "The unique identifier of a job in the database", "description": "The unique identifier of a job in the database",
"type": "integer" "type": "integer"
@ -1285,6 +1371,18 @@ const docTemplate = `{
], ],
"example": "completed" "example": "completed"
}, },
"loadAvg": {
"description": "LoadAvg as Float64",
"type": "number"
},
"memBwAvg": {
"description": "MemBwAvg as Float64",
"type": "number"
},
"memUsedMax": {
"description": "MemUsedMax as Float64",
"type": "number"
},
"metaData": { "metaData": {
"description": "Additional information about the job", "description": "Additional information about the job",
"type": "object", "type": "object",

View File

@ -71,6 +71,7 @@ func (api *RestApi) MountRoutes(r *mux.Router) {
r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
r.HandleFunc("/jobs/{id}", api.getJobById).Methods(http.MethodPost) r.HandleFunc("/jobs/{id}", api.getJobById).Methods(http.MethodPost)
r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch)
r.HandleFunc("/jobs/edit_meta/{id}", api.editMeta).Methods(http.MethodPost, http.MethodPatch)
r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet) r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet)
r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete)
r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobById).Methods(http.MethodDelete) r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobById).Methods(http.MethodDelete)
@ -146,6 +147,12 @@ type ApiTag struct {
Name string `json:"name" example:"Testjob"` // Tag Name Name string `json:"name" example:"Testjob"` // Tag Name
} }
// ApiMeta model
type EditMetaRequest struct {
Key string `json:"key" example:"jobScript"`
Value string `json:"value" example:"bash script"`
}
type TagJobApiRequest []*ApiTag type TagJobApiRequest []*ApiTag
type GetJobApiRequest []string type GetJobApiRequest []string
@ -243,7 +250,6 @@ func securedCheck(r *http.Request) error {
// @security ApiKeyAuth // @security ApiKeyAuth
// @router /jobs/ [get] // @router /jobs/ [get]
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
if user := repository.GetUserFromContext(r.Context()); user != nil && if user := repository.GetUserFromContext(r.Context()); user != nil &&
!user.HasRole(schema.RoleApi) { !user.HasRole(schema.RoleApi) {
@ -464,6 +470,57 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
} }
} }
// editMeta godoc
// @summary Edit meta-data json
// @tags Job add and modify
// @description Edit key value pairs in job metadata json
// @description If a key already exists its content will be overwritten
// @accept json
// @produce json
// @param id path int true "Job Database ID"
// @param request body api.EditMetaRequest true "Kay value pair to add"
// @success 200 {object} schema.Job "Updated job resource"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 404 {object} api.ErrorResponse "Job does not exist"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /jobs/edit_meta/{id} [post]
func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) {
if user := repository.GetUserFromContext(r.Context()); user != nil &&
!user.HasRole(schema.RoleApi) {
handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw)
return
}
iid, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
job, err := api.JobRepository.FindById(iid)
if err != nil {
http.Error(rw, err.Error(), http.StatusNotFound)
return
}
var req EditMetaRequest
if err := decode(r.Body, &req); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
if err := api.JobRepository.UpdateMetadata(job, req.Key, req.Value); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(job)
}
// tagJob godoc // tagJob godoc
// @summary Adds one or more tags to a job // @summary Adds one or more tags to a job
// @tags Job add and modify // @tags Job add and modify
@ -873,7 +930,6 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
} }
func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobApiRequest) { func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobApiRequest) {
// Sanity checks // Sanity checks
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning { if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw) handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw)
@ -1015,7 +1071,8 @@ func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
Password: password, Password: password,
Email: email, Email: email,
Projects: []string{project}, Projects: []string{project},
Roles: []string{role}}); err != nil { Roles: []string{role},
}); err != nil {
http.Error(rw, err.Error(), http.StatusUnprocessableEntity) http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
return return
} }

View File

@ -16,6 +16,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
@ -212,7 +213,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
} }
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
return nil return archive.UpdateMetadata(job, job.MetaData)
} }
// Find executes a SQL query to find a specific batch job. // Find executes a SQL query to find a specific batch job.
@ -223,8 +224,8 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
func (r *JobRepository) Find( func (r *JobRepository) Find(
jobId *int64, jobId *int64,
cluster *string, cluster *string,
startTime *int64) (*schema.Job, error) { startTime *int64,
) (*schema.Job, error) {
start := time.Now() start := time.Now()
q := sq.Select(jobColumns...).From("job"). q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId) Where("job.job_id = ?", *jobId)
@ -248,8 +249,8 @@ func (r *JobRepository) Find(
func (r *JobRepository) FindAll( func (r *JobRepository) FindAll(
jobId *int64, jobId *int64,
cluster *string, cluster *string,
startTime *int64) ([]*schema.Job, error) { startTime *int64,
) ([]*schema.Job, error) {
start := time.Now() start := time.Now()
q := sq.Select(jobColumns...).From("job"). q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId) Where("job.job_id = ?", *jobId)
@ -292,7 +293,8 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
func (r *JobRepository) FindConcurrentJobs( func (r *JobRepository) FindConcurrentJobs(
ctx context.Context, ctx context.Context,
job *schema.Job) (*model.JobLinkResultList, error) { job *schema.Job,
) (*model.JobLinkResultList, error) {
if job == nil { if job == nil {
return nil, nil return nil, nil
} }
@ -420,8 +422,8 @@ func (r *JobRepository) Stop(
jobId int64, jobId int64,
duration int32, duration int32,
state schema.JobState, state schema.JobState,
monitoringStatus int32) (err error) { monitoringStatus int32,
) (err error) {
stmt := sq.Update("job"). stmt := sq.Update("job").
Set("job_state", state). Set("job_state", state).
Set("duration", duration). Set("duration", duration).
@ -435,7 +437,7 @@ func (r *JobRepository) Stop(
func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) { func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
var cnt int var cnt int
qs := fmt.Sprintf("SELECT count(*) FROM job WHERE job.start_time < %d", startTime) qs := fmt.Sprintf("SELECT count(*) FROM job WHERE job.start_time < %d", startTime)
err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement err := r.DB.Get(&cnt, qs) // ignore error as it will also occur in delete statement
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime) _, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
if err != nil { if err != nil {
log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err) log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err)
@ -468,8 +470,8 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
func (r *JobRepository) MarkArchived( func (r *JobRepository) MarkArchived(
jobId int64, jobId int64,
monitoringStatus int32, monitoringStatus int32,
metricStats map[string]schema.JobStatistics) error { metricStats map[string]schema.JobStatistics,
) error {
stmt := sq.Update("job"). stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus). Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobId) Where("job.id = ?", jobId)
@ -578,8 +580,10 @@ func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm
} }
} }
var ErrNotFound = errors.New("no such jobname, project or user") var (
var ErrForbidden = errors.New("not authorized") ErrNotFound = errors.New("no such jobname, project or user")
ErrForbidden = errors.New("not authorized")
)
func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) { func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) {
compareStr := " = ?" compareStr := " = ?"
@ -663,7 +667,6 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host. // AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
// Hosts with zero jobs running on them will not show up! // Hosts with zero jobs running on them will not show up!
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) { func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
start := time.Now() start := time.Now()
subclusters := make(map[string]map[string]int) subclusters := make(map[string]map[string]int)
rows, err := sq.Select("resources", "subcluster").From("job"). rows, err := sq.Select("resources", "subcluster").From("job").
@ -706,7 +709,6 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
} }
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
start := time.Now() start := time.Now()
res, err := sq.Update("job"). res, err := sq.Update("job").
Set("monitoring_status", schema.MonitoringStatusArchivingFailed). Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
@ -735,7 +737,6 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
} }
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) { func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {
var query sq.SelectBuilder var query sq.SelectBuilder
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd { if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {

View File

@ -52,9 +52,11 @@ type JobContainer struct {
Data *schema.JobData Data *schema.JobData
} }
var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024) var (
var ar ArchiveBackend cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
var useArchive bool ar ArchiveBackend
useArchive bool
)
func Init(rawConfig json.RawMessage, disableArchive bool) error { func Init(rawConfig json.RawMessage, disableArchive bool) error {
useArchive = !disableArchive useArchive = !disableArchive
@ -95,8 +97,8 @@ func GetHandle() ArchiveBackend {
func LoadAveragesFromArchive( func LoadAveragesFromArchive(
job *schema.Job, job *schema.Job,
metrics []string, metrics []string,
data [][]schema.Float) error { data [][]schema.Float,
) error {
metaFile, err := ar.LoadJobMeta(job) metaFile, err := ar.LoadJobMeta(job)
if err != nil { if err != nil {
log.Warn("Error while loading job metadata from archiveBackend") log.Warn("Error while loading job metadata from archiveBackend")
@ -115,7 +117,6 @@ func LoadAveragesFromArchive(
} }
func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) { func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
metaFile, err := ar.LoadJobMeta(job) metaFile, err := ar.LoadJobMeta(job)
if err != nil { if err != nil {
log.Warn("Error while loading job metadata from archiveBackend") log.Warn("Error while loading job metadata from archiveBackend")
@ -125,10 +126,29 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
return metaFile.Statistics, nil return metaFile.Statistics, nil
} }
// If the job is archived, find its `meta.json` file and override the Metadata
// in that JSON file. If the job is not archived, nothing is done.
func UpdateMetadata(job *schema.Job, metadata map[string]string) error {
if job.State == schema.JobStateRunning || !useArchive {
return nil
}
jobMeta, err := ar.LoadJobMeta(job)
if err != nil {
log.Warn("Error while loading job metadata from archiveBackend")
return err
}
for k, v := range metadata {
jobMeta.MetaData[k] = v
}
return ar.StoreJobMeta(jobMeta)
}
// If the job is archived, find its `meta.json` file and override the tags list // If the job is archived, find its `meta.json` file and override the tags list
// in that JSON file. If the job is not archived, nothing is done. // in that JSON file. If the job is not archived, nothing is done.
func UpdateTags(job *schema.Job, tags []*schema.Tag) error { func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
if job.State == schema.JobStateRunning || !useArchive { if job.State == schema.JobStateRunning || !useArchive {
return nil return nil
} }