diff --git a/api/openapi.yaml b/api/openapi.yaml index 1bd154d..7d5f977 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -18,7 +18,7 @@ paths: get: operationId: 'getJobs' summary: 'List all jobs' - description: 'Get a list of all jobs in the JSON schema defined via GraphQL (main difference: start-time is RFC3339 encoded and there are no statistics). Filters can be applied using query parameters.' + description: 'Get a list of all jobs. Filters can be applied using query parameters.' parameters: - name: state in: query @@ -28,37 +28,30 @@ paths: - name: cluster in: query schema: { type: string } + - name: start-time + description: 'Syntax: "-", where and are unix timestamps in seconds' + in: query + schema: { type: string } + - name: page + in: query + schema: { type: integer } + - name: items-per-page + in: query + schema: { type: integer } responses: 200: description: 'Array of jobs' content: 'application/json': schema: - type: array - items: - # Not totally true: start-time is a RFC3339 string here! - $ref: '#/components/schemas/Job' + type: object + properties: + jobs: + type: array + items: + $ref: '#/components/schemas/Job' 400: description: 'Bad Request' - '/api/jobs/{id}': - get: - operationId: 'getJob' - summary: 'Get job resource' - parameters: - - name: id - in: path - required: true - schema: { type: integer } - description: 'Database ID (Resource Identifier)' - responses: - 200: - description: 'Job resource' - content: - 'application/json': - schema: - $ref: '#/components/schemas/Job' - 404: - description: 'Resource not found' '/api/jobs/tag_job/{id}': post: operationId: 'tagJob' diff --git a/api/rest.go b/api/rest.go index 7d367d9..7ca4e6c 100644 --- a/api/rest.go +++ b/api/rest.go @@ -12,7 +12,9 @@ import ( "os" "path/filepath" "strconv" + "strings" "sync" + "time" "github.com/ClusterCockpit/cc-backend/auth" "github.com/ClusterCockpit/cc-backend/config" @@ -41,7 +43,7 @@ func (api *RestApi) MountRoutes(r *mux.Router) { r.HandleFunc("/jobs/stop_job/{id}", api.stopJob).Methods(http.MethodPost, http.MethodPut) r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet) - r.HandleFunc("/jobs/{id}", api.getJob).Methods(http.MethodGet) + // r.HandleFunc("/jobs/{id}", api.getJob).Methods(http.MethodGet) r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch) r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet) @@ -83,6 +85,12 @@ func handleError(err error, statusCode int, rw http.ResponseWriter) { }) } +func decode(r io.Reader, val interface{}) error { + dec := json.NewDecoder(r) + dec.DisallowUnknownFields() + return dec.Decode(val) +} + type TagJobApiRequest []*struct { Name string `json:"name"` Type string `json:"type"` @@ -90,7 +98,9 @@ type TagJobApiRequest []*struct { // Return a list of jobs func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { - filter := model.JobFilter{} + filter := &model.JobFilter{} + page := &model.PageRequest{ItemsPerPage: 50, Page: 1} + order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc} for key, vals := range r.URL.Query() { switch key { case "state": @@ -104,65 +114,110 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { } case "cluster": filter.Cluster = &model.StringInput{Eq: &vals[0]} + case "start-time": + st := strings.Split(vals[0], "-") + if len(st) != 2 { + http.Error(rw, "invalid query parameter value: startTime", http.StatusBadRequest) + return + } + from, err := strconv.ParseInt(st[0], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + to, err := strconv.ParseInt(st[1], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + ufrom, uto := time.Unix(from, 0), time.Unix(to, 0) + filter.StartTime = &model.TimeRange{From: &ufrom, To: &uto} + case "page": + x, err := strconv.Atoi(vals[0]) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + page.Page = x + case "items-per-page": + x, err := strconv.Atoi(vals[0]) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + page.ItemsPerPage = x default: http.Error(rw, "invalid query parameter: "+key, http.StatusBadRequest) return } } - results, err := api.Resolver.Query().Jobs(r.Context(), []*model.JobFilter{&filter}, nil, nil) + jobs, err := api.JobRepository.QueryJobs(r.Context(), []*model.JobFilter{filter}, page, order) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return } + results := make([]*schema.JobMeta, 0, len(jobs)) + for _, job := range jobs { + res := &schema.JobMeta{ + ID: &job.ID, + BaseJob: job.BaseJob, + StartTime: job.StartTime.Unix(), + } + + res.Tags, err = api.JobRepository.GetTags(&job.ID) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + if res.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful { + res.Statistics, err = metricdata.GetStatistics(job) + if err != nil { + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + } + } + + results = append(results, res) + } + + log.Debugf("/api/jobs: %d jobs returned", len(results)) bw := bufio.NewWriter(rw) defer bw.Flush() - - if err := json.NewEncoder(bw).Encode(results.Items); err != nil { + if err := json.NewEncoder(bw).Encode(map[string]interface{}{ + "jobs": results, + }); err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return } } -// Return a single job -func (api *RestApi) getJob(rw http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - - job, err := api.Resolver.Query().Job(r.Context(), id) - if err != nil { - http.Error(rw, err.Error(), http.StatusNotFound) - return - } - - job.Tags, err = api.Resolver.Job().Tags(r.Context(), job) - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusOK) - json.NewEncoder(rw).Encode(job) -} - // Add a tag to a job func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) { - id := mux.Vars(r)["id"] - job, err := api.Resolver.Query().Job(r.Context(), id) + iid, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + job, err := api.JobRepository.FindById(iid) if err != nil { http.Error(rw, err.Error(), http.StatusNotFound) return } - job.Tags, err = api.Resolver.Job().Tags(r.Context(), job) + job.Tags, err = api.JobRepository.GetTags(&job.ID) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return } var req TagJobApiRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + if err := decode(r.Body, &req); err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) return } @@ -195,7 +250,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { } req := schema.JobMeta{BaseJob: schema.JobDefaults} - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + if err := decode(r.Body, &req); err != nil { handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) return } @@ -264,7 +319,7 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) { // Parse request body req := StopJobApiRequest{} - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + if err := decode(r.Body, &req); err != nil { handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) return } diff --git a/metricdata/archive.go b/metricdata/archive.go index ba439b0..9959b8f 100644 --- a/metricdata/archive.go +++ b/metricdata/archive.go @@ -62,6 +62,27 @@ func loadFromArchive(job *schema.Job) (schema.JobData, error) { return data.(schema.JobData), nil } +func loadMetaJson(job *schema.Job) (*schema.JobMeta, error) { + filename, err := getPath(job, "meta.json", true) + if err != nil { + return nil, err + } + + bytes, err := os.ReadFile(filename) + if err != nil { + return nil, err + } + + var metaFile schema.JobMeta = schema.JobMeta{ + BaseJob: schema.JobDefaults, + } + if err := json.Unmarshal(bytes, &metaFile); err != nil { + return nil, err + } + + return &metaFile, nil +} + // If the job is archived, find its `meta.json` file and override the tags list // in that JSON file. If the job is not archived, nothing is done. func UpdateTags(job *schema.Job, tags []*schema.Tag) error { @@ -108,21 +129,11 @@ func UpdateTags(job *schema.Job, tags []*schema.Tag) error { // Helper to metricdata.LoadAverages(). func loadAveragesFromArchive(job *schema.Job, metrics []string, data [][]schema.Float) error { - filename, err := getPath(job, "meta.json", true) + metaFile, err := loadMetaJson(job) if err != nil { return err } - bytes, err := os.ReadFile(filename) - if err != nil { - return err - } - - var metaFile schema.JobMeta - if err := json.Unmarshal(bytes, &metaFile); err != nil { - return err - } - for i, m := range metrics { if stat, ok := metaFile.Statistics[m]; ok { data[i] = append(data[i], schema.Float(stat.Avg)) @@ -134,6 +145,15 @@ func loadAveragesFromArchive(job *schema.Job, metrics []string, data [][]schema. return nil } +func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) { + metaFile, err := loadMetaJson(job) + if err != nil { + return nil, err + } + + return metaFile.Statistics, nil +} + // Writes a running job to the job-archive func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { allMetrics := make([]string, 0)