mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-01-13 21:19:06 +01:00
Add stats and startTime filter to jobs api query
This commit is contained in:
parent
a528e42be6
commit
6ccf354035
@ -18,7 +18,7 @@ paths:
|
|||||||
get:
|
get:
|
||||||
operationId: 'getJobs'
|
operationId: 'getJobs'
|
||||||
summary: 'List all jobs'
|
summary: 'List all jobs'
|
||||||
description: 'Get a list of all jobs in the JSON schema defined via GraphQL (main difference: start-time is RFC3339 encoded and there are no statistics). Filters can be applied using query parameters.'
|
description: 'Get a list of all jobs. Filters can be applied using query parameters.'
|
||||||
parameters:
|
parameters:
|
||||||
- name: state
|
- name: state
|
||||||
in: query
|
in: query
|
||||||
@ -28,37 +28,30 @@ paths:
|
|||||||
- name: cluster
|
- name: cluster
|
||||||
in: query
|
in: query
|
||||||
schema: { type: string }
|
schema: { type: string }
|
||||||
|
- name: start-time
|
||||||
|
description: 'Syntax: "<from>-<to>", where <from> and <to> are unix timestamps in seconds'
|
||||||
|
in: query
|
||||||
|
schema: { type: string }
|
||||||
|
- name: page
|
||||||
|
in: query
|
||||||
|
schema: { type: integer }
|
||||||
|
- name: items-per-page
|
||||||
|
in: query
|
||||||
|
schema: { type: integer }
|
||||||
responses:
|
responses:
|
||||||
200:
|
200:
|
||||||
description: 'Array of jobs'
|
description: 'Array of jobs'
|
||||||
content:
|
content:
|
||||||
'application/json':
|
'application/json':
|
||||||
schema:
|
schema:
|
||||||
type: array
|
type: object
|
||||||
items:
|
properties:
|
||||||
# Not totally true: start-time is a RFC3339 string here!
|
jobs:
|
||||||
$ref: '#/components/schemas/Job'
|
type: array
|
||||||
|
items:
|
||||||
|
$ref: '#/components/schemas/Job'
|
||||||
400:
|
400:
|
||||||
description: 'Bad Request'
|
description: 'Bad Request'
|
||||||
'/api/jobs/{id}':
|
|
||||||
get:
|
|
||||||
operationId: 'getJob'
|
|
||||||
summary: 'Get job resource'
|
|
||||||
parameters:
|
|
||||||
- name: id
|
|
||||||
in: path
|
|
||||||
required: true
|
|
||||||
schema: { type: integer }
|
|
||||||
description: 'Database ID (Resource Identifier)'
|
|
||||||
responses:
|
|
||||||
200:
|
|
||||||
description: 'Job resource'
|
|
||||||
content:
|
|
||||||
'application/json':
|
|
||||||
schema:
|
|
||||||
$ref: '#/components/schemas/Job'
|
|
||||||
404:
|
|
||||||
description: 'Resource not found'
|
|
||||||
'/api/jobs/tag_job/{id}':
|
'/api/jobs/tag_job/{id}':
|
||||||
post:
|
post:
|
||||||
operationId: 'tagJob'
|
operationId: 'tagJob'
|
||||||
|
119
api/rest.go
119
api/rest.go
@ -12,7 +12,9 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/auth"
|
"github.com/ClusterCockpit/cc-backend/auth"
|
||||||
"github.com/ClusterCockpit/cc-backend/config"
|
"github.com/ClusterCockpit/cc-backend/config"
|
||||||
@ -41,7 +43,7 @@ func (api *RestApi) MountRoutes(r *mux.Router) {
|
|||||||
r.HandleFunc("/jobs/stop_job/{id}", api.stopJob).Methods(http.MethodPost, http.MethodPut)
|
r.HandleFunc("/jobs/stop_job/{id}", api.stopJob).Methods(http.MethodPost, http.MethodPut)
|
||||||
|
|
||||||
r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
|
r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
|
||||||
r.HandleFunc("/jobs/{id}", api.getJob).Methods(http.MethodGet)
|
// r.HandleFunc("/jobs/{id}", api.getJob).Methods(http.MethodGet)
|
||||||
r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch)
|
r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch)
|
||||||
|
|
||||||
r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet)
|
r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet)
|
||||||
@ -83,6 +85,12 @@ func handleError(err error, statusCode int, rw http.ResponseWriter) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func decode(r io.Reader, val interface{}) error {
|
||||||
|
dec := json.NewDecoder(r)
|
||||||
|
dec.DisallowUnknownFields()
|
||||||
|
return dec.Decode(val)
|
||||||
|
}
|
||||||
|
|
||||||
type TagJobApiRequest []*struct {
|
type TagJobApiRequest []*struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
@ -90,7 +98,9 @@ type TagJobApiRequest []*struct {
|
|||||||
|
|
||||||
// Return a list of jobs
|
// Return a list of jobs
|
||||||
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
||||||
filter := model.JobFilter{}
|
filter := &model.JobFilter{}
|
||||||
|
page := &model.PageRequest{ItemsPerPage: 50, Page: 1}
|
||||||
|
order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc}
|
||||||
for key, vals := range r.URL.Query() {
|
for key, vals := range r.URL.Query() {
|
||||||
switch key {
|
switch key {
|
||||||
case "state":
|
case "state":
|
||||||
@ -104,65 +114,110 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
case "cluster":
|
case "cluster":
|
||||||
filter.Cluster = &model.StringInput{Eq: &vals[0]}
|
filter.Cluster = &model.StringInput{Eq: &vals[0]}
|
||||||
|
case "start-time":
|
||||||
|
st := strings.Split(vals[0], "-")
|
||||||
|
if len(st) != 2 {
|
||||||
|
http.Error(rw, "invalid query parameter value: startTime", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
from, err := strconv.ParseInt(st[0], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
to, err := strconv.ParseInt(st[1], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ufrom, uto := time.Unix(from, 0), time.Unix(to, 0)
|
||||||
|
filter.StartTime = &model.TimeRange{From: &ufrom, To: &uto}
|
||||||
|
case "page":
|
||||||
|
x, err := strconv.Atoi(vals[0])
|
||||||
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
page.Page = x
|
||||||
|
case "items-per-page":
|
||||||
|
x, err := strconv.Atoi(vals[0])
|
||||||
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
page.ItemsPerPage = x
|
||||||
default:
|
default:
|
||||||
http.Error(rw, "invalid query parameter: "+key, http.StatusBadRequest)
|
http.Error(rw, "invalid query parameter: "+key, http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
results, err := api.Resolver.Query().Jobs(r.Context(), []*model.JobFilter{&filter}, nil, nil)
|
jobs, err := api.JobRepository.QueryJobs(r.Context(), []*model.JobFilter{filter}, page, order)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
results := make([]*schema.JobMeta, 0, len(jobs))
|
||||||
|
for _, job := range jobs {
|
||||||
|
res := &schema.JobMeta{
|
||||||
|
ID: &job.ID,
|
||||||
|
BaseJob: job.BaseJob,
|
||||||
|
StartTime: job.StartTime.Unix(),
|
||||||
|
}
|
||||||
|
|
||||||
|
res.Tags, err = api.JobRepository.GetTags(&job.ID)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful {
|
||||||
|
res.Statistics, err = metricdata.GetStatistics(job)
|
||||||
|
if err != nil {
|
||||||
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
results = append(results, res)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("/api/jobs: %d jobs returned", len(results))
|
||||||
bw := bufio.NewWriter(rw)
|
bw := bufio.NewWriter(rw)
|
||||||
defer bw.Flush()
|
defer bw.Flush()
|
||||||
|
if err := json.NewEncoder(bw).Encode(map[string]interface{}{
|
||||||
if err := json.NewEncoder(bw).Encode(results.Items); err != nil {
|
"jobs": results,
|
||||||
|
}); err != nil {
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return a single job
|
|
||||||
func (api *RestApi) getJob(rw http.ResponseWriter, r *http.Request) {
|
|
||||||
id := mux.Vars(r)["id"]
|
|
||||||
|
|
||||||
job, err := api.Resolver.Query().Job(r.Context(), id)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusNotFound)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
job.Tags, err = api.Resolver.Job().Tags(r.Context(), job)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
rw.Header().Add("Content-Type", "application/json")
|
|
||||||
rw.WriteHeader(http.StatusOK)
|
|
||||||
json.NewEncoder(rw).Encode(job)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add a tag to a job
|
// Add a tag to a job
|
||||||
func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
|
func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
|
||||||
id := mux.Vars(r)["id"]
|
iid, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
|
||||||
job, err := api.Resolver.Query().Job(r.Context(), id)
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
job, err := api.JobRepository.FindById(iid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(rw, err.Error(), http.StatusNotFound)
|
http.Error(rw, err.Error(), http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
job.Tags, err = api.Resolver.Job().Tags(r.Context(), job)
|
job.Tags, err = api.JobRepository.GetTags(&job.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var req TagJobApiRequest
|
var req TagJobApiRequest
|
||||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
if err := decode(r.Body, &req); err != nil {
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -195,7 +250,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
req := schema.JobMeta{BaseJob: schema.JobDefaults}
|
req := schema.JobMeta{BaseJob: schema.JobDefaults}
|
||||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
if err := decode(r.Body, &req); err != nil {
|
||||||
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -264,7 +319,7 @@ func (api *RestApi) stopJob(rw http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// Parse request body
|
// Parse request body
|
||||||
req := StopJobApiRequest{}
|
req := StopJobApiRequest{}
|
||||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
if err := decode(r.Body, &req); err != nil {
|
||||||
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -62,6 +62,27 @@ func loadFromArchive(job *schema.Job) (schema.JobData, error) {
|
|||||||
return data.(schema.JobData), nil
|
return data.(schema.JobData), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func loadMetaJson(job *schema.Job) (*schema.JobMeta, error) {
|
||||||
|
filename, err := getPath(job, "meta.json", true)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
bytes, err := os.ReadFile(filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var metaFile schema.JobMeta = schema.JobMeta{
|
||||||
|
BaseJob: schema.JobDefaults,
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(bytes, &metaFile); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &metaFile, nil
|
||||||
|
}
|
||||||
|
|
||||||
// If the job is archived, find its `meta.json` file and override the tags list
|
// If the job is archived, find its `meta.json` file and override the tags list
|
||||||
// in that JSON file. If the job is not archived, nothing is done.
|
// in that JSON file. If the job is not archived, nothing is done.
|
||||||
func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
|
func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
|
||||||
@ -108,21 +129,11 @@ func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
|
|||||||
|
|
||||||
// Helper to metricdata.LoadAverages().
|
// Helper to metricdata.LoadAverages().
|
||||||
func loadAveragesFromArchive(job *schema.Job, metrics []string, data [][]schema.Float) error {
|
func loadAveragesFromArchive(job *schema.Job, metrics []string, data [][]schema.Float) error {
|
||||||
filename, err := getPath(job, "meta.json", true)
|
metaFile, err := loadMetaJson(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
bytes, err := os.ReadFile(filename)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var metaFile schema.JobMeta
|
|
||||||
if err := json.Unmarshal(bytes, &metaFile); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, m := range metrics {
|
for i, m := range metrics {
|
||||||
if stat, ok := metaFile.Statistics[m]; ok {
|
if stat, ok := metaFile.Statistics[m]; ok {
|
||||||
data[i] = append(data[i], schema.Float(stat.Avg))
|
data[i] = append(data[i], schema.Float(stat.Avg))
|
||||||
@ -134,6 +145,15 @@ func loadAveragesFromArchive(job *schema.Job, metrics []string, data [][]schema.
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
|
||||||
|
metaFile, err := loadMetaJson(job)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return metaFile.Statistics, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Writes a running job to the job-archive
|
// Writes a running job to the job-archive
|
||||||
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
|
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
|
||||||
allMetrics := make([]string, 0)
|
allMetrics := make([]string, 0)
|
||||||
|
Loading…
Reference in New Issue
Block a user