Merge branch 'master' into 134-job-tagging

This commit is contained in:
2023-09-27 05:30:36 +02:00
52 changed files with 4139 additions and 1371 deletions

View File

@@ -1,4 +1,5 @@
// Code generated by swaggo/swag. DO NOT EDIT
// Code generated by swaggo/swag. DO NOT EDIT.
package api
import "github.com/swaggo/swag"
@@ -35,7 +36,7 @@ const docTemplate = `{
"application/json"
],
"tags": [
"query"
"Job query"
],
"summary": "Lists all jobs",
"parameters": [
@@ -133,7 +134,7 @@ const docTemplate = `{
"application/json"
],
"tags": [
"remove"
"Job remove"
],
"summary": "Remove a job from the sql database",
"parameters": [
@@ -205,7 +206,7 @@ const docTemplate = `{
"application/json"
],
"tags": [
"remove"
"Job remove"
],
"summary": "Remove a job from the sql database",
"parameters": [
@@ -275,7 +276,7 @@ const docTemplate = `{
"application/json"
],
"tags": [
"remove"
"Job remove"
],
"summary": "Remove a job from the sql database",
"parameters": [
@@ -348,7 +349,7 @@ const docTemplate = `{
"application/json"
],
"tags": [
"add and modify"
"Job add and modify"
],
"summary": "Adds a new job as \"running\"",
"parameters": [
@@ -414,7 +415,7 @@ const docTemplate = `{
"application/json"
],
"tags": [
"add and modify"
"Job add and modify"
],
"summary": "Marks job as completed and triggers archiving",
"parameters": [
@@ -489,7 +490,7 @@ const docTemplate = `{
"application/json"
],
"tags": [
"add and modify"
"Job add and modify"
],
"summary": "Marks job as completed and triggers archiving",
"parameters": [
@@ -571,7 +572,7 @@ const docTemplate = `{
"application/json"
],
"tags": [
"add and modify"
"Job add and modify"
],
"summary": "Adds one or more tags to a job",
"parameters": [
@@ -644,7 +645,7 @@ const docTemplate = `{
"application/json"
],
"tags": [
"query"
"Job query"
],
"summary": "Get complete job meta and metric data",
"parameters": [
@@ -713,9 +714,367 @@ const docTemplate = `{
}
}
}
},
"/user/{id}": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Modifies user defined by username (id) in one of four possible ways.\nIf more than one formValue is set then only the highest priority field is used.\nOnly accessible from IPs registered with apiAllowedIPs configuration option.",
"consumes": [
"multipart/form-data"
],
"produces": [
"text/plain"
],
"tags": [
"User"
],
"summary": "Updates an existing user",
"parameters": [
{
"type": "string",
"description": "Database ID of User",
"name": "id",
"in": "path",
"required": true
},
{
"enum": [
"admin",
"support",
"manager",
"user",
"api"
],
"type": "string",
"description": "Priority 1: Role to add",
"name": "add-role",
"in": "formData"
},
{
"enum": [
"admin",
"support",
"manager",
"user",
"api"
],
"type": "string",
"description": "Priority 2: Role to remove",
"name": "remove-role",
"in": "formData"
},
{
"type": "string",
"description": "Priority 3: Project to add",
"name": "add-project",
"in": "formData"
},
{
"type": "string",
"description": "Priority 4: Project to remove",
"name": "remove-project",
"in": "formData"
}
],
"responses": {
"200": {
"description": "Success Response Message",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"type": "string"
}
},
"401": {
"description": "Unauthorized",
"schema": {
"type": "string"
}
},
"403": {
"description": "Forbidden",
"schema": {
"type": "string"
}
},
"422": {
"description": "Unprocessable Entity: The user could not be updated",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"type": "string"
}
}
}
}
},
"/users/": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Returns a JSON-encoded list of users.\nRequired query-parameter defines if all users or only users with additional special roles are returned.\nOnly accessible from IPs registered with apiAllowedIPs configuration option.",
"produces": [
"application/json"
],
"tags": [
"User"
],
"summary": "Returns a list of users",
"parameters": [
{
"type": "boolean",
"description": "If returned list should contain all users or only users with additional special roles",
"name": "not-just-user",
"in": "query",
"required": true
}
],
"responses": {
"200": {
"description": "List of users returned successfully",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ApiReturnedUser"
}
}
},
"400": {
"description": "Bad Request",
"schema": {
"type": "string"
}
},
"401": {
"description": "Unauthorized",
"schema": {
"type": "string"
}
},
"403": {
"description": "Forbidden",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"type": "string"
}
}
}
},
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "User specified in form data will be saved to database.\nOnly accessible from IPs registered with apiAllowedIPs configuration option.",
"consumes": [
"multipart/form-data"
],
"produces": [
"text/plain"
],
"tags": [
"User"
],
"summary": "Adds a new user",
"parameters": [
{
"type": "string",
"description": "Unique user ID",
"name": "username",
"in": "formData",
"required": true
},
{
"type": "string",
"description": "User password",
"name": "password",
"in": "formData",
"required": true
},
{
"enum": [
"admin",
"support",
"manager",
"user",
"api"
],
"type": "string",
"description": "User role",
"name": "role",
"in": "formData",
"required": true
},
{
"type": "string",
"description": "Managed project, required for new manager role user",
"name": "project",
"in": "formData"
},
{
"type": "string",
"description": "Users name",
"name": "name",
"in": "formData"
},
{
"type": "string",
"description": "Users email",
"name": "email",
"in": "formData"
}
],
"responses": {
"200": {
"description": "Success Response",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"type": "string"
}
},
"401": {
"description": "Unauthorized",
"schema": {
"type": "string"
}
},
"403": {
"description": "Forbidden",
"schema": {
"type": "string"
}
},
"422": {
"description": "Unprocessable Entity: creating user failed",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"type": "string"
}
}
}
},
"delete": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "User defined by username in form data will be deleted from database.\nOnly accessible from IPs registered with apiAllowedIPs configuration option.",
"consumes": [
"multipart/form-data"
],
"produces": [
"text/plain"
],
"tags": [
"User"
],
"summary": "Deletes a user",
"parameters": [
{
"type": "string",
"description": "User ID to delete",
"name": "username",
"in": "formData",
"required": true
}
],
"responses": {
"200": {
"description": "User deleted successfully"
},
"400": {
"description": "Bad Request",
"schema": {
"type": "string"
}
},
"401": {
"description": "Unauthorized",
"schema": {
"type": "string"
}
},
"403": {
"description": "Forbidden",
"schema": {
"type": "string"
}
},
"422": {
"description": "Unprocessable Entity: deleting user failed",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"type": "string"
}
}
}
}
}
},
"definitions": {
"api.ApiReturnedUser": {
"type": "object",
"properties": {
"email": {
"type": "string"
},
"name": {
"type": "string"
},
"projects": {
"type": "array",
"items": {
"type": "string"
}
},
"roles": {
"type": "array",
"items": {
"type": "string"
}
},
"username": {
"type": "string"
}
}
},
"api.ApiTag": {
"type": "object",
"properties": {
@@ -1372,7 +1731,7 @@ const docTemplate = `{
"type": "object",
"properties": {
"id": {
"description": "The unique DB identifier of a tag\nThe unique DB identifier of a tag",
"description": "The unique DB identifier of a tag",
"type": "integer"
},
"name": {
@@ -1405,17 +1764,12 @@ const docTemplate = `{
"name": "X-Auth-Token",
"in": "header"
}
},
"tags": [
{
"name": "Job API"
}
]
}
}`
// SwaggerInfo holds exported Swagger Info so clients can modify it
var SwaggerInfo = &swag.Spec{
Version: "1",
Version: "1.0.0",
Host: "localhost:8080",
BasePath: "/api",
Schemes: []string{},
@@ -1423,6 +1777,8 @@ var SwaggerInfo = &swag.Spec{
Description: "API for batch job control.",
InfoInstanceName: "swagger",
SwaggerTemplate: docTemplate,
LeftDelim: "{{",
RightDelim: "}}",
}
func init() {

View File

@@ -37,8 +37,6 @@ import (
// @version 1.0.0
// @description API for batch job control.
// @tag.name Job API
// @contact.name ClusterCockpit Project
// @contact.url https://github.com/ClusterCockpit
// @contact.email support@clustercockpit.org
@@ -77,8 +75,6 @@ func (api *RestApi) MountRoutes(r *mux.Router) {
r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete)
r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobById).Methods(http.MethodDelete)
r.HandleFunc("/jobs/delete_job_before/{ts}", api.deleteJobBefore).Methods(http.MethodDelete)
// r.HandleFunc("/secured/addProject/{id}/{project}", api.secureUpdateUser).Methods(http.MethodPost)
// r.HandleFunc("/secured/addRole/{id}/{role}", api.secureUpdateUser).Methods(http.MethodPost)
if api.MachineStateDir != "" {
r.HandleFunc("/machine_state/{cluster}/{host}", api.getMachineState).Methods(http.MethodGet)
@@ -165,6 +161,14 @@ type JobMetricWithName struct {
Metric *schema.JobMetric `json:"metric"`
}
type ApiReturnedUser struct {
Username string `json:"username"`
Name string `json:"name"`
Roles []string `json:"roles"`
Email string `json:"email"`
Projects []string `json:"projects"`
}
func handleError(err error, statusCode int, rw http.ResponseWriter) {
log.Warnf("REST ERROR : %s", err.Error())
rw.Header().Add("Content-Type", "application/json")
@@ -193,6 +197,10 @@ func securedCheck(r *http.Request) error {
return fmt.Errorf("missing configuration key ApiAllowedIPs")
}
if config.Keys.ApiAllowedIPs[0] == "*" {
return nil
}
// extract IP address
IPAddress := r.Header.Get("X-Real-Ip")
if IPAddress == "" {
@@ -202,6 +210,10 @@ func securedCheck(r *http.Request) error {
IPAddress = r.RemoteAddr
}
if strings.Contains(IPAddress, ":") {
IPAddress = strings.Split(IPAddress, ":")[0]
}
// check if IP is allowed
if !util.Contains(config.Keys.ApiAllowedIPs, IPAddress) {
return fmt.Errorf("unknown ip: %v", IPAddress)
@@ -213,7 +225,7 @@ func securedCheck(r *http.Request) error {
// getJobs godoc
// @summary Lists all jobs
// @tags query
// @tags Job query
// @description Get a list of all jobs. Filters can be applied using query parameters.
// @description Number of results can be limited by page. Results are sorted by descending startTime.
// @produce json
@@ -359,7 +371,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
// getJobById godoc
// @summary Get complete job meta and metric data
// @tags query
// @tags Job query
// @description Job to get is specified by database ID
// @description Returns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'.
// @accept json
@@ -454,7 +466,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
// tagJob godoc
// @summary Adds one or more tags to a job
// @tags add and modify
// @tags Job add and modify
// @description Adds tag(s) to a job specified by DB ID. Name and Type of Tag(s) can be chosen freely.
// @description If tagged job is already finished: Tag will be written directly to respective archive files.
// @accept json
@@ -521,7 +533,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
// startJob godoc
// @summary Adds a new job as "running"
// @tags add and modify
// @tags Job add and modify
// @description Job specified in request body will be saved to database as "running" with new DB ID.
// @description Job specifications follow the 'JobMeta' scheme, API will fail to execute if requirements are not met.
// @accept json
@@ -602,7 +614,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
// stopJobById godoc
// @summary Marks job as completed and triggers archiving
// @tags add and modify
// @tags Job add and modify
// @description Job to stop is specified by database ID. Only stopTime and final state are required in request body.
// @description Returns full job resource information according to 'JobMeta' scheme.
// @accept json
@@ -659,7 +671,7 @@ func (api *RestApi) stopJobById(rw http.ResponseWriter, r *http.Request) {
// stopJobByRequest godoc
// @summary Marks job as completed and triggers archiving
// @tags add and modify
// @tags Job add and modify
// @description Job to stop is specified by request body. All fields are required in this case.
// @description Returns full job resource information according to 'JobMeta' scheme.
// @produce json
@@ -708,7 +720,7 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
// deleteJobById godoc
// @summary Remove a job from the sql database
// @tags remove
// @tags Job remove
// @description Job to remove is specified by database ID. This will not remove the job from the job archive.
// @produce json
// @param id path int true "Database ID of Job"
@@ -755,7 +767,7 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) {
// deleteJobByRequest godoc
// @summary Remove a job from the sql database
// @tags remove
// @tags Job remove
// @description Job to delete is specified by request body. All fields are required in this case.
// @accept json
// @produce json
@@ -813,7 +825,7 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
// deleteJobBefore godoc
// @summary Remove a job from the sql database
// @tags remove
// @tags Job remove
// @description Remove all jobs with start time before timestamp. The jobs will not be removed from the job archive.
// @produce json
// @param ts path int true "Unix epoch timestamp"
@@ -943,43 +955,32 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
})
}
func (api *RestApi) getJWT(rw http.ResponseWriter, r *http.Request) {
err := securedCheck(r)
if err != nil {
http.Error(rw, err.Error(), http.StatusForbidden)
}
rw.Header().Set("Content-Type", "text/plain")
username := r.FormValue("username")
me := repository.GetUserFromContext(r.Context())
if !me.HasRole(schema.RoleAdmin) {
if username != me.Username {
http.Error(rw, "Only admins are allowed to sign JWTs not for themselves",
http.StatusForbidden)
return
}
}
user, err := repository.GetUserRepository().GetUser(username)
if err != nil {
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
return
}
jwt, err := api.Authentication.JwtAuth.ProvideJWT(user)
if err != nil {
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
return
}
rw.WriteHeader(http.StatusOK)
rw.Write([]byte(jwt))
}
// createUser godoc
// @summary Adds a new user
// @tags User
// @description User specified in form data will be saved to database.
// @description Only accessible from IPs registered with apiAllowedIPs configuration option.
// @accept mpfd
// @produce plain
// @param username formData string true "Unique user ID"
// @param password formData string true "User password"
// @param role formData string true "User role" Enums(admin, support, manager, user, api)
// @param project formData string false "Managed project, required for new manager role user"
// @param name formData string false "Users name"
// @param email formData string false "Users email"
// @success 200 {string} string "Success Response"
// @failure 400 {string} string "Bad Request"
// @failure 401 {string} string "Unauthorized"
// @failure 403 {string} string "Forbidden"
// @failure 422 {string} string "Unprocessable Entity: creating user failed"
// @failure 500 {string} string "Internal Server Error"
// @security ApiKeyAuth
// @router /users/ [post]
func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
err := securedCheck(r)
if err != nil {
http.Error(rw, err.Error(), http.StatusForbidden)
return
}
rw.Header().Set("Content-Type", "text/plain")
@@ -1022,10 +1023,27 @@ func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
rw.Write([]byte(fmt.Sprintf("User %v successfully created!\n", username)))
}
// deleteUser godoc
// @summary Deletes a user
// @tags User
// @description User defined by username in form data will be deleted from database.
// @description Only accessible from IPs registered with apiAllowedIPs configuration option.
// @accept mpfd
// @produce plain
// @param username formData string true "User ID to delete"
// @success 200 "User deleted successfully"
// @failure 400 {string} string "Bad Request"
// @failure 401 {string} string "Unauthorized"
// @failure 403 {string} string "Forbidden"
// @failure 422 {string} string "Unprocessable Entity: deleting user failed"
// @failure 500 {string} string "Internal Server Error"
// @security ApiKeyAuth
// @router /users/ [delete]
func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) {
err := securedCheck(r)
if err != nil {
http.Error(rw, err.Error(), http.StatusForbidden)
return
}
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
@@ -1042,10 +1060,26 @@ func (api *RestApi) deleteUser(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusOK)
}
// getUsers godoc
// @summary Returns a list of users
// @tags User
// @description Returns a JSON-encoded list of users.
// @description Required query-parameter defines if all users or only users with additional special roles are returned.
// @description Only accessible from IPs registered with apiAllowedIPs configuration option.
// @produce json
// @param not-just-user query bool true "If returned list should contain all users or only users with additional special roles"
// @success 200 {array} api.ApiReturnedUser "List of users returned successfully"
// @failure 400 {string} string "Bad Request"
// @failure 401 {string} string "Unauthorized"
// @failure 403 {string} string "Forbidden"
// @failure 500 {string} string "Internal Server Error"
// @security ApiKeyAuth
// @router /users/ [get]
func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) {
err := securedCheck(r)
if err != nil {
http.Error(rw, err.Error(), http.StatusForbidden)
return
}
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
@@ -1062,31 +1096,32 @@ func (api *RestApi) getUsers(rw http.ResponseWriter, r *http.Request) {
json.NewEncoder(rw).Encode(users)
}
func (api *RestApi) getRoles(rw http.ResponseWriter, r *http.Request) {
err := securedCheck(r)
if err != nil {
http.Error(rw, err.Error(), http.StatusForbidden)
}
user := repository.GetUserFromContext(r.Context())
if !user.HasRole(schema.RoleAdmin) {
http.Error(rw, "only admins are allowed to fetch a list of roles", http.StatusForbidden)
return
}
roles, err := schema.GetValidRoles(user)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(rw).Encode(roles)
}
// updateUser godoc
// @summary Updates an existing user
// @tags User
// @description Modifies user defined by username (id) in one of four possible ways.
// @description If more than one formValue is set then only the highest priority field is used.
// @description Only accessible from IPs registered with apiAllowedIPs configuration option.
// @accept mpfd
// @produce plain
// @param id path string true "Database ID of User"
// @param add-role formData string false "Priority 1: Role to add" Enums(admin, support, manager, user, api)
// @param remove-role formData string false "Priority 2: Role to remove" Enums(admin, support, manager, user, api)
// @param add-project formData string false "Priority 3: Project to add"
// @param remove-project formData string false "Priority 4: Project to remove"
// @success 200 {string} string "Success Response Message"
// @failure 400 {string} string "Bad Request"
// @failure 401 {string} string "Unauthorized"
// @failure 403 {string} string "Forbidden"
// @failure 422 {string} string "Unprocessable Entity: The user could not be updated"
// @failure 500 {string} string "Internal Server Error"
// @security ApiKeyAuth
// @router /user/{id} [post]
func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
err := securedCheck(r)
if err != nil {
http.Error(rw, err.Error(), http.StatusForbidden)
return
}
if user := repository.GetUserFromContext(r.Context()); !user.HasRole(schema.RoleAdmin) {
@@ -1130,70 +1165,61 @@ func (api *RestApi) updateUser(rw http.ResponseWriter, r *http.Request) {
}
}
// func (api *RestApi) secureUpdateUser(rw http.ResponseWriter, r *http.Request) {
// if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
// handleError(fmt.Errorf("missing role: %v", auth.GetRoleString(auth.RoleApi)), http.StatusForbidden, rw)
// return
// }
//
// // IP CHECK HERE (WIP)
// // Probably better as private routine
// IPAddress := r.Header.Get("X-Real-Ip")
// if IPAddress == "" {
// IPAddress = r.Header.Get("X-Forwarded-For")
// }
// if IPAddress == "" {
// IPAddress = r.RemoteAddr
// }
//
// // Also This
// ipOk := false
// for _, a := range config.Keys.ApiAllowedAddrs {
// if a == IPAddress {
// ipOk = true
// }
// }
//
// if IPAddress == "" || ipOk == false {
// handleError(fmt.Errorf("unknown ip: %v", IPAddress), http.StatusForbidden, rw)
// return
// }
// // IP CHECK END
//
// // Get Values
// id := mux.Vars(r)["id"]
// newproj := mux.Vars(r)["project"]
// newrole := mux.Vars(r)["role"]
//
// // TODO: Handle anything but roles...
// if newrole != "" {
// if err := api.Authentication.AddRole(r.Context(), id, newrole); err != nil {
// handleError(errors.New(err.Error()), http.StatusUnprocessableEntity, rw)
// return
// }
//
// rw.Header().Add("Content-Type", "application/json")
// rw.WriteHeader(http.StatusOK)
// json.NewEncoder(rw).Encode(UpdateUserApiResponse{
// Message: fmt.Sprintf("Successfully added role %s to %s", newrole, id),
// })
//
// } else if newproj != "" {
// if err := api.Authentication.AddProject(r.Context(), id, newproj); err != nil {
// handleError(errors.New(err.Error()), http.StatusUnprocessableEntity, rw)
// return
// }
//
// rw.Header().Add("Content-Type", "application/json")
// rw.WriteHeader(http.StatusOK)
// json.NewEncoder(rw).Encode(UpdateUserApiResponse{
// Message: fmt.Sprintf("Successfully added project %s to %s", newproj, id),
// })
//
// } else {
// handleError(errors.New("Not Add [role|project]?"), http.StatusBadRequest, rw)
// }
// }
func (api *RestApi) getJWT(rw http.ResponseWriter, r *http.Request) {
err := securedCheck(r)
if err != nil {
http.Error(rw, err.Error(), http.StatusForbidden)
return
}
rw.Header().Set("Content-Type", "text/plain")
username := r.FormValue("username")
me := repository.GetUserFromContext(r.Context())
if !me.HasRole(schema.RoleAdmin) {
if username != me.Username {
http.Error(rw, "Only admins are allowed to sign JWTs not for themselves",
http.StatusForbidden)
return
}
}
user, err := repository.GetUserRepository().GetUser(username)
if err != nil {
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
return
}
jwt, err := api.Authentication.JwtAuth.ProvideJWT(user)
if err != nil {
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
return
}
rw.WriteHeader(http.StatusOK)
rw.Write([]byte(jwt))
}
func (api *RestApi) getRoles(rw http.ResponseWriter, r *http.Request) {
err := securedCheck(r)
if err != nil {
http.Error(rw, err.Error(), http.StatusForbidden)
return
}
user := repository.GetUserFromContext(r.Context())
if !user.HasRole(schema.RoleAdmin) {
http.Error(rw, "only admins are allowed to fetch a list of roles", http.StatusForbidden)
return
}
roles, err := schema.GetValidRoles(user)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(rw).Encode(roles)
}
func (api *RestApi) updateConfiguration(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Type", "text/plain")

View File

@@ -6,6 +6,7 @@ package auth
import (
"crypto/ed25519"
"database/sql"
"encoding/base64"
"errors"
"fmt"
@@ -152,31 +153,35 @@ func (ja *JWTCookieSessionAuthenticator) Login(
claims := token.Claims.(jwt.MapClaims)
sub, _ := claims["sub"].(string)
var name string
if wrap, ok := claims["name"].(map[string]interface{}); ok {
if vals, ok := wrap["values"].([]interface{}); ok {
if len(vals) != 0 {
name = fmt.Sprintf("%v", vals[0])
for i := 1; i < len(vals); i++ {
name += fmt.Sprintf(" %v", vals[i])
}
}
}
}
var roles []string
projects := make([]string, 0)
if jc.ValidateUser {
var err error
user, err = repository.GetUserRepository().GetUser(sub)
if err != nil && err != sql.ErrNoRows {
log.Errorf("Error while loading user '%v'", sub)
}
// Deny any logins for unknown usernames
if user == nil {
log.Warn("Could not find user from JWT in internal database.")
return nil, errors.New("unknown user")
}
// Take user roles from database instead of trusting the JWT
roles = user.Roles
} else {
var name string
if wrap, ok := claims["name"].(map[string]interface{}); ok {
if vals, ok := wrap["values"].([]interface{}); ok {
if len(vals) != 0 {
name = fmt.Sprintf("%v", vals[0])
for i := 1; i < len(vals); i++ {
name += fmt.Sprintf(" %v", vals[i])
}
}
}
}
// Extract roles from JWT (if present)
if rawroles, ok := claims["roles"].([]interface{}); ok {
for _, rr := range rawroles {
@@ -185,20 +190,6 @@ func (ja *JWTCookieSessionAuthenticator) Login(
}
}
}
}
// (Ask browser to) Delete JWT cookie
deletedCookie := &http.Cookie{
Name: jc.CookieName,
Value: "",
Path: "/",
MaxAge: -1,
HttpOnly: true,
}
http.SetCookie(rw, deletedCookie)
if user == nil {
projects := make([]string, 0)
user = &schema.User{
Username: sub,
Name: name,
@@ -215,5 +206,15 @@ func (ja *JWTCookieSessionAuthenticator) Login(
}
}
// (Ask browser to) Delete JWT cookie
deletedCookie := &http.Cookie{
Name: jc.CookieName,
Value: "",
Path: "/",
MaxAge: -1,
HttpOnly: true,
}
http.SetCookie(rw, deletedCookie)
return user, nil
}

View File

@@ -5,6 +5,7 @@
package auth
import (
"database/sql"
"encoding/base64"
"errors"
"fmt"
@@ -78,31 +79,35 @@ func (ja *JWTSessionAuthenticator) Login(
claims := token.Claims.(jwt.MapClaims)
sub, _ := claims["sub"].(string)
var name string
if wrap, ok := claims["name"].(map[string]interface{}); ok {
if vals, ok := wrap["values"].([]interface{}); ok {
if len(vals) != 0 {
name = fmt.Sprintf("%v", vals[0])
for i := 1; i < len(vals); i++ {
name += fmt.Sprintf(" %v", vals[i])
}
}
}
}
var roles []string
projects := make([]string, 0)
if config.Keys.JwtConfig.ValidateUser {
var err error
user, err = repository.GetUserRepository().GetUser(sub)
if err != nil && err != sql.ErrNoRows {
log.Errorf("Error while loading user '%v'", sub)
}
// Deny any logins for unknown usernames
if user == nil {
log.Warn("Could not find user from JWT in internal database.")
return nil, errors.New("unknown user")
}
// Take user roles from database instead of trusting the JWT
roles = user.Roles
} else {
var name string
if wrap, ok := claims["name"].(map[string]interface{}); ok {
if vals, ok := wrap["values"].([]interface{}); ok {
if len(vals) != 0 {
name = fmt.Sprintf("%v", vals[0])
for i := 1; i < len(vals); i++ {
name += fmt.Sprintf(" %v", vals[i])
}
}
}
}
// Extract roles from JWT (if present)
if rawroles, ok := claims["roles"].([]interface{}); ok {
for _, rr := range rawroles {
@@ -113,23 +118,17 @@ func (ja *JWTSessionAuthenticator) Login(
}
}
}
}
projects := make([]string, 0)
// Java/Grails Issued Token
// if rawprojs, ok := claims["projects"].([]interface{}); ok {
// for _, pp := range rawprojs {
// if p, ok := pp.(string); ok {
// projects = append(projects, p)
// }
// }
// } else if rawprojs, ok := claims["projects"]; ok {
// for _, p := range rawprojs.([]string) {
// projects = append(projects, p)
// }
// }
if rawprojs, ok := claims["projects"].([]interface{}); ok {
for _, pp := range rawprojs {
if p, ok := pp.(string); ok {
projects = append(projects, p)
}
}
} else if rawprojs, ok := claims["projects"]; ok {
projects = append(projects, rawprojs.([]string)...)
}
if user == nil {
user = &schema.User{
Username: sub,
Name: name,

View File

@@ -21,6 +21,7 @@ import (
type LdapAuthenticator struct {
syncPassword string
UserAttr string
}
var _ Authenticator = (*LdapAuthenticator)(nil)
@@ -31,11 +32,13 @@ func (la *LdapAuthenticator) Init() error {
log.Warn("environment variable 'LDAP_ADMIN_PASSWORD' not set (ldap sync will not work)")
}
if config.Keys.LdapConfig.SyncInterval != "" {
interval, err := time.ParseDuration(config.Keys.LdapConfig.SyncInterval)
lc := config.Keys.LdapConfig
if lc.SyncInterval != "" {
interval, err := time.ParseDuration(lc.SyncInterval)
if err != nil {
log.Warnf("Could not parse duration for sync interval: %v",
config.Keys.LdapConfig.SyncInterval)
lc.SyncInterval)
return err
}
@@ -58,6 +61,12 @@ func (la *LdapAuthenticator) Init() error {
log.Info("LDAP configuration key sync_interval invalid")
}
if lc.UserAttr != "" {
la.UserAttr = lc.UserAttr
} else {
la.UserAttr = "gecos"
}
return nil
}
@@ -86,7 +95,7 @@ func (la *LdapAuthenticator) CanLogin(
lc.UserBase,
ldap.ScopeWholeSubtree, ldap.NeverDerefAliases, 0, 0, false,
fmt.Sprintf("(&%s(uid=%s))", lc.UserFilter, username),
[]string{"dn", "uid", "gecos"}, nil)
[]string{"dn", "uid", la.UserAttr}, nil)
sr, err := l.Search(searchRequest)
if err != nil {
@@ -100,7 +109,7 @@ func (la *LdapAuthenticator) CanLogin(
}
entry := sr.Entries[0]
name := entry.GetAttributeValue("gecos")
name := entry.GetAttributeValue(la.UserAttr)
var roles []string
roles = append(roles, schema.GetRoleString(schema.RoleUser))
projects := make([]string, 0)
@@ -176,7 +185,7 @@ func (la *LdapAuthenticator) Sync() error {
lc.UserBase,
ldap.ScopeWholeSubtree, ldap.NeverDerefAliases, 0, 0, false,
lc.UserFilter,
[]string{"dn", "uid", "gecos"}, nil))
[]string{"dn", "uid", la.UserAttr}, nil))
if err != nil {
log.Warn("LDAP search error")
return err
@@ -192,7 +201,7 @@ func (la *LdapAuthenticator) Sync() error {
_, ok := users[username]
if !ok {
users[username] = IN_LDAP
newnames[username] = entry.GetAttributeValue("gecos")
newnames[username] = entry.GetAttributeValue(la.UserAttr)
} else {
users[username] = IN_BOTH
}

View File

@@ -26,21 +26,25 @@ var Keys schema.ProgramConfig = schema.ProgramConfig{
StopJobsExceedingWalltime: 0,
ShortRunningJobsDuration: 5 * 60,
UiDefaults: map[string]interface{}{
"analysis_view_histogramMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"analysis_view_scatterPlotMetrics": [][]string{{"flops_any", "mem_bw"}, {"flops_any", "cpu_load"}, {"cpu_load", "mem_bw"}},
"job_view_nodestats_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"job_view_polarPlotMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"job_view_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"plot_general_colorBackground": true,
"plot_general_colorscheme": []string{"#00bfff", "#0000ff", "#ff00ff", "#ff0000", "#ff8000", "#ffff00", "#80ff00"},
"plot_general_lineWidth": 3,
"plot_list_jobsPerPage": 50,
"plot_list_selectedMetrics": []string{"cpu_load", "mem_used", "flops_any", "mem_bw"},
"plot_view_plotsPerRow": 3,
"plot_view_showPolarplot": true,
"plot_view_showRoofline": true,
"plot_view_showStatTable": true,
"system_view_selectedMetric": "cpu_load",
"analysis_view_histogramMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"analysis_view_scatterPlotMetrics": [][]string{{"flops_any", "mem_bw"}, {"flops_any", "cpu_load"}, {"cpu_load", "mem_bw"}},
"job_view_nodestats_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"job_view_polarPlotMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"job_view_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"plot_general_colorBackground": true,
"plot_general_colorscheme": []string{"#00bfff", "#0000ff", "#ff00ff", "#ff0000", "#ff8000", "#ffff00", "#80ff00"},
"plot_general_lineWidth": 3,
"plot_list_jobsPerPage": 50,
"plot_list_selectedMetrics": []string{"cpu_load", "mem_used", "flops_any", "mem_bw"},
"plot_view_plotsPerRow": 3,
"plot_view_showPolarplot": true,
"plot_view_showRoofline": true,
"plot_view_showStatTable": true,
"system_view_selectedMetric": "cpu_load",
"analysis_view_selectedTopEntity": "user",
"analysis_view_selectedTopCategory": "totalWalltime",
"status_view_selectedTopUserCategory": "totalJobs",
"status_view_selectedTopProjectCategory": "totalJobs",
},
}

File diff suppressed because it is too large Load Diff

View File

@@ -22,8 +22,8 @@ type FloatRange struct {
}
type Footprints struct {
Nodehours []schema.Float `json:"nodehours"`
Metrics []*MetricFootprints `json:"metrics"`
TimeWeights *TimeWeights `json:"timeWeights"`
Metrics []*MetricFootprints `json:"metrics"`
}
type HistoPoint struct {
@@ -91,11 +91,16 @@ type JobsStatistics struct {
RunningJobs int `json:"runningJobs"`
ShortJobs int `json:"shortJobs"`
TotalWalltime int `json:"totalWalltime"`
TotalNodes int `json:"totalNodes"`
TotalNodeHours int `json:"totalNodeHours"`
TotalCores int `json:"totalCores"`
TotalCoreHours int `json:"totalCoreHours"`
TotalAccs int `json:"totalAccs"`
TotalAccHours int `json:"totalAccHours"`
HistDuration []*HistoPoint `json:"histDuration"`
HistNumNodes []*HistoPoint `json:"histNumNodes"`
HistNumCores []*HistoPoint `json:"histNumCores"`
HistNumAccs []*HistoPoint `json:"histNumAccs"`
}
type MetricFootprints struct {
@@ -133,6 +138,12 @@ type TimeRangeOutput struct {
To time.Time `json:"to"`
}
type TimeWeights struct {
NodeHours []schema.Float `json:"nodeHours"`
AccHours []schema.Float `json:"accHours"`
CoreHours []schema.Float `json:"coreHours"`
}
type User struct {
Username string `json:"username"`
Name string `json:"name"`
@@ -182,6 +193,59 @@ func (e Aggregate) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}
type SortByAggregate string
const (
SortByAggregateTotalwalltime SortByAggregate = "TOTALWALLTIME"
SortByAggregateTotaljobs SortByAggregate = "TOTALJOBS"
SortByAggregateTotalnodes SortByAggregate = "TOTALNODES"
SortByAggregateTotalnodehours SortByAggregate = "TOTALNODEHOURS"
SortByAggregateTotalcores SortByAggregate = "TOTALCORES"
SortByAggregateTotalcorehours SortByAggregate = "TOTALCOREHOURS"
SortByAggregateTotalaccs SortByAggregate = "TOTALACCS"
SortByAggregateTotalacchours SortByAggregate = "TOTALACCHOURS"
)
var AllSortByAggregate = []SortByAggregate{
SortByAggregateTotalwalltime,
SortByAggregateTotaljobs,
SortByAggregateTotalnodes,
SortByAggregateTotalnodehours,
SortByAggregateTotalcores,
SortByAggregateTotalcorehours,
SortByAggregateTotalaccs,
SortByAggregateTotalacchours,
}
func (e SortByAggregate) IsValid() bool {
switch e {
case SortByAggregateTotalwalltime, SortByAggregateTotaljobs, SortByAggregateTotalnodes, SortByAggregateTotalnodehours, SortByAggregateTotalcores, SortByAggregateTotalcorehours, SortByAggregateTotalaccs, SortByAggregateTotalacchours:
return true
}
return false
}
func (e SortByAggregate) String() string {
return string(e)
}
func (e *SortByAggregate) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("enums must be strings")
}
*e = SortByAggregate(str)
if !e.IsValid() {
return fmt.Errorf("%s is not a valid SortByAggregate", str)
}
return nil
}
func (e SortByAggregate) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}
type SortDirectionEnum string
const (
@@ -222,44 +286,3 @@ func (e *SortDirectionEnum) UnmarshalGQL(v interface{}) error {
func (e SortDirectionEnum) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}
type Weights string
const (
WeightsNodeCount Weights = "NODE_COUNT"
WeightsNodeHours Weights = "NODE_HOURS"
)
var AllWeights = []Weights{
WeightsNodeCount,
WeightsNodeHours,
}
func (e Weights) IsValid() bool {
switch e {
case WeightsNodeCount, WeightsNodeHours:
return true
}
return false
}
func (e Weights) String() string {
return string(e)
}
func (e *Weights) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("enums must be strings")
}
*e = Weights(str)
if !e.IsValid() {
return fmt.Errorf("%s is not a valid Weights", str)
}
return nil
}
func (e Weights) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}

View File

@@ -244,34 +244,34 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
}
// JobsStatistics is the resolver for the jobsStatistics field.
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
var err error
var stats []*model.JobsStatistics
if requireField(ctx, "totalJobs") {
if requireField(ctx, "totalJobs") || requireField(ctx, "totalWalltime") || requireField(ctx, "totalNodes") || requireField(ctx, "totalCores") ||
requireField(ctx, "totalAccs") || requireField(ctx, "totalNodeHours") || requireField(ctx, "totalCoreHours") || requireField(ctx, "totalAccHours") {
if groupBy == nil {
stats, err = r.Repo.JobsStats(ctx, filter)
} else {
stats, err = r.Repo.JobsStatsGrouped(ctx, filter, groupBy)
stats, err = r.Repo.JobsStatsGrouped(ctx, filter, page, sortBy, groupBy)
}
} else {
stats = make([]*model.JobsStatistics, 0, 1)
stats = append(stats,
&model.JobsStatistics{})
stats = append(stats, &model.JobsStatistics{})
}
if groupBy != nil {
if requireField(ctx, "shortJobs") {
stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short")
}
if requireField(ctx, "RunningJobs") {
if requireField(ctx, "runningJobs") {
stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running")
}
} else {
if requireField(ctx, "shortJobs") {
stats, err = r.Repo.AddJobCount(ctx, filter, stats, "short")
}
if requireField(ctx, "RunningJobs") {
if requireField(ctx, "runningJobs") {
stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running")
}
}
@@ -280,7 +280,7 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
return nil, err
}
if requireField(ctx, "histDuration") || requireField(ctx, "histNumNodes") {
if requireField(ctx, "histDuration") || requireField(ctx, "histNumNodes") || requireField(ctx, "histNumCores") || requireField(ctx, "histNumAccs") {
if groupBy == nil {
stats[0], err = r.Repo.AddHistograms(ctx, filter, stats[0])
if err != nil {
@@ -294,24 +294,6 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
return stats, nil
}
// JobsCount is the resolver for the jobsCount field.
func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) {
counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, weight, limit)
if err != nil {
log.Warn("Error while counting grouped jobs")
return nil, err
}
res := make([]*model.Count, 0, len(counts))
for name, count := range counts {
res = append(res, &model.Count{
Name: name,
Count: count,
})
}
return res, nil
}
// RooflineHeatmap is the resolver for the rooflineHeatmap field.
func (r *queryResolver) RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) {
return r.rooflineHeatmap(ctx, filter, rows, cols, minX, minY, maxX, maxY)

View File

@@ -6,7 +6,6 @@ package graph
import (
"context"
"errors"
"fmt"
"math"
@@ -15,6 +14,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
// "github.com/ClusterCockpit/cc-backend/pkg/archive"
)
const MAX_JOBS_FOR_ANALYSIS = 500
@@ -32,7 +32,7 @@ func (r *queryResolver) rooflineHeatmap(
return nil, err
}
if len(jobs) > MAX_JOBS_FOR_ANALYSIS {
return nil, fmt.Errorf("GRAPH/STATS > too many jobs matched (max: %d)", MAX_JOBS_FOR_ANALYSIS)
return nil, fmt.Errorf("GRAPH/UTIL > too many jobs matched (max: %d)", MAX_JOBS_FOR_ANALYSIS)
}
fcols, frows := float64(cols), float64(rows)
@@ -49,20 +49,24 @@ func (r *queryResolver) rooflineHeatmap(
jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
if err != nil {
log.Error("Error while loading metrics for roofline")
log.Errorf("Error while loading roofline metrics for job %d", job.ID)
return nil, err
}
flops_, membw_ := jobdata["flops_any"], jobdata["mem_bw"]
if flops_ == nil && membw_ == nil {
return nil, fmt.Errorf("GRAPH/STATS > 'flops_any' or 'mem_bw' missing for job %d", job.ID)
log.Infof("rooflineHeatmap(): 'flops_any' or 'mem_bw' missing for job %d", job.ID)
continue
// return nil, fmt.Errorf("GRAPH/UTIL > 'flops_any' or 'mem_bw' missing for job %d", job.ID)
}
flops, ok1 := flops_["node"]
membw, ok2 := membw_["node"]
if !ok1 || !ok2 {
log.Info("rooflineHeatmap() query not implemented for where flops_any or mem_bw not available at 'node' level")
continue
// TODO/FIXME:
return nil, errors.New("GRAPH/STATS > todo: rooflineHeatmap() query not implemented for where flops_any or mem_bw not available at 'node' level")
// return nil, errors.New("GRAPH/UTIL > todo: rooflineHeatmap() query not implemented for where flops_any or mem_bw not available at 'node' level")
}
for n := 0; n < len(flops.Series); n++ {
@@ -98,7 +102,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
return nil, err
}
if len(jobs) > MAX_JOBS_FOR_ANALYSIS {
return nil, fmt.Errorf("GRAPH/STATS > too many jobs matched (max: %d)", MAX_JOBS_FOR_ANALYSIS)
return nil, fmt.Errorf("GRAPH/UTIL > too many jobs matched (max: %d)", MAX_JOBS_FOR_ANALYSIS)
}
avgs := make([][]schema.Float, len(metrics))
@@ -106,7 +110,11 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
avgs[i] = make([]schema.Float, 0, len(jobs))
}
nodehours := make([]schema.Float, 0, len(jobs))
timeweights := new(model.TimeWeights)
timeweights.NodeHours = make([]schema.Float, 0, len(jobs))
timeweights.AccHours = make([]schema.Float, 0, len(jobs))
timeweights.CoreHours = make([]schema.Float, 0, len(jobs))
for _, job := range jobs {
if job.MonitoringStatus == schema.MonitoringStatusDisabled || job.MonitoringStatus == schema.MonitoringStatusArchivingFailed {
continue
@@ -117,7 +125,18 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
return nil, err
}
nodehours = append(nodehours, schema.Float(float64(job.Duration)/60.0*float64(job.NumNodes)))
// #166 collect arrays: Null values or no null values?
timeweights.NodeHours = append(timeweights.NodeHours, schema.Float(float64(job.Duration)/60.0*float64(job.NumNodes)))
if job.NumAcc > 0 {
timeweights.AccHours = append(timeweights.AccHours, schema.Float(float64(job.Duration)/60.0*float64(job.NumAcc)))
} else {
timeweights.AccHours = append(timeweights.AccHours, schema.Float(1.0))
}
if job.NumHWThreads > 0 {
timeweights.CoreHours = append(timeweights.CoreHours, schema.Float(float64(job.Duration)/60.0*float64(job.NumHWThreads))) // SQLite HWThreads == Cores; numCoresForJob(job)
} else {
timeweights.CoreHours = append(timeweights.CoreHours, schema.Float(1.0))
}
}
res := make([]*model.MetricFootprints, len(avgs))
@@ -129,11 +148,34 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
}
return &model.Footprints{
Nodehours: nodehours,
Metrics: res,
TimeWeights: timeweights,
Metrics: res,
}, nil
}
// func numCoresForJob(job *schema.Job) (numCores int) {
// subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster)
// if scerr != nil {
// return 1
// }
// totalJobCores := 0
// topology := subcluster.Topology
// for _, host := range job.Resources {
// hwthreads := host.HWThreads
// if hwthreads == nil {
// hwthreads = topology.Node
// }
// hostCores, _ := topology.GetCoresFromHWThreads(hwthreads)
// totalJobCores += len(hostCores)
// }
// return totalJobCores
// }
func requireField(ctx context.Context, name string) bool {
fields := graphql.CollectAllFields(ctx)

View File

@@ -506,7 +506,7 @@ func (ccms *CCMetricStore) LoadStats(
metrics []string,
ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode})
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}) // #166 Add scope shere for analysis view accelerator normalization?
if err != nil {
log.Warn("Error while building query")
return nil, err
@@ -533,7 +533,9 @@ func (ccms *CCMetricStore) LoadStats(
metric := ccms.toLocalName(query.Metric)
data := res[0]
if data.Error != nil {
return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error)
log.Infof("fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error)
continue
// return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, *data.Error)
}
metricdata, ok := stats[metric]
@@ -543,7 +545,9 @@ func (ccms *CCMetricStore) LoadStats(
}
if data.Avg.IsNaN() || data.Min.IsNaN() || data.Max.IsNaN() {
return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN")
log.Infof("fetching %s for node %s failed: one of avg/min/max is NaN", metric, query.Hostname)
continue
// return nil, fmt.Errorf("METRICDATA/CCMS > fetching %s for node %s failed: %s", metric, query.Hostname, "avg/min/max is NaN")
}
metricdata[query.Hostname] = schema.MetricStatistics{

View File

@@ -182,7 +182,7 @@ func LoadAverages(
ctx context.Context) error {
if job.State != schema.JobStateRunning && useArchive {
return archive.LoadAveragesFromArchive(job, metrics, data)
return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
}
repo, ok := metricDataRepos[job.Cluster]
@@ -190,7 +190,7 @@ func LoadAverages(
return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster)
}
stats, err := repo.LoadStats(job, metrics, ctx)
stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalizazion?
if err != nil {
log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project)
return err

View File

@@ -455,69 +455,6 @@ func (r *JobRepository) DeleteJobById(id int64) error {
return err
}
// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
func (r *JobRepository) CountGroupedJobs(
ctx context.Context,
aggreg model.Aggregate,
filters []*model.JobFilter,
weight *model.Weights,
limit *int) (map[string]int, error) {
start := time.Now()
if !aggreg.IsValid() {
return nil, errors.New("invalid aggregate")
}
runner := (sq.BaseRunner)(r.stmtCache)
count := "count(*) as count"
if weight != nil {
switch *weight {
case model.WeightsNodeCount:
count = "sum(job.num_nodes) as count"
case model.WeightsNodeHours:
now := time.Now().Unix()
count = fmt.Sprintf(`sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count`, now)
runner = r.DB
default:
log.Debugf("CountGroupedJobs() Weight %v unknown.", *weight)
}
}
q, qerr := SecurityCheck(ctx, sq.Select("job."+string(aggreg), count).From("job").GroupBy("job."+string(aggreg)).OrderBy("count DESC"))
if qerr != nil {
return nil, qerr
}
for _, f := range filters {
q = BuildWhereClause(f, q)
}
if limit != nil {
q = q.Limit(uint64(*limit))
}
counts := map[string]int{}
rows, err := q.RunWith(runner).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
for rows.Next() {
var group string
var count int
if err := rows.Scan(&group, &count); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
counts[group] = count
}
log.Debugf("Timer CountGroupedJobs %s", time.Since(start))
return counts, nil
}
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).

View File

@@ -18,13 +18,17 @@ import (
sq "github.com/Masterminds/squirrel"
)
// SecurityCheck-less, private: Returns a list of jobs matching the provided filters. page and order are optional-
func (r *JobRepository) queryJobs(
query sq.SelectBuilder,
func (r *JobRepository) QueryJobs(
ctx context.Context,
filters []*model.JobFilter,
page *model.PageRequest,
order *model.OrderByInput) ([]*schema.Job, error) {
query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job"))
if qerr != nil {
return nil, qerr
}
if order != nil {
field := toSnakeCase(order.Field)
@@ -67,34 +71,15 @@ func (r *JobRepository) queryJobs(
return jobs, nil
}
// testFunction for queryJobs
func (r *JobRepository) testQueryJobs(
filters []*model.JobFilter,
page *model.PageRequest,
order *model.OrderByInput) ([]*schema.Job, error) {
return r.queryJobs(sq.Select(jobColumns...).From("job"), filters, page, order)
}
// Public function with added securityCheck, calls private queryJobs function above
func (r *JobRepository) QueryJobs(
func (r *JobRepository) CountJobs(
ctx context.Context,
filters []*model.JobFilter,
page *model.PageRequest,
order *model.OrderByInput) ([]*schema.Job, error) {
query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job"))
if qerr != nil {
return nil, qerr
}
return r.queryJobs(query, filters, page, order)
}
// SecurityCheck-less, private: Returns the number of jobs matching the filters
func (r *JobRepository) countJobs(query sq.SelectBuilder,
filters []*model.JobFilter) (int, error) {
query, qerr := SecurityCheck(ctx, sq.Select("count(*)").From("job"))
if qerr != nil {
return 0, qerr
}
for _, f := range filters {
query = BuildWhereClause(f, query)
}
@@ -107,27 +92,6 @@ func (r *JobRepository) countJobs(query sq.SelectBuilder,
return count, nil
}
// testFunction for countJobs
func (r *JobRepository) testCountJobs(
filters []*model.JobFilter) (int, error) {
return r.countJobs(sq.Select("count(*)").From("job"), filters)
}
// Public function with added securityCheck, calls private countJobs function above
func (r *JobRepository) CountJobs(
ctx context.Context,
filters []*model.JobFilter) (int, error) {
query, qerr := SecurityCheck(ctx, sq.Select("count(*)").From("job"))
if qerr != nil {
return 0, qerr
}
return r.countJobs(query, filters)
}
func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) {
user := GetUserFromContext(ctx)
if user == nil {

View File

@@ -5,10 +5,12 @@
package repository
import (
"context"
"testing"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
_ "github.com/mattn/go-sqlite3"
)
@@ -94,7 +96,7 @@ func BenchmarkDB_CountJobs(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := db.testCountJobs([]*model.JobFilter{filter})
_, err := db.CountJobs(getContext(b), []*model.JobFilter{filter})
noErr(b, err)
}
})
@@ -118,20 +120,37 @@ func BenchmarkDB_QueryJobs(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := db.testQueryJobs([]*model.JobFilter{filter}, page, order)
_, err := db.QueryJobs(getContext(b), []*model.JobFilter{filter}, page, order)
noErr(b, err)
}
})
})
}
func getContext(tb testing.TB) context.Context {
tb.Helper()
var roles []string
roles = append(roles, schema.GetRoleString(schema.RoleAdmin))
projects := make([]string, 0)
user := &schema.User{
Username: "demo",
Name: "The man",
Roles: roles,
Projects: projects,
AuthSource: schema.AuthViaLDAP,
}
ctx := context.Background()
return context.WithValue(ctx, ContextUserKey, user)
}
func setup(tb testing.TB) *JobRepository {
tb.Helper()
log.Init("warn", true)
dbfile := "testdata/job.db"
err := MigrateDB("sqlite3", dbfile)
noErr(tb, err)
Connect("sqlite3", dbfile)
return GetJobRepository()
}

View File

@@ -23,6 +23,17 @@ var groupBy2column = map[model.Aggregate]string{
model.AggregateCluster: "job.cluster",
}
var sortBy2column = map[model.SortByAggregate]string{
model.SortByAggregateTotaljobs: "totalJobs",
model.SortByAggregateTotalwalltime: "totalWalltime",
model.SortByAggregateTotalnodes: "totalNodes",
model.SortByAggregateTotalnodehours: "totalNodeHours",
model.SortByAggregateTotalcores: "totalCores",
model.SortByAggregateTotalcorehours: "totalCoreHours",
model.SortByAggregateTotalaccs: "totalAccs",
model.SortByAggregateTotalacchours: "totalAccHours",
}
func (r *JobRepository) buildCountQuery(
filter []*model.JobFilter,
kind string,
@@ -59,21 +70,30 @@ func (r *JobRepository) buildStatsQuery(
var query sq.SelectBuilder
castType := r.getCastType()
// fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
if col != "" {
// Scan columns: id, totalJobs, totalWalltime, totalNodeHours, totalCoreHours, totalAccHours
query = sq.Select(col, "COUNT(job.id)",
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s)", castType),
// Scan columns: id, totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select(col, "COUNT(job.id) as totalJobs",
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s) as totalWalltime`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s) as totalNodes`, castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s) as totalNodeHours`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_hwthreads) as %s) as totalCores`, castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s) as totalCoreHours`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_acc) as %s) as totalAccs`, castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType),
).From("job").GroupBy(col)
} else {
// Scan columns: totalJobs, totalWalltime, totalNodeHours, totalCoreHours, totalAccHours
// Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select("COUNT(job.id)",
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s)", castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s)`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s)`, castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s)`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_hwthreads) as %s)`, castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s)`, time.Now().Unix(), castType),
fmt.Sprintf(`CAST(SUM(job.num_acc) as %s)`, castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s)`, time.Now().Unix(), castType),
).From("job")
}
@@ -112,16 +132,28 @@ func (r *JobRepository) getCastType() string {
func (r *JobRepository) JobsStatsGrouped(
ctx context.Context,
filter []*model.JobFilter,
page *model.PageRequest,
sortBy *model.SortByAggregate,
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
start := time.Now()
col := groupBy2column[*groupBy]
query := r.buildStatsQuery(filter, col)
query, err := SecurityCheck(ctx, query)
if err != nil {
return nil, err
}
if sortBy != nil {
sortBy := sortBy2column[*sortBy]
query = query.OrderBy(fmt.Sprintf("%s DESC", sortBy))
}
if page != nil && page.ItemsPerPage != -1 {
limit := uint64(page.ItemsPerPage)
query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying DB for job statistics")
@@ -132,15 +164,36 @@ func (r *JobRepository) JobsStatsGrouped(
for rows.Next() {
var id sql.NullString
var jobs, walltime, nodeHours, coreHours, accHours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &nodeHours, &coreHours, &accHours); err != nil {
var jobs, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
var totalCoreHours, totalAccHours int
var totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int
if jobs.Valid {
totalJobs = int(jobs.Int64)
}
if walltime.Valid {
totalWalltime = int(walltime.Int64)
}
if nodes.Valid {
totalNodes = int(nodes.Int64)
}
if cores.Valid {
totalCores = int(cores.Int64)
}
if accs.Valid {
totalAccs = int(accs.Int64)
}
if nodeHours.Valid {
totalNodeHours = int(nodeHours.Int64)
}
if coreHours.Valid {
totalCoreHours = int(coreHours.Int64)
}
@@ -154,9 +207,13 @@ func (r *JobRepository) JobsStatsGrouped(
&model.JobsStatistics{
ID: id.String,
Name: name,
TotalJobs: int(jobs.Int64),
TotalWalltime: int(walltime.Int64),
TotalJobs: totalJobs,
TotalWalltime: totalWalltime,
TotalNodes: totalNodes,
TotalNodeHours: totalNodeHours,
TotalCores: totalCores,
TotalCoreHours: totalCoreHours,
TotalAccs: totalAccs,
TotalAccHours: totalAccHours})
} else {
stats = append(stats,
@@ -164,7 +221,11 @@ func (r *JobRepository) JobsStatsGrouped(
ID: id.String,
TotalJobs: int(jobs.Int64),
TotalWalltime: int(walltime.Int64),
TotalNodes: totalNodes,
TotalNodeHours: totalNodeHours,
TotalCores: totalCores,
TotalCoreHours: totalCoreHours,
TotalAccs: totalAccs,
TotalAccHours: totalAccHours})
}
}
@@ -188,15 +249,18 @@ func (r *JobRepository) JobsStats(
row := query.RunWith(r.DB).QueryRow()
stats := make([]*model.JobsStatistics, 0, 1)
var jobs, walltime, nodeHours, coreHours, accHours sql.NullInt64
if err := row.Scan(&jobs, &walltime, &nodeHours, &coreHours, &accHours); err != nil {
var jobs, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
if err := row.Scan(&jobs, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if jobs.Valid {
var totalCoreHours, totalAccHours int
var totalNodeHours, totalCoreHours, totalAccHours int
if nodeHours.Valid {
totalNodeHours = int(nodeHours.Int64)
}
if coreHours.Valid {
totalCoreHours = int(coreHours.Int64)
}
@@ -207,6 +271,7 @@ func (r *JobRepository) JobsStats(
&model.JobsStatistics{
TotalJobs: int(jobs.Int64),
TotalWalltime: int(walltime.Int64),
TotalNodeHours: totalNodeHours,
TotalCoreHours: totalCoreHours,
TotalAccHours: totalAccHours})
}
@@ -321,7 +386,7 @@ func (r *JobRepository) AddJobCount(
return nil, err
}
counts := make(map[string]int)
var count int
for rows.Next() {
var cnt sql.NullInt64
@@ -329,20 +394,22 @@ func (r *JobRepository) AddJobCount(
log.Warn("Error while scanning rows")
return nil, err
}
count = int(cnt.Int64)
}
switch kind {
case "running":
for _, s := range stats {
s.RunningJobs = counts[s.ID]
s.RunningJobs = count
}
case "short":
for _, s := range stats {
s.ShortJobs = counts[s.ID]
s.ShortJobs = count
}
}
log.Debugf("Timer JobJobCount %s", time.Since(start))
log.Debugf("Timer AddJobCount %s", time.Since(start))
return stats, nil
}
@@ -367,6 +434,18 @@ func (r *JobRepository) AddHistograms(
return nil, err
}
stat.HistNumCores, err = r.jobsStatisticsHistogram(ctx, "job.num_hwthreads as value", filter)
if err != nil {
log.Warn("Error while loading job statistics histogram: num hwthreads")
return nil, err
}
stat.HistNumAccs, err = r.jobsStatisticsHistogram(ctx, "job.num_acc as value", filter)
if err != nil {
log.Warn("Error while loading job statistics histogram: num acc")
return nil, err
}
log.Debugf("Timer AddHistograms %s", time.Since(start))
return stat, nil
}

View File

@@ -7,6 +7,8 @@ package repository
import (
"fmt"
"testing"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
)
func TestBuildJobStatsQuery(t *testing.T) {
@@ -19,3 +21,15 @@ func TestBuildJobStatsQuery(t *testing.T) {
fmt.Printf("SQL: %s\n", sql)
}
func TestJobStats(t *testing.T) {
r := setup(t)
filter := &model.JobFilter{}
stats, err := r.JobsStats(getContext(t), []*model.JobFilter{filter})
noErr(t, err)
if stats[0].TotalJobs != 6 {
t.Fatalf("Want 98, Got %d", stats[0].TotalJobs)
}
}

View File

@@ -134,8 +134,12 @@ func (r *UserRepository) AddUser(user *schema.User) error {
func (r *UserRepository) DelUser(username string) error {
_, err := r.DB.Exec(`DELETE FROM user WHERE user.username = ?`, username)
log.Errorf("Error while deleting user '%s' from DB", username)
return err
if err != nil {
log.Errorf("Error while deleting user '%s' from DB", username)
return err
}
log.Infof("deleted user '%s' from DB", username)
return nil
}
func (r *UserRepository) ListUsers(specialsOnly bool) ([]*schema.User, error) {