mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-07-23 04:51:39 +02:00
Merge branch 'hotfix' of https://github.com/ClusterCockpit/cc-backend into hotfix
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
// Code generated by swaggo/swag. DO NOT EDIT.
|
||||
|
||||
// Package api Code generated by swaggo/swag. DO NOT EDIT
|
||||
package api
|
||||
|
||||
import "github.com/swaggo/swag"
|
||||
@@ -24,6 +23,63 @@ const docTemplate = `{
|
||||
"host": "{{.Host}}",
|
||||
"basePath": "{{.BasePath}}",
|
||||
"paths": {
|
||||
"/clusters/": {
|
||||
"get": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Get a list of all cluster configs. Specific cluster can be requested using query parameter.",
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"Cluster query"
|
||||
],
|
||||
"summary": "Lists all cluster configs",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Job Cluster",
|
||||
"name": "cluster",
|
||||
"in": "query"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Array of clusters",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.GetClustersApiResponse"
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "Bad Request",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"401": {
|
||||
"description": "Unauthorized",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"403": {
|
||||
"description": "Forbidden",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"500": {
|
||||
"description": "Internal Server Error",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/jobs/": {
|
||||
"get": {
|
||||
"security": [
|
||||
@@ -334,6 +390,76 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"/jobs/edit_meta/{id}": {
|
||||
"post": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten",
|
||||
"consumes": [
|
||||
"application/json"
|
||||
],
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"Job add and modify"
|
||||
],
|
||||
"summary": "Edit meta-data json",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "integer",
|
||||
"description": "Job Database ID",
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"description": "Kay value pair to add",
|
||||
"name": "request",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.EditMetaRequest"
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Updated job resource",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/schema.Job"
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "Bad Request",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"401": {
|
||||
"description": "Unauthorized",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "Job does not exist",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"500": {
|
||||
"description": "Internal Server Error",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/jobs/start_job/": {
|
||||
"post": {
|
||||
"security": [
|
||||
@@ -631,6 +757,80 @@ const docTemplate = `{
|
||||
}
|
||||
},
|
||||
"/jobs/{id}": {
|
||||
"get": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "Job to get is specified by database ID\nReturns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'.",
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"Job query"
|
||||
],
|
||||
"summary": "Get job meta and optional all metric data",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "integer",
|
||||
"description": "Database ID of Job",
|
||||
"name": "id",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "boolean",
|
||||
"description": "Include all available metrics",
|
||||
"name": "all-metrics",
|
||||
"in": "query"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Job resource",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.GetJobApiResponse"
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "Bad Request",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"401": {
|
||||
"description": "Unauthorized",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"403": {
|
||||
"description": "Forbidden",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "Resource not found",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"422": {
|
||||
"description": "Unprocessable Entity: finding job failed: sql: no rows in result set",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
},
|
||||
"500": {
|
||||
"description": "Internal Server Error",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ErrorResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"post": {
|
||||
"security": [
|
||||
{
|
||||
@@ -647,7 +847,7 @@ const docTemplate = `{
|
||||
"tags": [
|
||||
"Job query"
|
||||
],
|
||||
"summary": "Get complete job meta and metric data",
|
||||
"summary": "Get job meta and configurable metric data",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "integer",
|
||||
@@ -1121,6 +1321,19 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.EditMetaRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"key": {
|
||||
"type": "string",
|
||||
"example": "jobScript"
|
||||
},
|
||||
"value": {
|
||||
"type": "string",
|
||||
"example": "bash script"
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.ErrorResponse": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@@ -1134,6 +1347,18 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.GetClustersApiResponse": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"clusters": {
|
||||
"description": "Array of clusters",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/schema.Cluster"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.GetJobApiResponse": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@@ -1229,6 +1454,40 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.Accelerator": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"model": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.Cluster": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"metricConfig": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/schema.MetricConfig"
|
||||
}
|
||||
},
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"subClusters": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/schema.SubCluster"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.Job": {
|
||||
"description": "Information of a HPC job.",
|
||||
"type": "object",
|
||||
@@ -1259,6 +1518,10 @@ const docTemplate = `{
|
||||
"minimum": 0,
|
||||
"example": 1
|
||||
},
|
||||
"flopsAnyAvg": {
|
||||
"description": "FlopsAnyAvg as Float64",
|
||||
"type": "number"
|
||||
},
|
||||
"id": {
|
||||
"description": "The unique identifier of a job in the database",
|
||||
"type": "integer"
|
||||
@@ -1285,6 +1548,18 @@ const docTemplate = `{
|
||||
],
|
||||
"example": "completed"
|
||||
},
|
||||
"loadAvg": {
|
||||
"description": "LoadAvg as Float64",
|
||||
"type": "number"
|
||||
},
|
||||
"memBwAvg": {
|
||||
"description": "MemBwAvg as Float64",
|
||||
"type": "number"
|
||||
},
|
||||
"memUsedMax": {
|
||||
"description": "MemUsedMax as Float64",
|
||||
"type": "number"
|
||||
},
|
||||
"metaData": {
|
||||
"description": "Additional information about the job",
|
||||
"type": "object",
|
||||
@@ -1611,6 +1886,44 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.MetricConfig": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"aggregation": {
|
||||
"type": "string"
|
||||
},
|
||||
"alert": {
|
||||
"type": "number"
|
||||
},
|
||||
"caution": {
|
||||
"type": "number"
|
||||
},
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"normal": {
|
||||
"type": "number"
|
||||
},
|
||||
"peak": {
|
||||
"type": "number"
|
||||
},
|
||||
"scope": {
|
||||
"$ref": "#/definitions/schema.MetricScope"
|
||||
},
|
||||
"subClusters": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/schema.SubClusterConfig"
|
||||
}
|
||||
},
|
||||
"timestep": {
|
||||
"type": "integer"
|
||||
},
|
||||
"unit": {
|
||||
"$ref": "#/definitions/schema.Unit"
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.MetricScope": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
@@ -1646,6 +1959,17 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.MetricValue": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"unit": {
|
||||
"$ref": "#/definitions/schema.Unit"
|
||||
},
|
||||
"value": {
|
||||
"type": "number"
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.Resource": {
|
||||
"description": "A resource used by a job",
|
||||
"type": "object",
|
||||
@@ -1726,6 +2050,64 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.SubCluster": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"coresPerSocket": {
|
||||
"type": "integer"
|
||||
},
|
||||
"flopRateScalar": {
|
||||
"$ref": "#/definitions/schema.MetricValue"
|
||||
},
|
||||
"flopRateSimd": {
|
||||
"$ref": "#/definitions/schema.MetricValue"
|
||||
},
|
||||
"memoryBandwidth": {
|
||||
"$ref": "#/definitions/schema.MetricValue"
|
||||
},
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"nodes": {
|
||||
"type": "string"
|
||||
},
|
||||
"processorType": {
|
||||
"type": "string"
|
||||
},
|
||||
"socketsPerNode": {
|
||||
"type": "integer"
|
||||
},
|
||||
"threadsPerCore": {
|
||||
"type": "integer"
|
||||
},
|
||||
"topology": {
|
||||
"$ref": "#/definitions/schema.Topology"
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.SubClusterConfig": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"alert": {
|
||||
"type": "number"
|
||||
},
|
||||
"caution": {
|
||||
"type": "number"
|
||||
},
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"normal": {
|
||||
"type": "number"
|
||||
},
|
||||
"peak": {
|
||||
"type": "number"
|
||||
},
|
||||
"remove": {
|
||||
"type": "boolean"
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.Tag": {
|
||||
"description": "Defines a tag using name and type.",
|
||||
"type": "object",
|
||||
@@ -1746,6 +2128,59 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.Topology": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"accelerators": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/schema.Accelerator"
|
||||
}
|
||||
},
|
||||
"core": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"die": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"memoryDomain": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"node": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "integer"
|
||||
}
|
||||
},
|
||||
"socket": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"schema.Unit": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
@@ -70,12 +70,16 @@ func (api *RestApi) MountRoutes(r *mux.Router) {
|
||||
|
||||
r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
|
||||
r.HandleFunc("/jobs/{id}", api.getJobById).Methods(http.MethodPost)
|
||||
r.HandleFunc("/jobs/{id}", api.getCompleteJobById).Methods(http.MethodGet)
|
||||
r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch)
|
||||
r.HandleFunc("/jobs/edit_meta/{id}", api.editMeta).Methods(http.MethodPost, http.MethodPatch)
|
||||
r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet)
|
||||
r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete)
|
||||
r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobById).Methods(http.MethodDelete)
|
||||
r.HandleFunc("/jobs/delete_job_before/{ts}", api.deleteJobBefore).Methods(http.MethodDelete)
|
||||
|
||||
r.HandleFunc("/clusters/", api.getClusters).Methods(http.MethodGet)
|
||||
|
||||
if api.MachineStateDir != "" {
|
||||
r.HandleFunc("/machine_state/{cluster}/{host}", api.getMachineState).Methods(http.MethodGet)
|
||||
r.HandleFunc("/machine_state/{cluster}/{host}", api.putMachineState).Methods(http.MethodPut, http.MethodPost)
|
||||
@@ -110,12 +114,11 @@ type UpdateUserApiResponse struct {
|
||||
|
||||
// StopJobApiRequest model
|
||||
type StopJobApiRequest struct {
|
||||
// Stop Time of job as epoch
|
||||
JobId *int64 `json:"jobId" example:"123000"`
|
||||
Cluster *string `json:"cluster" example:"fritz"`
|
||||
StartTime *int64 `json:"startTime" example:"1649723812"`
|
||||
State schema.JobState `json:"jobState" validate:"required" example:"completed"`
|
||||
StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"`
|
||||
State schema.JobState `json:"jobState" validate:"required" example:"completed"` // Final job state
|
||||
JobId *int64 `json:"jobId" example:"123000"` // Cluster Job ID of job
|
||||
Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
|
||||
StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
|
||||
}
|
||||
|
||||
// DeleteJobApiRequest model
|
||||
@@ -132,6 +135,11 @@ type GetJobsApiResponse struct {
|
||||
Page int `json:"page"` // Page id returned
|
||||
}
|
||||
|
||||
// GetClustersApiResponse model
|
||||
type GetClustersApiResponse struct {
|
||||
Clusters []*schema.Cluster `json:"clusters"` // Array of clusters
|
||||
}
|
||||
|
||||
// ErrorResponse model
|
||||
type ErrorResponse struct {
|
||||
// Statustext of Errorcode
|
||||
@@ -146,6 +154,12 @@ type ApiTag struct {
|
||||
Name string `json:"name" example:"Testjob"` // Tag Name
|
||||
}
|
||||
|
||||
// ApiMeta model
|
||||
type EditMetaRequest struct {
|
||||
Key string `json:"key" example:"jobScript"`
|
||||
Value string `json:"value" example:"bash script"`
|
||||
}
|
||||
|
||||
type TagJobApiRequest []*ApiTag
|
||||
|
||||
type GetJobApiRequest []string
|
||||
@@ -155,10 +169,15 @@ type GetJobApiResponse struct {
|
||||
Data []*JobMetricWithName
|
||||
}
|
||||
|
||||
type GetCompleteJobApiResponse struct {
|
||||
Meta *schema.Job
|
||||
Data schema.JobData
|
||||
}
|
||||
|
||||
type JobMetricWithName struct {
|
||||
Metric *schema.JobMetric `json:"metric"`
|
||||
Name string `json:"name"`
|
||||
Scope schema.MetricScope `json:"scope"`
|
||||
Metric *schema.JobMetric `json:"metric"`
|
||||
}
|
||||
|
||||
type ApiReturnedUser struct {
|
||||
@@ -223,6 +242,55 @@ func securedCheck(r *http.Request) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// getClusters godoc
|
||||
// @summary Lists all cluster configs
|
||||
// @tags Cluster query
|
||||
// @description Get a list of all cluster configs. Specific cluster can be requested using query parameter.
|
||||
// @produce json
|
||||
// @param cluster query string false "Job Cluster"
|
||||
// @success 200 {object} api.GetClustersApiResponse "Array of clusters"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 403 {object} api.ErrorResponse "Forbidden"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /clusters/ [get]
|
||||
func (api *RestApi) getClusters(rw http.ResponseWriter, r *http.Request) {
|
||||
if user := repository.GetUserFromContext(r.Context()); user != nil &&
|
||||
!user.HasRole(schema.RoleApi) {
|
||||
|
||||
handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw)
|
||||
return
|
||||
}
|
||||
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
bw := bufio.NewWriter(rw)
|
||||
defer bw.Flush()
|
||||
|
||||
var clusters []*schema.Cluster
|
||||
|
||||
if r.URL.Query().Has("cluster") {
|
||||
name := r.URL.Query().Get("cluster")
|
||||
cluster := archive.GetCluster(name)
|
||||
if cluster == nil {
|
||||
handleError(fmt.Errorf("unknown cluster: %s", name), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
clusters = append(clusters, cluster)
|
||||
} else {
|
||||
clusters = archive.Clusters
|
||||
}
|
||||
|
||||
payload := GetClustersApiResponse{
|
||||
Clusters: clusters,
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(bw).Encode(payload); err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// getJobs godoc
|
||||
// @summary Lists all jobs
|
||||
// @tags Job query
|
||||
@@ -243,7 +311,6 @@ func securedCheck(r *http.Request) error {
|
||||
// @security ApiKeyAuth
|
||||
// @router /jobs/ [get]
|
||||
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
||||
|
||||
if user := repository.GetUserFromContext(r.Context()); user != nil &&
|
||||
!user.HasRole(schema.RoleApi) {
|
||||
|
||||
@@ -342,10 +409,8 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
||||
if res.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful {
|
||||
res.Statistics, err = archive.GetStatistics(job)
|
||||
if err != nil {
|
||||
if err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -370,14 +435,107 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// getJobById godoc
|
||||
// @summary Get complete job meta and metric data
|
||||
// @summary Get job meta and optional all metric data
|
||||
// @tags Job query
|
||||
// @description Job to get is specified by database ID
|
||||
// @description Returns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'.
|
||||
// @produce json
|
||||
// @param id path int true "Database ID of Job"
|
||||
// @param all-metrics query bool false "Include all available metrics"
|
||||
// @success 200 {object} api.GetJobApiResponse "Job resource"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 403 {object} api.ErrorResponse "Forbidden"
|
||||
// @failure 404 {object} api.ErrorResponse "Resource not found"
|
||||
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /jobs/{id} [get]
|
||||
func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) {
|
||||
if user := repository.GetUserFromContext(r.Context()); user != nil &&
|
||||
!user.HasRole(schema.RoleApi) {
|
||||
|
||||
handleError(fmt.Errorf("missing role: %v",
|
||||
schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw)
|
||||
return
|
||||
}
|
||||
|
||||
// Fetch job from db
|
||||
id, ok := mux.Vars(r)["id"]
|
||||
var job *schema.Job
|
||||
var err error
|
||||
if ok {
|
||||
id, e := strconv.ParseInt(id, 10, 64)
|
||||
if e != nil {
|
||||
handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
|
||||
job, err = api.JobRepository.FindById(id)
|
||||
} else {
|
||||
handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||
return
|
||||
}
|
||||
|
||||
job.Tags, err = api.JobRepository.GetTags(&job.ID)
|
||||
if err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
|
||||
}
|
||||
if _, err = api.JobRepository.FetchMetadata(job); err != nil {
|
||||
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
|
||||
var scopes []schema.MetricScope
|
||||
|
||||
if job.NumNodes == 1 {
|
||||
scopes = []schema.MetricScope{"core"}
|
||||
} else {
|
||||
scopes = []schema.MetricScope{"node"}
|
||||
}
|
||||
|
||||
var data schema.JobData
|
||||
|
||||
if r.URL.Query().Get("all-metrics") == "true" {
|
||||
data, err = metricdata.LoadData(job, nil, scopes, r.Context())
|
||||
if err != nil {
|
||||
log.Warn("Error while loading job data")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("/api/job/%s: get job %d", id, job.JobID)
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
bw := bufio.NewWriter(rw)
|
||||
defer bw.Flush()
|
||||
|
||||
payload := GetCompleteJobApiResponse{
|
||||
Meta: job,
|
||||
Data: data,
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(bw).Encode(payload); err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// getJobById godoc
|
||||
// @summary Get job meta and configurable metric data
|
||||
// @tags Job query
|
||||
// @description Job to get is specified by database ID
|
||||
// @description Returns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'.
|
||||
// @accept json
|
||||
// @produce json
|
||||
// @param id path int true "Database ID of Job"
|
||||
// @param request body api.GetJobApiRequest true "Array of metric names"
|
||||
// @param id path int true "Database ID of Job"
|
||||
// @param request body api.GetJobApiRequest true "Array of metric names"
|
||||
// @success 200 {object} api.GetJobApiResponse "Job resource"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
@@ -417,6 +575,18 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
job.Tags, err = api.JobRepository.GetTags(&job.ID)
|
||||
if err != nil {
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
|
||||
}
|
||||
if _, err = api.JobRepository.FetchMetadata(job); err != nil {
|
||||
|
||||
handleError(err, http.StatusInternalServerError, rw)
|
||||
return
|
||||
}
|
||||
|
||||
var metrics GetJobApiRequest
|
||||
if err = decode(r.Body, &metrics); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
@@ -464,6 +634,57 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// editMeta godoc
|
||||
// @summary Edit meta-data json
|
||||
// @tags Job add and modify
|
||||
// @description Edit key value pairs in job metadata json
|
||||
// @description If a key already exists its content will be overwritten
|
||||
// @accept json
|
||||
// @produce json
|
||||
// @param id path int true "Job Database ID"
|
||||
// @param request body api.EditMetaRequest true "Kay value pair to add"
|
||||
// @success 200 {object} schema.Job "Updated job resource"
|
||||
// @failure 400 {object} api.ErrorResponse "Bad Request"
|
||||
// @failure 401 {object} api.ErrorResponse "Unauthorized"
|
||||
// @failure 404 {object} api.ErrorResponse "Job does not exist"
|
||||
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
|
||||
// @security ApiKeyAuth
|
||||
// @router /jobs/edit_meta/{id} [post]
|
||||
func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) {
|
||||
if user := repository.GetUserFromContext(r.Context()); user != nil &&
|
||||
!user.HasRole(schema.RoleApi) {
|
||||
handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw)
|
||||
return
|
||||
}
|
||||
|
||||
iid, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
job, err := api.JobRepository.FindById(iid)
|
||||
if err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
var req EditMetaRequest
|
||||
if err := decode(r.Body, &req); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if err := api.JobRepository.UpdateMetadata(job, req.Key, req.Value); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
rw.Header().Add("Content-Type", "application/json")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(rw).Encode(job)
|
||||
}
|
||||
|
||||
// tagJob godoc
|
||||
// @summary Adds one or more tags to a job
|
||||
// @tags Job add and modify
|
||||
@@ -873,7 +1094,6 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobApiRequest) {
|
||||
|
||||
// Sanity checks
|
||||
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
|
||||
handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw)
|
||||
@@ -1015,12 +1235,13 @@ func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
|
||||
Password: password,
|
||||
Email: email,
|
||||
Projects: []string{project},
|
||||
Roles: []string{role}}); err != nil {
|
||||
Roles: []string{role},
|
||||
}); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
|
||||
return
|
||||
}
|
||||
|
||||
rw.Write([]byte(fmt.Sprintf("User %v successfully created!\n", username)))
|
||||
fmt.Fprintf(rw, "User %v successfully created!\n", username)
|
||||
}
|
||||
|
||||
// deleteUser godoc
|
||||
|
@@ -17,7 +17,7 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/golang-jwt/jwt/v4"
|
||||
"github.com/golang-jwt/jwt/v5"
|
||||
)
|
||||
|
||||
type JWTAuthenticator struct {
|
||||
@@ -49,8 +49,8 @@ func (ja *JWTAuthenticator) Init() error {
|
||||
|
||||
func (ja *JWTAuthenticator) AuthViaJWT(
|
||||
rw http.ResponseWriter,
|
||||
r *http.Request) (*schema.User, error) {
|
||||
|
||||
r *http.Request,
|
||||
) (*schema.User, error) {
|
||||
rawtoken := r.Header.Get("X-Auth-Token")
|
||||
if rawtoken == "" {
|
||||
rawtoken = r.Header.Get("Authorization")
|
||||
@@ -73,9 +73,9 @@ func (ja *JWTAuthenticator) AuthViaJWT(
|
||||
log.Warn("Error while parsing JWT token")
|
||||
return nil, err
|
||||
}
|
||||
if err := token.Claims.Valid(); err != nil {
|
||||
if !token.Valid {
|
||||
log.Warn("jwt token claims are not valid")
|
||||
return nil, err
|
||||
return nil, errors.New("jwt token claims are not valid")
|
||||
}
|
||||
|
||||
// Token is valid, extract payload
|
||||
@@ -88,7 +88,6 @@ func (ja *JWTAuthenticator) AuthViaJWT(
|
||||
if config.Keys.JwtConfig.ValidateUser {
|
||||
ur := repository.GetUserRepository()
|
||||
user, err := ur.GetUser(sub)
|
||||
|
||||
// Deny any logins for unknown usernames
|
||||
if err != nil {
|
||||
log.Warn("Could not find user from JWT in internal database.")
|
||||
@@ -117,7 +116,6 @@ func (ja *JWTAuthenticator) AuthViaJWT(
|
||||
|
||||
// Generate a new JWT that can be used for authentication
|
||||
func (ja *JWTAuthenticator) ProvideJWT(user *schema.User) (string, error) {
|
||||
|
||||
if ja.privateKey == nil {
|
||||
return "", errors.New("environment variable 'JWT_PRIVATE_KEY' not set")
|
||||
}
|
||||
|
@@ -17,7 +17,7 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/golang-jwt/jwt/v4"
|
||||
"github.com/golang-jwt/jwt/v5"
|
||||
)
|
||||
|
||||
type JWTCookieSessionAuthenticator struct {
|
||||
@@ -90,8 +90,8 @@ func (ja *JWTCookieSessionAuthenticator) CanLogin(
|
||||
user *schema.User,
|
||||
username string,
|
||||
rw http.ResponseWriter,
|
||||
r *http.Request) (*schema.User, bool) {
|
||||
|
||||
r *http.Request,
|
||||
) (*schema.User, bool) {
|
||||
jc := config.Keys.JwtConfig
|
||||
cookieName := ""
|
||||
if jc.CookieName != "" {
|
||||
@@ -113,8 +113,8 @@ func (ja *JWTCookieSessionAuthenticator) CanLogin(
|
||||
func (ja *JWTCookieSessionAuthenticator) Login(
|
||||
user *schema.User,
|
||||
rw http.ResponseWriter,
|
||||
r *http.Request) (*schema.User, error) {
|
||||
|
||||
r *http.Request,
|
||||
) (*schema.User, error) {
|
||||
jc := config.Keys.JwtConfig
|
||||
jwtCookie, err := r.Cookie(jc.CookieName)
|
||||
var rawtoken string
|
||||
@@ -144,10 +144,9 @@ func (ja *JWTCookieSessionAuthenticator) Login(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Check token validity and extract paypload
|
||||
if err := token.Claims.Valid(); err != nil {
|
||||
if !token.Valid {
|
||||
log.Warn("jwt token claims are not valid")
|
||||
return nil, err
|
||||
return nil, errors.New("jwt token claims are not valid")
|
||||
}
|
||||
|
||||
claims := token.Claims.(jwt.MapClaims)
|
||||
|
@@ -17,7 +17,7 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/golang-jwt/jwt/v4"
|
||||
"github.com/golang-jwt/jwt/v5"
|
||||
)
|
||||
|
||||
type JWTSessionAuthenticator struct {
|
||||
@@ -44,8 +44,8 @@ func (ja *JWTSessionAuthenticator) CanLogin(
|
||||
user *schema.User,
|
||||
username string,
|
||||
rw http.ResponseWriter,
|
||||
r *http.Request) (*schema.User, bool) {
|
||||
|
||||
r *http.Request,
|
||||
) (*schema.User, bool) {
|
||||
return user, r.Header.Get("Authorization") != "" ||
|
||||
r.URL.Query().Get("login-token") != ""
|
||||
}
|
||||
@@ -53,8 +53,8 @@ func (ja *JWTSessionAuthenticator) CanLogin(
|
||||
func (ja *JWTSessionAuthenticator) Login(
|
||||
user *schema.User,
|
||||
rw http.ResponseWriter,
|
||||
r *http.Request) (*schema.User, error) {
|
||||
|
||||
r *http.Request,
|
||||
) (*schema.User, error) {
|
||||
rawtoken := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
|
||||
if rawtoken == "" {
|
||||
rawtoken = r.URL.Query().Get("login-token")
|
||||
@@ -71,9 +71,9 @@ func (ja *JWTSessionAuthenticator) Login(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = token.Claims.Valid(); err != nil {
|
||||
if !token.Valid {
|
||||
log.Warn("jwt token claims are not valid")
|
||||
return nil, err
|
||||
return nil, errors.New("jwt token claims are not valid")
|
||||
}
|
||||
|
||||
claims := token.Claims.(jwt.MapClaims)
|
||||
|
@@ -32,32 +32,32 @@ type CCMetricStoreConfig struct {
|
||||
}
|
||||
|
||||
type CCMetricStore struct {
|
||||
here2there map[string]string
|
||||
there2here map[string]string
|
||||
client http.Client
|
||||
jwt string
|
||||
url string
|
||||
queryEndpoint string
|
||||
client http.Client
|
||||
here2there map[string]string
|
||||
there2here map[string]string
|
||||
}
|
||||
|
||||
type ApiQueryRequest struct {
|
||||
Cluster string `json:"cluster"`
|
||||
Queries []ApiQuery `json:"queries"`
|
||||
ForAllNodes []string `json:"for-all-nodes"`
|
||||
From int64 `json:"from"`
|
||||
To int64 `json:"to"`
|
||||
WithStats bool `json:"with-stats"`
|
||||
WithData bool `json:"with-data"`
|
||||
Queries []ApiQuery `json:"queries"`
|
||||
ForAllNodes []string `json:"for-all-nodes"`
|
||||
}
|
||||
|
||||
type ApiQuery struct {
|
||||
Type *string `json:"type,omitempty"`
|
||||
SubType *string `json:"subtype,omitempty"`
|
||||
Metric string `json:"metric"`
|
||||
Hostname string `json:"host"`
|
||||
Aggregate bool `json:"aggreg"`
|
||||
Type *string `json:"type,omitempty"`
|
||||
TypeIds []string `json:"type-ids,omitempty"`
|
||||
SubType *string `json:"subtype,omitempty"`
|
||||
SubTypeIds []string `json:"subtype-ids,omitempty"`
|
||||
Aggregate bool `json:"aggreg"`
|
||||
}
|
||||
|
||||
type ApiQueryResponse struct {
|
||||
@@ -67,16 +67,15 @@ type ApiQueryResponse struct {
|
||||
|
||||
type ApiMetricData struct {
|
||||
Error *string `json:"error"`
|
||||
Data []schema.Float `json:"data"`
|
||||
From int64 `json:"from"`
|
||||
To int64 `json:"to"`
|
||||
Data []schema.Float `json:"data"`
|
||||
Avg schema.Float `json:"avg"`
|
||||
Min schema.Float `json:"min"`
|
||||
Max schema.Float `json:"max"`
|
||||
}
|
||||
|
||||
func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
|
||||
|
||||
var config CCMetricStoreConfig
|
||||
if err := json.Unmarshal(rawConfig, &config); err != nil {
|
||||
log.Warn("Error while unmarshaling raw json config")
|
||||
@@ -122,8 +121,8 @@ func (ccms *CCMetricStore) toLocalName(metric string) string {
|
||||
|
||||
func (ccms *CCMetricStore) doRequest(
|
||||
ctx context.Context,
|
||||
body *ApiQueryRequest) (*ApiQueryResponse, error) {
|
||||
|
||||
body *ApiQueryRequest,
|
||||
) (*ApiQueryResponse, error) {
|
||||
buf := &bytes.Buffer{}
|
||||
if err := json.NewEncoder(buf).Encode(body); err != nil {
|
||||
log.Warn("Error while encoding request body")
|
||||
@@ -162,8 +161,8 @@ func (ccms *CCMetricStore) LoadData(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope,
|
||||
ctx context.Context) (schema.JobData, error) {
|
||||
|
||||
ctx context.Context,
|
||||
) (schema.JobData, error) {
|
||||
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes)
|
||||
if err != nil {
|
||||
log.Warn("Error while building queries")
|
||||
@@ -186,7 +185,7 @@ func (ccms *CCMetricStore) LoadData(
|
||||
}
|
||||
|
||||
var errors []string
|
||||
var jobData schema.JobData = make(schema.JobData)
|
||||
jobData := make(schema.JobData)
|
||||
for i, row := range resBody.Results {
|
||||
query := req.Queries[i]
|
||||
metric := ccms.toLocalName(query.Metric)
|
||||
@@ -267,8 +266,8 @@ var (
|
||||
func (ccms *CCMetricStore) buildQueries(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) {
|
||||
|
||||
scopes []schema.MetricScope,
|
||||
) ([]ApiQuery, []schema.MetricScope, error) {
|
||||
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
|
||||
assignedScope := []schema.MetricScope{}
|
||||
|
||||
@@ -509,8 +508,8 @@ func (ccms *CCMetricStore) buildQueries(
|
||||
func (ccms *CCMetricStore) LoadStats(
|
||||
job *schema.Job,
|
||||
metrics []string,
|
||||
ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
|
||||
|
||||
ctx context.Context,
|
||||
) (map[string]map[string]schema.MetricStatistics, error) {
|
||||
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}) // #166 Add scope shere for analysis view accelerator normalization?
|
||||
if err != nil {
|
||||
log.Warn("Error while building query")
|
||||
@@ -571,8 +570,8 @@ func (ccms *CCMetricStore) LoadNodeData(
|
||||
metrics, nodes []string,
|
||||
scopes []schema.MetricScope,
|
||||
from, to time.Time,
|
||||
ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
|
||||
|
||||
ctx context.Context,
|
||||
) (map[string]map[string][]*schema.JobMetric, error) {
|
||||
req := ApiQueryRequest{
|
||||
Cluster: cluster,
|
||||
From: from.Unix(),
|
||||
@@ -657,7 +656,6 @@ func (ccms *CCMetricStore) LoadNodeData(
|
||||
}
|
||||
|
||||
func intToStringSlice(is []int) []string {
|
||||
|
||||
ss := make([]string, len(is))
|
||||
for i, x := range is {
|
||||
ss[i] = strconv.Itoa(x)
|
||||
|
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
@@ -29,13 +30,11 @@ var (
|
||||
)
|
||||
|
||||
type JobRepository struct {
|
||||
DB *sqlx.DB
|
||||
driver string
|
||||
|
||||
stmtCache *sq.StmtCache
|
||||
cache *lrucache.Cache
|
||||
|
||||
DB *sqlx.DB
|
||||
stmtCache *sq.StmtCache
|
||||
cache *lrucache.Cache
|
||||
archiveChannel chan *schema.Job
|
||||
driver string
|
||||
archivePending sync.WaitGroup
|
||||
}
|
||||
|
||||
@@ -212,7 +211,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
|
||||
}
|
||||
|
||||
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
|
||||
return nil
|
||||
return archive.UpdateMetadata(job, job.MetaData)
|
||||
}
|
||||
|
||||
// Find executes a SQL query to find a specific batch job.
|
||||
@@ -223,8 +222,8 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
|
||||
func (r *JobRepository) Find(
|
||||
jobId *int64,
|
||||
cluster *string,
|
||||
startTime *int64) (*schema.Job, error) {
|
||||
|
||||
startTime *int64,
|
||||
) (*schema.Job, error) {
|
||||
start := time.Now()
|
||||
q := sq.Select(jobColumns...).From("job").
|
||||
Where("job.job_id = ?", *jobId)
|
||||
@@ -248,8 +247,8 @@ func (r *JobRepository) Find(
|
||||
func (r *JobRepository) FindAll(
|
||||
jobId *int64,
|
||||
cluster *string,
|
||||
startTime *int64) ([]*schema.Job, error) {
|
||||
|
||||
startTime *int64,
|
||||
) ([]*schema.Job, error) {
|
||||
start := time.Now()
|
||||
q := sq.Select(jobColumns...).From("job").
|
||||
Where("job.job_id = ?", *jobId)
|
||||
@@ -292,7 +291,8 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
|
||||
|
||||
func (r *JobRepository) FindConcurrentJobs(
|
||||
ctx context.Context,
|
||||
job *schema.Job) (*model.JobLinkResultList, error) {
|
||||
job *schema.Job,
|
||||
) (*model.JobLinkResultList, error) {
|
||||
if job == nil {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -420,8 +420,8 @@ func (r *JobRepository) Stop(
|
||||
jobId int64,
|
||||
duration int32,
|
||||
state schema.JobState,
|
||||
monitoringStatus int32) (err error) {
|
||||
|
||||
monitoringStatus int32,
|
||||
) (err error) {
|
||||
stmt := sq.Update("job").
|
||||
Set("job_state", state).
|
||||
Set("duration", duration).
|
||||
@@ -434,11 +434,14 @@ func (r *JobRepository) Stop(
|
||||
|
||||
func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
|
||||
var cnt int
|
||||
qs := fmt.Sprintf("SELECT count(*) FROM job WHERE job.start_time < %d", startTime)
|
||||
err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement
|
||||
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
|
||||
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
|
||||
q.RunWith(r.DB).QueryRow().Scan(cnt)
|
||||
qd := sq.Delete("job").Where("job.start_time < ?", startTime)
|
||||
_, err := qd.RunWith(r.DB).Exec()
|
||||
|
||||
if err != nil {
|
||||
log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err)
|
||||
s, _, _ := qd.ToSql()
|
||||
log.Errorf(" DeleteJobsBefore(%d) with %s: error %#v", startTime, s, err)
|
||||
} else {
|
||||
log.Debugf("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
|
||||
}
|
||||
@@ -446,9 +449,12 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
|
||||
}
|
||||
|
||||
func (r *JobRepository) DeleteJobById(id int64) error {
|
||||
_, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id)
|
||||
qd := sq.Delete("job").Where("job.id = ?", id)
|
||||
_, err := qd.RunWith(r.DB).Exec()
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("DeleteJobById(%d): error %#v", id, err)
|
||||
s, _, _ := qd.ToSql()
|
||||
log.Errorf("DeleteJobById(%d) with %s : error %#v", id, s, err)
|
||||
} else {
|
||||
log.Debugf("DeleteJobById(%d): Success", id)
|
||||
}
|
||||
@@ -468,8 +474,8 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
|
||||
func (r *JobRepository) MarkArchived(
|
||||
jobId int64,
|
||||
monitoringStatus int32,
|
||||
metricStats map[string]schema.JobStatistics) error {
|
||||
|
||||
metricStats map[string]schema.JobStatistics,
|
||||
) error {
|
||||
stmt := sq.Update("job").
|
||||
Set("monitoring_status", monitoringStatus).
|
||||
Where("job.id = ?", jobId)
|
||||
@@ -578,8 +584,10 @@ func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm
|
||||
}
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("no such jobname, project or user")
|
||||
var ErrForbidden = errors.New("not authorized")
|
||||
var (
|
||||
ErrNotFound = errors.New("no such jobname, project or user")
|
||||
ErrForbidden = errors.New("not authorized")
|
||||
)
|
||||
|
||||
func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) {
|
||||
compareStr := " = ?"
|
||||
@@ -663,7 +671,6 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
|
||||
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
|
||||
// Hosts with zero jobs running on them will not show up!
|
||||
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
|
||||
|
||||
start := time.Now()
|
||||
subclusters := make(map[string]map[string]int)
|
||||
rows, err := sq.Select("resources", "subcluster").From("job").
|
||||
@@ -706,7 +713,6 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
|
||||
}
|
||||
|
||||
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
||||
|
||||
start := time.Now()
|
||||
res, err := sq.Update("job").
|
||||
Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
|
||||
@@ -735,7 +741,6 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
||||
}
|
||||
|
||||
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {
|
||||
|
||||
var query sq.SelectBuilder
|
||||
|
||||
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
|
||||
|
@@ -16,7 +16,7 @@ import (
|
||||
"github.com/golang-migrate/migrate/v4/source/iofs"
|
||||
)
|
||||
|
||||
const Version uint = 6
|
||||
const Version uint = 7
|
||||
|
||||
//go:embed migrations/*
|
||||
var migrationFiles embed.FS
|
||||
@@ -57,7 +57,7 @@ func checkDBVersion(backend string, db *sql.DB) error {
|
||||
log.Fatalf("unsupported database backend: %s", backend)
|
||||
}
|
||||
|
||||
v, _, err := m.Version()
|
||||
v, dirty, err := m.Version()
|
||||
if err != nil {
|
||||
if err == migrate.ErrNilVersion {
|
||||
log.Warn("Legacy database without version or missing database file!")
|
||||
@@ -68,18 +68,18 @@ func checkDBVersion(backend string, db *sql.DB) error {
|
||||
|
||||
if v < Version {
|
||||
return fmt.Errorf("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version)
|
||||
} else if v > Version {
|
||||
return fmt.Errorf("unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool", v, Version)
|
||||
}
|
||||
|
||||
if v > Version {
|
||||
return fmt.Errorf("unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool", v, Version)
|
||||
if dirty {
|
||||
return fmt.Errorf("last migration to version %d has failed, please fix the db manually and force version with -force-db flag", Version)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func MigrateDB(backend string, db string) error {
|
||||
var m *migrate.Migrate
|
||||
|
||||
func getMigrateInstance(backend string, db string) (m *migrate.Migrate, err error) {
|
||||
switch backend {
|
||||
case "sqlite3":
|
||||
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
|
||||
@@ -89,22 +89,31 @@ func MigrateDB(backend string, db string) error {
|
||||
|
||||
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("sqlite3://%s?_foreign_keys=on", db))
|
||||
if err != nil {
|
||||
return err
|
||||
return m, err
|
||||
}
|
||||
case "mysql":
|
||||
d, err := iofs.New(migrationFiles, "migrations/mysql")
|
||||
if err != nil {
|
||||
return err
|
||||
return m, err
|
||||
}
|
||||
|
||||
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("mysql://%s?multiStatements=true", db))
|
||||
if err != nil {
|
||||
return err
|
||||
return m, err
|
||||
}
|
||||
default:
|
||||
log.Fatalf("unsupported database backend: %s", backend)
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func MigrateDB(backend string, db string) error {
|
||||
m, err := getMigrateInstance(backend, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.Up(); err != nil {
|
||||
if err == migrate.ErrNoChange {
|
||||
log.Info("DB already up to date!")
|
||||
@@ -116,3 +125,35 @@ func MigrateDB(backend string, db string) error {
|
||||
m.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func RevertDB(backend string, db string) error {
|
||||
m, err := getMigrateInstance(backend, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.Migrate(Version - 1); err != nil {
|
||||
if err == migrate.ErrNoChange {
|
||||
log.Info("DB already up to date!")
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
m.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func ForceDB(backend string, db string) error {
|
||||
m, err := getMigrateInstance(backend, db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.Force(int(Version)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.Close()
|
||||
return nil
|
||||
}
|
||||
|
@@ -0,0 +1,3 @@
|
||||
SET FOREIGN_KEY_CHECKS = 0;
|
||||
ALTER TABLE tag MODIFY id INTEGER;
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
@@ -0,0 +1,3 @@
|
||||
SET FOREIGN_KEY_CHECKS = 0;
|
||||
ALTER TABLE tag MODIFY id INTEGER AUTO_INCREMENT;
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
@@ -15,8 +15,11 @@ import (
|
||||
|
||||
// Add the tag with id `tagId` to the job with the database id `jobId`.
|
||||
func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
|
||||
if _, err := r.stmtCache.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)`, job, tag); err != nil {
|
||||
log.Error("Error while running query")
|
||||
q := sq.Insert("jobtag").Columns("job_id", "tag_id").Values(job, tag)
|
||||
|
||||
if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
|
||||
s, _, _ := q.ToSql()
|
||||
log.Errorf("Error adding tag with %s: %v", s, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -37,8 +40,11 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
|
||||
|
||||
// Removes a tag from a job
|
||||
func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
|
||||
if _, err := r.stmtCache.Exec("DELETE FROM jobtag WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2", job, tag); err != nil {
|
||||
log.Error("Error while running query")
|
||||
q := sq.Delete("jobtag").Where("jobtag.job_id = ?", job).Where("jobtag.tag_id = ?", tag)
|
||||
|
||||
if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
|
||||
s, _, _ := q.ToSql()
|
||||
log.Errorf("Error adding tag with %s: %v", s, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -59,8 +65,12 @@ func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
|
||||
|
||||
// CreateTag creates a new tag with the specified type and name and returns its database id.
|
||||
func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64, err error) {
|
||||
res, err := r.stmtCache.Exec("INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2)", tagType, tagName)
|
||||
q := sq.Insert("tag").Columns("tag_type", "tag_name").Values(tagType, tagName)
|
||||
|
||||
res, err := q.RunWith(r.stmtCache).Exec()
|
||||
if err != nil {
|
||||
s, _, _ := q.ToSql()
|
||||
log.Errorf("Error inserting tag with %s: %v", s, err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@@ -154,7 +164,8 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
|
||||
|
||||
rows, err := q.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
log.Error("Error while running query")
|
||||
s, _, _ := q.ToSql()
|
||||
log.Errorf("Error get tags with %s: %v", s, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user