mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-06-08 08:33:49 +02:00
Refactor rest api
This commit is contained in:
parent
6f9737c2c2
commit
1d8e7e072f
70
internal/api/cluster.go
Normal file
70
internal/api/cluster.go
Normal file
@ -0,0 +1,70 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package api
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
// GetClustersApiResponse model
|
||||
type GetClustersApiResponse struct {
|
||||
Clusters []*schema.Cluster `json:"clusters"` // Array of clusters
|
||||
}
|
||||
|
||||
// getClusters godoc
|
||||
// @summary Lists all cluster configs
|
||||
// @tags Cluster query
|
||||
// @description Get a list of all cluster configs. Specific cluster can be requested using query parameter.
|
||||
// @produce json
|
||||
// @param cluster query string false "Job Cluster"
|
||||
// @success 200 {object} api.GetClustersApiResponse "Array of clusters"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 403 {object} api.ErrorResponse "Forbidden"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/clusters/ [get]
|
||||
func (api *RestApi) getClusters(rw http.ResponseWriter, r *http.Request) {
|
||||
if user := repository.GetUserFromContext(r.Context()); user != nil &&
|
||||
!user.HasRole(schema.RoleApi) {
|
||||
|
||||
handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw)
|
||||
return
|
||||
}
|
||||
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
bw := bufio.NewWriter(rw)
|
||||
defer bw.Flush()
|
||||
|
||||
var clusters []*schema.Cluster
|
||||
|
||||
if r.URL.Query().Has("cluster") {
|
||||
name := r.URL.Query().Get("cluster")
|
||||
cluster := archive.GetCluster(name)
|
||||
if cluster == nil {
|
||||
handleError(fmt.Errorf("unknown cluster: %s", name), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
clusters = append(clusters, cluster)
|
||||
} else {
|
||||
clusters = archive.Clusters
|
||||
}
|
||||
|
||||
payload := GetClustersApiResponse{
|
||||
Clusters: clusters,
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(bw).Encode(payload); err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
987
internal/api/job.go
Normal file
987
internal/api/job.go
Normal file
@ -0,0 +1,987 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package api
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/archiver"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
// DefaultApiResponse model
|
||||
type DefaultJobApiResponse struct {
|
||||
Message string `json:"msg"`
|
||||
}
|
||||
|
||||
// StopJobApiRequest model
|
||||
type StopJobApiRequest struct {
|
||||
JobId *int64 `json:"jobId" example:"123000"`
|
||||
Cluster *string `json:"cluster" example:"fritz"`
|
||||
StartTime *int64 `json:"startTime" example:"1649723812"`
|
||||
State schema.JobState `json:"jobState" validate:"required" example:"completed"`
|
||||
StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"`
|
||||
}
|
||||
|
||||
// DeleteJobApiRequest model
|
||||
type DeleteJobApiRequest struct {
|
||||
JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job
|
||||
Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
|
||||
StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
|
||||
}
|
||||
|
||||
// GetJobsApiResponse model
|
||||
type GetJobsApiResponse struct {
|
||||
Jobs []*schema.Job `json:"jobs"` // Array of jobs
|
||||
Items int `json:"items"` // Number of jobs returned
|
||||
Page int `json:"page"` // Page id returned
|
||||
}
|
||||
|
||||
// ApiTag model
|
||||
type ApiTag struct {
|
||||
// Tag Type
|
||||
Type string `json:"type" example:"Debug"`
|
||||
Name string `json:"name" example:"Testjob"` // Tag Name
|
||||
Scope string `json:"scope" example:"global"` // Tag Scope for Frontend Display
|
||||
}
|
||||
|
||||
// ApiMeta model
|
||||
type EditMetaRequest struct {
|
||||
Key string `json:"key" example:"jobScript"`
|
||||
Value string `json:"value" example:"bash script"`
|
||||
}
|
||||
|
||||
type TagJobApiRequest []*ApiTag
|
||||
|
||||
type GetJobApiRequest []string
|
||||
|
||||
type GetJobApiResponse struct {
|
||||
Meta *schema.Job
|
||||
Data []*JobMetricWithName
|
||||
}
|
||||
|
||||
type GetCompleteJobApiResponse struct {
|
||||
Meta *schema.Job
|
||||
Data schema.JobData
|
||||
}
|
||||
|
||||
type JobMetricWithName struct {
|
||||
Metric *schema.JobMetric `json:"metric"`
|
||||
Name string `json:"name"`
|
||||
Scope schema.MetricScope `json:"scope"`
|
||||
}
|
||||
|
||||
// getJobs godoc
|
||||
// @summary Lists all jobs
|
||||
// @tags Job query
|
||||
// @description Get a list of all jobs. Filters can be applied using query parameters.
|
||||
// @description Number of results can be limited by page. Results are sorted by descending startTime.
|
||||
// @produce json
|
||||
// @param state query string false "Job State" Enums(running, completed, failed, cancelled, stopped, timeout)
|
||||
// @param cluster query string false "Job Cluster"
|
||||
// @param start-time query string false "Syntax: '$from-$to', as unix epoch timestamps in seconds"
|
||||
// @param items-per-page query int false "Items per page (Default: 25)"
|
||||
// @param page query int false "Page Number (Default: 1)"
|
||||
// @param with-metadata query bool false "Include metadata (e.g. jobScript) in response"
|
||||
// @success 200 {object} api.GetJobsApiResponse "Job array and page info"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 403 {object} api.ErrorResponse "Forbidden"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/ [get]
|
||||
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
||||
withMetadata := false
|
||||
filter := &model.JobFilter{}
|
||||
page := &model.PageRequest{ItemsPerPage: 25, Page: 1}
|
||||
order := &model.OrderByInput{Field: "startTime", Type: "col", Order: model.SortDirectionEnumDesc}
|
||||
|
||||
for key, vals := range r.URL.Query() {
|
||||
switch key {
|
||||
case "state":
|
||||
for _, s := range vals {
|
||||
state := schema.JobState(s)
|
||||
if !state.Valid() {
|
||||
handleError(fmt.Errorf("invalid query parameter value: state"),
|
||||
http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
filter.State = append(filter.State, state)
|
||||
}
|
||||
case "cluster":
|
||||
filter.Cluster = &model.StringInput{Eq: &vals[0]}
|
||||
case "start-time":
|
||||
st := strings.Split(vals[0], "-")
|
||||
if len(st) != 2 {
|
||||
handleError(fmt.Errorf("invalid query parameter value: startTime"),
|
||||
http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
from, err := strconv.ParseInt(st[0], 10, 64)
|
||||
if err != nil {
|
||||
handleError(err, http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
to, err := strconv.ParseInt(st[1], 10, 64)
|
||||
if err != nil {
|
||||
handleError(err, http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
ufrom, uto := time.Unix(from, 0), time.Unix(to, 0)
|
||||
filter.StartTime = &schema.TimeRange{From: &ufrom, To: &uto}
|
||||
case "page":
|
||||
x, err := strconv.Atoi(vals[0])
|
||||
if err != nil {
|
||||
handleError(err, http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
page.Page = x
|
||||
case "items-per-page":
|
||||
x, err := strconv.Atoi(vals[0])
|
||||
if err != nil {
|
||||
handleError(err, http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
page.ItemsPerPage = x
|
||||
case "with-metadata":
|
||||
withMetadata = true
|
||||
default:
|
||||
handleError(fmt.Errorf("invalid query parameter: %s", key),
|
||||
http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
jobs, err := api.JobRepository.QueryJobs(r.Context(), []*model.JobFilter{filter}, page, order)
|
||||
if err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
|
||||
results := make([]*schema.Job, 0, len(jobs))
|
||||
for _, job := range jobs {
|
||||
if withMetadata {
|
||||
if _, err = api.JobRepository.FetchMetadata(job); err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
|
||||
if err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
|
||||
if job.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful {
|
||||
job.Statistics, err = archive.GetStatistics(job)
|
||||
if err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
results = append(results, job)
|
||||
}
|
||||
|
||||
log.Debugf("/api/jobs: %d jobs returned", len(results))
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
bw := bufio.NewWriter(rw)
|
||||
defer bw.Flush()
|
||||
|
||||
payload := GetJobsApiResponse{
|
||||
Jobs: results,
|
||||
Items: page.ItemsPerPage,
|
||||
Page: page.Page,
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(bw).Encode(payload); err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// getCompleteJobById godoc
|
||||
// @summary Get job meta and optional all metric data
|
||||
// @tags Job query
|
||||
// @description Job to get is specified by database ID
|
||||
// @description Returns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'.
|
||||
// @produce json
|
||||
// @param id path int true "Database ID of Job"
|
||||
// @param all-metrics query bool false "Include all available metrics"
|
||||
// @success 200 {object} api.GetJobApiResponse "Job resource"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 403 {object} api.ErrorResponse "Forbidden"
|
||||
// @failure 404 {object} api.ErrorResponse "Resource not found"
|
||||
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/{id} [get]
|
||||
func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) {
|
||||
// Fetch job from db
|
||||
id, ok := mux.Vars(r)["id"]
|
||||
var job *schema.Job
|
||||
var err error
|
||||
if ok {
|
||||
id, e := strconv.ParseInt(id, 10, 64)
|
||||
if e != nil {
|
||||
handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
job, err = api.JobRepository.FindById(r.Context(), id) // Get Job from Repo by ID
|
||||
} else {
|
||||
handleError(fmt.Errorf("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("finding job with db id %s failed: %w", id, err), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
|
||||
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
|
||||
if err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
|
||||
}
|
||||
if _, err = api.JobRepository.FetchMetadata(job); err != nil {
|
||||
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
|
||||
var scopes []schema.MetricScope
|
||||
|
||||
if job.NumNodes == 1 {
|
||||
scopes = []schema.MetricScope{"core"}
|
||||
} else {
|
||||
scopes = []schema.MetricScope{"node"}
|
||||
}
|
||||
|
||||
var data schema.JobData
|
||||
|
||||
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
|
||||
resolution := 0
|
||||
|
||||
for _, mc := range metricConfigs {
|
||||
resolution = max(resolution, mc.Timestep)
|
||||
}
|
||||
|
||||
if r.URL.Query().Get("all-metrics") == "true" {
|
||||
data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution)
|
||||
if err != nil {
|
||||
log.Warnf("REST: error while loading all-metrics job data for JobID %d on %s", job.JobID, job.Cluster)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("/api/job/%s: get job %d", id, job.JobID)
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
bw := bufio.NewWriter(rw)
|
||||
defer bw.Flush()
|
||||
|
||||
payload := GetCompleteJobApiResponse{
|
||||
Meta: job,
|
||||
Data: data,
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(bw).Encode(payload); err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// getJobById godoc
|
||||
// @summary Get job meta and configurable metric data
|
||||
// @tags Job query
|
||||
// @description Job to get is specified by database ID
|
||||
// @description Returns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'.
|
||||
// @accept json
|
||||
// @produce json
|
||||
// @param id path int true "Database ID of Job"
|
||||
// @param request body api.GetJobApiRequest true "Array of metric names"
|
||||
// @success 200 {object} api.GetJobApiResponse "Job resource"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 403 {object} api.ErrorResponse "Forbidden"
|
||||
// @failure 404 {object} api.ErrorResponse "Resource not found"
|
||||
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/{id} [post]
|
||||
func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
|
||||
// Fetch job from db
|
||||
id, ok := mux.Vars(r)["id"]
|
||||
var job *schema.Job
|
||||
var err error
|
||||
if ok {
|
||||
id, e := strconv.ParseInt(id, 10, 64)
|
||||
if e != nil {
|
||||
handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
job, err = api.JobRepository.FindById(r.Context(), id)
|
||||
} else {
|
||||
handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("finding job with db id %s failed: %w", id, err), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
|
||||
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
|
||||
if err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
|
||||
}
|
||||
if _, err = api.JobRepository.FetchMetadata(job); err != nil {
|
||||
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
|
||||
var metrics GetJobApiRequest
|
||||
if err = decode(r.Body, &metrics); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var scopes []schema.MetricScope
|
||||
|
||||
if job.NumNodes == 1 {
|
||||
scopes = []schema.MetricScope{"core"}
|
||||
} else {
|
||||
scopes = []schema.MetricScope{"node"}
|
||||
}
|
||||
|
||||
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
|
||||
resolution := 0
|
||||
|
||||
for _, mc := range metricConfigs {
|
||||
resolution = max(resolution, mc.Timestep)
|
||||
}
|
||||
|
||||
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution)
|
||||
if err != nil {
|
||||
log.Warnf("REST: error while loading job data for JobID %d on %s", job.JobID, job.Cluster)
|
||||
return
|
||||
}
|
||||
|
||||
res := []*JobMetricWithName{}
|
||||
for name, md := range data {
|
||||
for scope, metric := range md {
|
||||
res = append(res, &JobMetricWithName{
|
||||
Name: name,
|
||||
Scope: scope,
|
||||
Metric: metric,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("/api/job/%s: get job %d", id, job.JobID)
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
bw := bufio.NewWriter(rw)
|
||||
defer bw.Flush()
|
||||
|
||||
payload := GetJobApiResponse{
|
||||
Meta: job,
|
||||
Data: res,
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(bw).Encode(payload); err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// editMeta godoc
|
||||
// @summary Edit meta-data json
|
||||
// @tags Job add and modify
|
||||
// @description Edit key value pairs in job metadata json
|
||||
// @description If a key already exists its content will be overwritten
|
||||
// @accept json
|
||||
// @produce json
|
||||
// @param id path int true "Job Database ID"
|
||||
// @param request body api.EditMetaRequest true "Kay value pair to add"
|
||||
// @success 200 {object} schema.Job "Updated job resource"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 404 {object} api.ErrorResponse "Job does not exist"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/edit_meta/{id} [post]
|
||||
func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) {
|
||||
id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
job, err := api.JobRepository.FindById(r.Context(), id)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
var req EditMetaRequest
|
||||
if err := decode(r.Body, &req); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if err := api.JobRepository.UpdateMetadata(job, req.Key, req.Value); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(rw).Encode(job)
|
||||
}
|
||||
|
||||
// tagJob godoc
|
||||
// @summary Adds one or more tags to a job
|
||||
// @tags Job add and modify
|
||||
// @description Adds tag(s) to a job specified by DB ID. Name and Type of Tag(s) can be chosen freely.
|
||||
// @description Tag Scope for frontend visibility will default to "global" if none entered, other options: "admin" or specific username.
|
||||
// @description If tagged job is already finished: Tag will be written directly to respective archive files.
|
||||
// @accept json
|
||||
// @produce json
|
||||
// @param id path int true "Job Database ID"
|
||||
// @param request body api.TagJobApiRequest true "Array of tag-objects to add"
|
||||
// @success 200 {object} schema.Job "Updated job resource"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 404 {object} api.ErrorResponse "Job or tag does not exist"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/tag_job/{id} [post]
|
||||
func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
|
||||
id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
job, err := api.JobRepository.FindById(r.Context(), id)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
var req TagJobApiRequest
|
||||
if err := decode(r.Body, &req); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
for _, tag := range req {
|
||||
tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), *job.ID, tag.Type, tag.Name, tag.Scope)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
job.Tags = append(job.Tags, &schema.Tag{
|
||||
ID: tagId,
|
||||
Type: tag.Type,
|
||||
Name: tag.Name,
|
||||
Scope: tag.Scope,
|
||||
})
|
||||
}
|
||||
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(rw).Encode(job)
|
||||
}
|
||||
|
||||
// removeTagJob godoc
|
||||
// @summary Removes one or more tags from a job
|
||||
// @tags Job add and modify
|
||||
// @description Removes tag(s) from a job specified by DB ID. Name and Type of Tag(s) must match.
|
||||
// @description Tag Scope is required for matching, options: "global", "admin". Private tags can not be deleted via API.
|
||||
// @description If tagged job is already finished: Tag will be removed from respective archive files.
|
||||
// @accept json
|
||||
// @produce json
|
||||
// @param id path int true "Job Database ID"
|
||||
// @param request body api.TagJobApiRequest true "Array of tag-objects to remove"
|
||||
// @success 200 {object} schema.Job "Updated job resource"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 404 {object} api.ErrorResponse "Job or tag does not exist"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /jobs/tag_job/{id} [delete]
|
||||
func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
|
||||
id, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
job, err := api.JobRepository.FindById(r.Context(), id)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
var req TagJobApiRequest
|
||||
if err := decode(r.Body, &req); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
for _, rtag := range req {
|
||||
// Only Global and Admin Tags
|
||||
if rtag.Scope != "global" && rtag.Scope != "admin" {
|
||||
log.Warnf("Cannot delete private tag for job %d: Skip", job.JobID)
|
||||
continue
|
||||
}
|
||||
|
||||
remainingTags, err := api.JobRepository.RemoveJobTagByRequest(repository.GetUserFromContext(r.Context()), *job.ID, rtag.Type, rtag.Name, rtag.Scope)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
job.Tags = remainingTags
|
||||
}
|
||||
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(rw).Encode(job)
|
||||
}
|
||||
|
||||
// removeTags godoc
|
||||
// @summary Removes all tags and job-relations for type:name tuple
|
||||
// @tags Tag remove
|
||||
// @description Removes tags by type and name. Name and Type of Tag(s) must match.
|
||||
// @description Tag Scope is required for matching, options: "global", "admin". Private tags can not be deleted via API.
|
||||
// @description Tag wills be removed from respective archive files.
|
||||
// @accept json
|
||||
// @produce plain
|
||||
// @param request body api.TagJobApiRequest true "Array of tag-objects to remove"
|
||||
// @success 200 {string} string "Success Response"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 404 {object} api.ErrorResponse "Job or tag does not exist"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /tags/ [delete]
|
||||
func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) {
|
||||
var req TagJobApiRequest
|
||||
if err := decode(r.Body, &req); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
targetCount := len(req)
|
||||
currentCount := 0
|
||||
for _, rtag := range req {
|
||||
// Only Global and Admin Tags
|
||||
if rtag.Scope != "global" && rtag.Scope != "admin" {
|
||||
log.Warn("Cannot delete private tag: Skip")
|
||||
continue
|
||||
}
|
||||
|
||||
err := api.JobRepository.RemoveTagByRequest(rtag.Type, rtag.Name, rtag.Scope)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
} else {
|
||||
currentCount++
|
||||
}
|
||||
}
|
||||
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintf(rw, "Deleted Tags from DB: %d successfull of %d requested\n", currentCount, targetCount)
|
||||
}
|
||||
|
||||
// startJob godoc
|
||||
// @summary Adds a new job as "running"
|
||||
// @tags Job add and modify
|
||||
// @description Job specified in request body will be saved to database as "running" with new DB ID.
|
||||
// @description Job specifications follow the 'JobMeta' scheme, API will fail to execute if requirements are not met.
|
||||
// @accept json
|
||||
// @produce json
|
||||
// @param request body schema.JobMeta true "Job to add"
|
||||
// @success 201 {object} api.DefaultJobApiResponse "Job added successfully"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 403 {object} api.ErrorResponse "Forbidden"
|
||||
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: The combination of jobId, clusterId and startTime does already exist"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/start_job/ [post]
|
||||
func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
||||
req := schema.Job{
|
||||
Exclusive: 1,
|
||||
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
|
||||
}
|
||||
if err := decode(r.Body, &req); err != nil {
|
||||
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("REST: %s\n", req.GoString())
|
||||
req.State = schema.JobStateRunning
|
||||
|
||||
if err := importer.SanityChecks(&req); err != nil {
|
||||
handleError(err, http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
// aquire lock to avoid race condition between API calls
|
||||
var unlockOnce sync.Once
|
||||
api.RepositoryMutex.Lock()
|
||||
defer unlockOnce.Do(api.RepositoryMutex.Unlock)
|
||||
|
||||
// Check if combination of (job_id, cluster_id, start_time) already exists:
|
||||
jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
handleError(fmt.Errorf("checking for duplicate failed: %w", err), http.StatusInternalServerError, rw)
|
||||
return
|
||||
} else if err == nil {
|
||||
for _, job := range jobs {
|
||||
if (req.StartTime - job.StartTime) < 86400 {
|
||||
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
id, err := api.JobRepository.Start(&req)
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
// unlock here, adding Tags can be async
|
||||
unlockOnce.Do(api.RepositoryMutex.Unlock)
|
||||
|
||||
for _, tag := range req.Tags {
|
||||
if _, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), id, tag.Type, tag.Name, tag.Scope); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusCreated)
|
||||
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
|
||||
Message: "success",
|
||||
})
|
||||
}
|
||||
|
||||
// stopJobByRequest godoc
|
||||
// @summary Marks job as completed and triggers archiving
|
||||
// @tags Job add and modify
|
||||
// @description Job to stop is specified by request body. All fields are required in this case.
|
||||
// @description Returns full job resource information according to 'JobMeta' scheme.
|
||||
// @produce json
|
||||
// @param request body api.StopJobApiRequest true "All fields required"
|
||||
// @success 200 {object} schema.JobMeta "Success message"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 403 {object} api.ErrorResponse "Forbidden"
|
||||
// @failure 404 {object} api.ErrorResponse "Resource not found"
|
||||
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: job has already been stopped"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/stop_job/ [post]
|
||||
func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
||||
// Parse request body
|
||||
req := StopJobApiRequest{}
|
||||
if err := decode(r.Body, &req); err != nil {
|
||||
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
// Fetch job (that will be stopped) from db
|
||||
var job *schema.Job
|
||||
var err error
|
||||
if req.JobId == nil {
|
||||
handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
// log.Printf("loading db job for stopJobByRequest... : stopJobApiRequest=%v", req)
|
||||
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
|
||||
if err != nil {
|
||||
job, err = api.JobRepository.FindCached(req.JobId, req.Cluster, req.StartTime)
|
||||
// FIXME: Previous error is hidden
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
api.checkAndHandleStopJob(rw, job, req)
|
||||
}
|
||||
|
||||
// deleteJobById godoc
|
||||
// @summary Remove a job from the sql database
|
||||
// @tags Job remove
|
||||
// @description Job to remove is specified by database ID. This will not remove the job from the job archive.
|
||||
// @produce json
|
||||
// @param id path int true "Database ID of Job"
|
||||
// @success 200 {object} api.DefaultJobApiResponse "Success message"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 403 {object} api.ErrorResponse "Forbidden"
|
||||
// @failure 404 {object} api.ErrorResponse "Resource not found"
|
||||
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/delete_job/{id} [delete]
|
||||
func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
|
||||
// Fetch job (that will be stopped) from db
|
||||
id, ok := mux.Vars(r)["id"]
|
||||
var err error
|
||||
if ok {
|
||||
id, e := strconv.ParseInt(id, 10, 64)
|
||||
if e != nil {
|
||||
handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
err = api.JobRepository.DeleteJobById(id)
|
||||
} else {
|
||||
handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
|
||||
Message: fmt.Sprintf("Successfully deleted job %s", id),
|
||||
})
|
||||
}
|
||||
|
||||
// deleteJobByRequest godoc
|
||||
// @summary Remove a job from the sql database
|
||||
// @tags Job remove
|
||||
// @description Job to delete is specified by request body. All fields are required in this case.
|
||||
// @accept json
|
||||
// @produce json
|
||||
// @param request body api.DeleteJobApiRequest true "All fields required"
|
||||
// @success 200 {object} api.DefaultJobApiResponse "Success message"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 403 {object} api.ErrorResponse "Forbidden"
|
||||
// @failure 404 {object} api.ErrorResponse "Resource not found"
|
||||
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/delete_job/ [delete]
|
||||
func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) {
|
||||
// Parse request body
|
||||
req := DeleteJobApiRequest{}
|
||||
if err := decode(r.Body, &req); err != nil {
|
||||
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
// Fetch job (that will be deleted) from db
|
||||
var job *schema.Job
|
||||
var err error
|
||||
if req.JobId == nil {
|
||||
handleError(errors.New("the field 'jobId' is required"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
|
||||
err = api.JobRepository.DeleteJobById(*job.ID)
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
|
||||
Message: fmt.Sprintf("Successfully deleted job %d", job.ID),
|
||||
})
|
||||
}
|
||||
|
||||
// deleteJobBefore godoc
|
||||
// @summary Remove a job from the sql database
|
||||
// @tags Job remove
|
||||
// @description Remove all jobs with start time before timestamp. The jobs will not be removed from the job archive.
|
||||
// @produce json
|
||||
// @param ts path int true "Unix epoch timestamp"
|
||||
// @success 200 {object} api.DefaultJobApiResponse "Success message"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 403 {object} api.ErrorResponse "Forbidden"
|
||||
// @failure 404 {object} api.ErrorResponse "Resource not found"
|
||||
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/jobs/delete_job_before/{ts} [delete]
|
||||
func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
|
||||
var cnt int
|
||||
// Fetch job (that will be stopped) from db
|
||||
id, ok := mux.Vars(r)["ts"]
|
||||
var err error
|
||||
if ok {
|
||||
ts, e := strconv.ParseInt(id, 10, 64)
|
||||
if e != nil {
|
||||
handleError(fmt.Errorf("integer expected in path for ts: %w", e), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
cnt, err = api.JobRepository.DeleteJobsBefore(ts)
|
||||
} else {
|
||||
handleError(errors.New("the parameter 'ts' is required"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("deleting jobs failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(rw).Encode(DefaultJobApiResponse{
|
||||
Message: fmt.Sprintf("Successfully deleted %d jobs", cnt),
|
||||
})
|
||||
}
|
||||
|
||||
func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobApiRequest) {
|
||||
// Sanity checks
|
||||
if job.State != schema.JobStateRunning {
|
||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
|
||||
if job == nil || job.StartTime > req.StopTime {
|
||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
if req.State != "" && !req.State.Valid() {
|
||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : invalid requested job state: %#v", job.JobID, job.ID, job.Cluster, req.State), http.StatusBadRequest, rw)
|
||||
return
|
||||
} else if req.State == "" {
|
||||
req.State = schema.JobStateCompleted
|
||||
}
|
||||
|
||||
// Mark job as stopped in the database (update state and duration)
|
||||
job.Duration = int32(req.StopTime - job.StartTime)
|
||||
job.State = req.State
|
||||
api.JobRepository.Mutex.Lock()
|
||||
if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||
if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||
api.JobRepository.Mutex.Unlock()
|
||||
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
api.JobRepository.Mutex.Unlock()
|
||||
|
||||
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
|
||||
|
||||
// Send a response (with status OK). This means that erros that happen from here on forward
|
||||
// can *NOT* be communicated to the client. If reading from a MetricDataRepository or
|
||||
// writing to the filesystem fails, the client will not know.
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(rw).Encode(job)
|
||||
|
||||
// Monitoring is disabled...
|
||||
if job.MonitoringStatus == schema.MonitoringStatusDisabled {
|
||||
return
|
||||
}
|
||||
|
||||
// Trigger async archiving
|
||||
archiver.TriggerArchiving(job)
|
||||
}
|
||||
|
||||
func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
|
||||
id := mux.Vars(r)["id"]
|
||||
metrics := r.URL.Query()["metric"]
|
||||
var scopes []schema.MetricScope
|
||||
for _, scope := range r.URL.Query()["scope"] {
|
||||
var s schema.MetricScope
|
||||
if err := s.UnmarshalGQL(scope); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
scopes = append(scopes, s)
|
||||
}
|
||||
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
|
||||
type Respone struct {
|
||||
Data *struct {
|
||||
JobMetrics []*model.JobMetricWithName `json:"jobMetrics"`
|
||||
} `json:"data"`
|
||||
Error *struct {
|
||||
Message string `json:"message"`
|
||||
} `json:"error"`
|
||||
}
|
||||
|
||||
resolver := graph.GetResolverInstance()
|
||||
data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes, nil)
|
||||
if err != nil {
|
||||
json.NewEncoder(rw).Encode(Respone{
|
||||
Error: &struct {
|
||||
Message string "json:\"message\""
|
||||
}{Message: err.Error()},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
json.NewEncoder(rw).Encode(Respone{
|
||||
Data: &struct {
|
||||
JobMetrics []*model.JobMetricWithName "json:\"jobMetrics\""
|
||||
}{JobMetrics: data},
|
||||
})
|
||||
}
|
30
internal/api/node.go
Normal file
30
internal/api/node.go
Normal file
@ -0,0 +1,30 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type Node struct {
|
||||
Name string `json:"hostname"`
|
||||
States []string `json:"states"`
|
||||
}
|
||||
|
||||
// updateNodeStatesRequest model
|
||||
type UpdateNodeStatesRequest struct {
|
||||
Nodes []Node `json:"nodes"`
|
||||
Cluster string `json:"cluster" example:"fritz"`
|
||||
}
|
||||
|
||||
func (api *RestApi) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
|
||||
// Parse request body
|
||||
req := UpdateNodeStatesRequest{}
|
||||
if err := decode(r.Body, &req); err != nil {
|
||||
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
}
|
1172
internal/api/rest.go
1172
internal/api/rest.go
File diff suppressed because it is too large
Load Diff
159
internal/api/user.go
Normal file
159
internal/api/user.go
Normal file
@ -0,0 +1,159 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
type ApiReturnedUser struct {
|
||||
Username string `json:"username"`
|
||||
Name string `json:"name"`
|
||||
Roles []string `json:"roles"`
|
||||
Email string `json:"email"`
|
||||
Projects []string `json:"projects"`
|
||||
}
|
||||
|
||||
// getUsers godoc
|
||||
// @summary Returns a list of users
|
||||
// @tags User
|
||||
// @description Returns a JSON-encoded list of users.
|
||||
// @description Required query-parameter defines if all users or only users with additional special roles are returned.
|
||||
// @produce json
|
||||
// @param not-just-user query bool true "If returned list should contain all users or only users with additional special roles"
|
||||
// @success 200 {array} api.ApiReturnedUser "List of users returned successfully"
|
||||
// @failure 400 {string} string "Bad Request"
|
||||
// @failure 401 {string} string "Unauthorized"
|
||||
// @failure 403 {string} string "Forbidden"
|
||||
// @failure 500 {string} string "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /api/users/ [get]
|
||||
func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) {
|
||||
// SecuredCheck() only worked with TokenAuth: Removed
|
||||
|
||||
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
|
||||
http.Error(rw, "Only admins are allowed to fetch a list of users", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
users, err := repository.GetUserRepository().ListUsers(r.URL.Query().Get("not-just-user") == "true")
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
json.NewEncoder(rw).Encode(users)
|
||||
}
|
||||
|
||||
func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
|
||||
// SecuredCheck() only worked with TokenAuth: Removed
|
||||
|
||||
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
|
||||
http.Error(rw, "Only admins are allowed to update a user", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
// Get Values
|
||||
newrole := r.FormValue("add-role")
|
||||
delrole := r.FormValue("remove-role")
|
||||
newproj := r.FormValue("add-project")
|
||||
delproj := r.FormValue("remove-project")
|
||||
|
||||
// TODO: Handle anything but roles...
|
||||
if newrole != "" {
|
||||
if err := repository.GetUserRepository().AddRole(r.Context(), mux.Vars(r)["id"], newrole); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
|
||||
return
|
||||
}
|
||||
rw.Write([]byte("Add Role Success"))
|
||||
} else if delrole != "" {
|
||||
if err := repository.GetUserRepository().RemoveRole(r.Context(), mux.Vars(r)["id"], delrole); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
|
||||
return
|
||||
}
|
||||
rw.Write([]byte("Remove Role Success"))
|
||||
} else if newproj != "" {
|
||||
if err := repository.GetUserRepository().AddProject(r.Context(), mux.Vars(r)["id"], newproj); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
|
||||
return
|
||||
}
|
||||
rw.Write([]byte("Add Project Success"))
|
||||
} else if delproj != "" {
|
||||
if err := repository.GetUserRepository().RemoveProject(r.Context(), mux.Vars(r)["id"], delproj); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
|
||||
return
|
||||
}
|
||||
rw.Write([]byte("Remove Project Success"))
|
||||
} else {
|
||||
http.Error(rw, "Not Add or Del [role|project]?", http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
|
||||
// SecuredCheck() only worked with TokenAuth: Removed
|
||||
|
||||
rw.Header().Set("Content-Type", "text/plain")
|
||||
me := repository.GetUserFromContext(r.Context())
|
||||
if !me.HasRole(schema.RoleAdmin) {
|
||||
http.Error(rw, "Only admins are allowed to create new users", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
username, password, role, name, email, project := r.FormValue("username"),
|
||||
r.FormValue("password"), r.FormValue("role"), r.FormValue("name"),
|
||||
r.FormValue("email"), r.FormValue("project")
|
||||
|
||||
if len(password) == 0 && role != schema.GetRoleString(schema.RoleApi) {
|
||||
http.Error(rw, "Only API users are allowed to have a blank password (login will be impossible)", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if len(project) != 0 && role != schema.GetRoleString(schema.RoleManager) {
|
||||
http.Error(rw, "only managers require a project (can be changed later)",
|
||||
http.StatusBadRequest)
|
||||
return
|
||||
} else if len(project) == 0 && role == schema.GetRoleString(schema.RoleManager) {
|
||||
http.Error(rw, "managers require a project to manage (can be changed later)",
|
||||
http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if err := repository.GetUserRepository().AddUser(&schema.User{
|
||||
Username: username,
|
||||
Name: name,
|
||||
Password: password,
|
||||
Email: email,
|
||||
Projects: []string{project},
|
||||
Roles: []string{role},
|
||||
}); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Fprintf(rw, "User %v successfully created!\n", username)
|
||||
}
|
||||
|
||||
func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) {
|
||||
// SecuredCheck() only worked with TokenAuth: Removed
|
||||
|
||||
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
|
||||
http.Error(rw, "Only admins are allowed to delete a user", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
username := r.FormValue("username")
|
||||
if err := repository.GetUserRepository().DelUser(username); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
|
||||
return
|
||||
}
|
||||
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user