Merge branch 'master' into 134-job-tagging

This commit is contained in:
2024-04-22 11:03:13 +02:00
170 changed files with 12484 additions and 8422 deletions

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,5 +1,4 @@
// Code generated by swaggo/swag. DO NOT EDIT.
// Package api Code generated by swaggo/swag. DO NOT EDIT
package api
import "github.com/swaggo/swag"
@@ -24,6 +23,63 @@ const docTemplate = `{
"host": "{{.Host}}",
"basePath": "{{.BasePath}}",
"paths": {
"/clusters/": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Get a list of all cluster configs. Specific cluster can be requested using query parameter.",
"produces": [
"application/json"
],
"tags": [
"Cluster query"
],
"summary": "Lists all cluster configs",
"parameters": [
{
"type": "string",
"description": "Job Cluster",
"name": "cluster",
"in": "query"
}
],
"responses": {
"200": {
"description": "Array of clusters",
"schema": {
"$ref": "#/definitions/api.GetClustersApiResponse"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"401": {
"description": "Unauthorized",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
}
}
}
},
"/jobs/": {
"get": {
"security": [
@@ -334,6 +390,76 @@ const docTemplate = `{
}
}
},
"/jobs/edit_meta/{id}": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Edit key value pairs in job metadata json\nIf a key already exists its content will be overwritten",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"Job add and modify"
],
"summary": "Edit meta-data json",
"parameters": [
{
"type": "integer",
"description": "Job Database ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "Kay value pair to add",
"name": "request",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.EditMetaRequest"
}
}
],
"responses": {
"200": {
"description": "Updated job resource",
"schema": {
"$ref": "#/definitions/schema.Job"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"401": {
"description": "Unauthorized",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"404": {
"description": "Job does not exist",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
}
}
}
},
"/jobs/start_job/": {
"post": {
"security": [
@@ -631,6 +757,80 @@ const docTemplate = `{
}
},
"/jobs/{id}": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Job to get is specified by database ID\nReturns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'.",
"produces": [
"application/json"
],
"tags": [
"Job query"
],
"summary": "Get job meta and optional all metric data",
"parameters": [
{
"type": "integer",
"description": "Database ID of Job",
"name": "id",
"in": "path",
"required": true
},
{
"type": "boolean",
"description": "Include all available metrics",
"name": "all-metrics",
"in": "query"
}
],
"responses": {
"200": {
"description": "Job resource",
"schema": {
"$ref": "#/definitions/api.GetJobApiResponse"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"401": {
"description": "Unauthorized",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"404": {
"description": "Resource not found",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"422": {
"description": "Unprocessable Entity: finding job failed: sql: no rows in result set",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.ErrorResponse"
}
}
}
},
"post": {
"security": [
{
@@ -647,7 +847,7 @@ const docTemplate = `{
"tags": [
"Job query"
],
"summary": "Get complete job meta and metric data",
"summary": "Get job meta and configurable metric data",
"parameters": [
{
"type": "integer",
@@ -1121,6 +1321,19 @@ const docTemplate = `{
}
}
},
"api.EditMetaRequest": {
"type": "object",
"properties": {
"key": {
"type": "string",
"example": "jobScript"
},
"value": {
"type": "string",
"example": "bash script"
}
}
},
"api.ErrorResponse": {
"type": "object",
"properties": {
@@ -1134,6 +1347,18 @@ const docTemplate = `{
}
}
},
"api.GetClustersApiResponse": {
"type": "object",
"properties": {
"clusters": {
"description": "Array of clusters",
"type": "array",
"items": {
"$ref": "#/definitions/schema.Cluster"
}
}
}
},
"api.GetJobApiResponse": {
"type": "object",
"properties": {
@@ -1229,6 +1454,40 @@ const docTemplate = `{
}
}
},
"schema.Accelerator": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"model": {
"type": "string"
},
"type": {
"type": "string"
}
}
},
"schema.Cluster": {
"type": "object",
"properties": {
"metricConfig": {
"type": "array",
"items": {
"$ref": "#/definitions/schema.MetricConfig"
}
},
"name": {
"type": "string"
},
"subClusters": {
"type": "array",
"items": {
"$ref": "#/definitions/schema.SubCluster"
}
}
}
},
"schema.Job": {
"description": "Information of a HPC job.",
"type": "object",
@@ -1259,6 +1518,10 @@ const docTemplate = `{
"minimum": 0,
"example": 1
},
"flopsAnyAvg": {
"description": "FlopsAnyAvg as Float64",
"type": "number"
},
"id": {
"description": "The unique identifier of a job in the database",
"type": "integer"
@@ -1285,6 +1548,18 @@ const docTemplate = `{
],
"example": "completed"
},
"loadAvg": {
"description": "LoadAvg as Float64",
"type": "number"
},
"memBwAvg": {
"description": "MemBwAvg as Float64",
"type": "number"
},
"memUsedMax": {
"description": "MemUsedMax as Float64",
"type": "number"
},
"metaData": {
"description": "Additional information about the job",
"type": "object",
@@ -1611,6 +1886,44 @@ const docTemplate = `{
}
}
},
"schema.MetricConfig": {
"type": "object",
"properties": {
"aggregation": {
"type": "string"
},
"alert": {
"type": "number"
},
"caution": {
"type": "number"
},
"name": {
"type": "string"
},
"normal": {
"type": "number"
},
"peak": {
"type": "number"
},
"scope": {
"$ref": "#/definitions/schema.MetricScope"
},
"subClusters": {
"type": "array",
"items": {
"$ref": "#/definitions/schema.SubClusterConfig"
}
},
"timestep": {
"type": "integer"
},
"unit": {
"$ref": "#/definitions/schema.Unit"
}
}
},
"schema.MetricScope": {
"type": "string",
"enum": [
@@ -1646,6 +1959,17 @@ const docTemplate = `{
}
}
},
"schema.MetricValue": {
"type": "object",
"properties": {
"unit": {
"$ref": "#/definitions/schema.Unit"
},
"value": {
"type": "number"
}
}
},
"schema.Resource": {
"description": "A resource used by a job",
"type": "object",
@@ -1726,6 +2050,64 @@ const docTemplate = `{
}
}
},
"schema.SubCluster": {
"type": "object",
"properties": {
"coresPerSocket": {
"type": "integer"
},
"flopRateScalar": {
"$ref": "#/definitions/schema.MetricValue"
},
"flopRateSimd": {
"$ref": "#/definitions/schema.MetricValue"
},
"memoryBandwidth": {
"$ref": "#/definitions/schema.MetricValue"
},
"name": {
"type": "string"
},
"nodes": {
"type": "string"
},
"processorType": {
"type": "string"
},
"socketsPerNode": {
"type": "integer"
},
"threadsPerCore": {
"type": "integer"
},
"topology": {
"$ref": "#/definitions/schema.Topology"
}
}
},
"schema.SubClusterConfig": {
"type": "object",
"properties": {
"alert": {
"type": "number"
},
"caution": {
"type": "number"
},
"name": {
"type": "string"
},
"normal": {
"type": "number"
},
"peak": {
"type": "number"
},
"remove": {
"type": "boolean"
}
}
},
"schema.Tag": {
"description": "Defines a tag using name and type.",
"type": "object",
@@ -1746,6 +2128,59 @@ const docTemplate = `{
}
}
},
"schema.Topology": {
"type": "object",
"properties": {
"accelerators": {
"type": "array",
"items": {
"$ref": "#/definitions/schema.Accelerator"
}
},
"core": {
"type": "array",
"items": {
"type": "array",
"items": {
"type": "integer"
}
}
},
"die": {
"type": "array",
"items": {
"type": "array",
"items": {
"type": "integer"
}
}
},
"memoryDomain": {
"type": "array",
"items": {
"type": "array",
"items": {
"type": "integer"
}
}
},
"node": {
"type": "array",
"items": {
"type": "integer"
}
},
"socket": {
"type": "array",
"items": {
"type": "array",
"items": {
"type": "integer"
}
}
}
}
},
"schema.Unit": {
"type": "object",
"properties": {

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -70,12 +70,16 @@ func (api *RestApi) MountRoutes(r *mux.Router) {
r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
r.HandleFunc("/jobs/{id}", api.getJobById).Methods(http.MethodPost)
r.HandleFunc("/jobs/{id}", api.getCompleteJobById).Methods(http.MethodGet)
r.HandleFunc("/jobs/tag_job/{id}", api.tagJob).Methods(http.MethodPost, http.MethodPatch)
r.HandleFunc("/jobs/edit_meta/{id}", api.editMeta).Methods(http.MethodPost, http.MethodPatch)
r.HandleFunc("/jobs/metrics/{id}", api.getJobMetrics).Methods(http.MethodGet)
r.HandleFunc("/jobs/delete_job/", api.deleteJobByRequest).Methods(http.MethodDelete)
r.HandleFunc("/jobs/delete_job/{id}", api.deleteJobById).Methods(http.MethodDelete)
r.HandleFunc("/jobs/delete_job_before/{ts}", api.deleteJobBefore).Methods(http.MethodDelete)
r.HandleFunc("/clusters/", api.getClusters).Methods(http.MethodGet)
if api.MachineStateDir != "" {
r.HandleFunc("/machine_state/{cluster}/{host}", api.getMachineState).Methods(http.MethodGet)
r.HandleFunc("/machine_state/{cluster}/{host}", api.putMachineState).Methods(http.MethodPut, http.MethodPost)
@@ -110,12 +114,11 @@ type UpdateUserApiResponse struct {
// StopJobApiRequest model
type StopJobApiRequest struct {
// Stop Time of job as epoch
JobId *int64 `json:"jobId" example:"123000"`
Cluster *string `json:"cluster" example:"fritz"`
StartTime *int64 `json:"startTime" example:"1649723812"`
State schema.JobState `json:"jobState" validate:"required" example:"completed"`
StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"`
State schema.JobState `json:"jobState" validate:"required" example:"completed"` // Final job state
JobId *int64 `json:"jobId" example:"123000"` // Cluster Job ID of job
Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
}
// DeleteJobApiRequest model
@@ -132,6 +135,11 @@ type GetJobsApiResponse struct {
Page int `json:"page"` // Page id returned
}
// GetClustersApiResponse model
type GetClustersApiResponse struct {
Clusters []*schema.Cluster `json:"clusters"` // Array of clusters
}
// ErrorResponse model
type ErrorResponse struct {
// Statustext of Errorcode
@@ -146,6 +154,12 @@ type ApiTag struct {
Name string `json:"name" example:"Testjob"` // Tag Name
}
// ApiMeta model
type EditMetaRequest struct {
Key string `json:"key" example:"jobScript"`
Value string `json:"value" example:"bash script"`
}
type TagJobApiRequest []*ApiTag
type GetJobApiRequest []string
@@ -155,10 +169,15 @@ type GetJobApiResponse struct {
Data []*JobMetricWithName
}
type GetCompleteJobApiResponse struct {
Meta *schema.Job
Data schema.JobData
}
type JobMetricWithName struct {
Metric *schema.JobMetric `json:"metric"`
Name string `json:"name"`
Scope schema.MetricScope `json:"scope"`
Metric *schema.JobMetric `json:"metric"`
}
type ApiReturnedUser struct {
@@ -223,6 +242,55 @@ func securedCheck(r *http.Request) error {
return nil
}
// getClusters godoc
// @summary Lists all cluster configs
// @tags Cluster query
// @description Get a list of all cluster configs. Specific cluster can be requested using query parameter.
// @produce json
// @param cluster query string false "Job Cluster"
// @success 200 {object} api.GetClustersApiResponse "Array of clusters"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /clusters/ [get]
func (api *RestApi) getClusters(rw http.ResponseWriter, r *http.Request) {
if user := repository.GetUserFromContext(r.Context()); user != nil &&
!user.HasRole(schema.RoleApi) {
handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw)
return
}
rw.Header().Add("Content-Type", "application/json")
bw := bufio.NewWriter(rw)
defer bw.Flush()
var clusters []*schema.Cluster
if r.URL.Query().Has("cluster") {
name := r.URL.Query().Get("cluster")
cluster := archive.GetCluster(name)
if cluster == nil {
handleError(fmt.Errorf("unknown cluster: %s", name), http.StatusBadRequest, rw)
return
}
clusters = append(clusters, cluster)
} else {
clusters = archive.Clusters
}
payload := GetClustersApiResponse{
Clusters: clusters,
}
if err := json.NewEncoder(bw).Encode(payload); err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
}
// getJobs godoc
// @summary Lists all jobs
// @tags Job query
@@ -243,7 +311,6 @@ func securedCheck(r *http.Request) error {
// @security ApiKeyAuth
// @router /jobs/ [get]
func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
if user := repository.GetUserFromContext(r.Context()); user != nil &&
!user.HasRole(schema.RoleApi) {
@@ -342,10 +409,8 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
if res.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful {
res.Statistics, err = archive.GetStatistics(job)
if err != nil {
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
handleError(err, http.StatusInternalServerError, rw)
return
}
}
@@ -370,14 +435,107 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
}
// getJobById godoc
// @summary Get complete job meta and metric data
// @summary Get job meta and optional all metric data
// @tags Job query
// @description Job to get is specified by database ID
// @description Returns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'.
// @produce json
// @param id path int true "Database ID of Job"
// @param all-metrics query bool false "Include all available metrics"
// @success 200 {object} api.GetJobApiResponse "Job resource"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 404 {object} api.ErrorResponse "Resource not found"
// @failure 422 {object} api.ErrorResponse "Unprocessable Entity: finding job failed: sql: no rows in result set"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /jobs/{id} [get]
func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) {
if user := repository.GetUserFromContext(r.Context()); user != nil &&
!user.HasRole(schema.RoleApi) {
handleError(fmt.Errorf("missing role: %v",
schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw)
return
}
// Fetch job from db
id, ok := mux.Vars(r)["id"]
var job *schema.Job
var err error
if ok {
id, e := strconv.ParseInt(id, 10, 64)
if e != nil {
handleError(fmt.Errorf("integer expected in path for id: %w", e), http.StatusBadRequest, rw)
return
}
job, err = api.JobRepository.FindById(id)
} else {
handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw)
return
}
if err != nil {
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
return
}
job.Tags, err = api.JobRepository.GetTags(&job.ID)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
if _, err = api.JobRepository.FetchMetadata(job); err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
var scopes []schema.MetricScope
if job.NumNodes == 1 {
scopes = []schema.MetricScope{"core"}
} else {
scopes = []schema.MetricScope{"node"}
}
var data schema.JobData
if r.URL.Query().Get("all-metrics") == "true" {
data, err = metricdata.LoadData(job, nil, scopes, r.Context())
if err != nil {
log.Warn("Error while loading job data")
return
}
}
log.Debugf("/api/job/%s: get job %d", id, job.JobID)
rw.Header().Add("Content-Type", "application/json")
bw := bufio.NewWriter(rw)
defer bw.Flush()
payload := GetCompleteJobApiResponse{
Meta: job,
Data: data,
}
if err := json.NewEncoder(bw).Encode(payload); err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
}
// getJobById godoc
// @summary Get job meta and configurable metric data
// @tags Job query
// @description Job to get is specified by database ID
// @description Returns full job resource information according to 'JobMeta' scheme and all metrics according to 'JobData'.
// @accept json
// @produce json
// @param id path int true "Database ID of Job"
// @param request body api.GetJobApiRequest true "Array of metric names"
// @param id path int true "Database ID of Job"
// @param request body api.GetJobApiRequest true "Array of metric names"
// @success 200 {object} api.GetJobApiResponse "Job resource"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
@@ -417,6 +575,18 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
return
}
job.Tags, err = api.JobRepository.GetTags(&job.ID)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
if _, err = api.JobRepository.FetchMetadata(job); err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
var metrics GetJobApiRequest
if err = decode(r.Body, &metrics); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
@@ -464,6 +634,57 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
}
}
// editMeta godoc
// @summary Edit meta-data json
// @tags Job add and modify
// @description Edit key value pairs in job metadata json
// @description If a key already exists its content will be overwritten
// @accept json
// @produce json
// @param id path int true "Job Database ID"
// @param request body api.EditMetaRequest true "Kay value pair to add"
// @success 200 {object} schema.Job "Updated job resource"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 404 {object} api.ErrorResponse "Job does not exist"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /jobs/edit_meta/{id} [post]
func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) {
if user := repository.GetUserFromContext(r.Context()); user != nil &&
!user.HasRole(schema.RoleApi) {
handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw)
return
}
iid, err := strconv.ParseInt(mux.Vars(r)["id"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
job, err := api.JobRepository.FindById(iid)
if err != nil {
http.Error(rw, err.Error(), http.StatusNotFound)
return
}
var req EditMetaRequest
if err := decode(r.Body, &req); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
if err := api.JobRepository.UpdateMetadata(job, req.Key, req.Value); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
json.NewEncoder(rw).Encode(job)
}
// tagJob godoc
// @summary Adds one or more tags to a job
// @tags Job add and modify
@@ -873,7 +1094,6 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
}
func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobApiRequest) {
// Sanity checks
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
handleError(errors.New("stopTime must be larger than startTime and only running jobs can be stopped"), http.StatusBadRequest, rw)
@@ -1015,12 +1235,13 @@ func (api *RestApi) createUser(rw http.ResponseWriter, r *http.Request) {
Password: password,
Email: email,
Projects: []string{project},
Roles: []string{role}}); err != nil {
Roles: []string{role},
}); err != nil {
http.Error(rw, err.Error(), http.StatusUnprocessableEntity)
return
}
rw.Write([]byte(fmt.Sprintf("User %v successfully created!\n", username)))
fmt.Fprintf(rw, "User %v successfully created!\n", username)
}
// deleteUser godoc

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -27,18 +27,18 @@ type Authenticator interface {
}
type Authentication struct {
sessionStore *sessions.CookieStore
SessionMaxAge time.Duration
authenticators []Authenticator
sessionStore *sessions.CookieStore
LdapAuth *LdapAuthenticator
JwtAuth *JWTAuthenticator
LocalAuth *LocalAuthenticator
authenticators []Authenticator
SessionMaxAge time.Duration
}
func (auth *Authentication) AuthViaSession(
rw http.ResponseWriter,
r *http.Request) (*schema.User, error) {
r *http.Request,
) (*schema.User, error) {
session, err := auth.sessionStore.Get(r, "session")
if err != nil {
log.Error("Error while getting session store")
@@ -129,10 +129,46 @@ func Init() (*Authentication, error) {
return auth, nil
}
func persistUser(user *schema.User) {
r := repository.GetUserRepository()
_, err := r.GetUser(user.Username)
if err != nil && err != sql.ErrNoRows {
log.Errorf("Error while loading user '%s': %v", user.Username, err)
} else if err == sql.ErrNoRows {
if err := r.AddUser(user); err != nil {
log.Errorf("Error while adding user '%s' to DB: %v", user.Username, err)
}
}
}
func (auth *Authentication) SaveSession(rw http.ResponseWriter, r *http.Request, user *schema.User) error {
session, err := auth.sessionStore.New(r, "session")
if err != nil {
log.Errorf("session creation failed: %s", err.Error())
http.Error(rw, err.Error(), http.StatusInternalServerError)
return err
}
if auth.SessionMaxAge != 0 {
session.Options.MaxAge = int(auth.SessionMaxAge.Seconds())
}
session.Values["username"] = user.Username
session.Values["projects"] = user.Projects
session.Values["roles"] = user.Roles
if err := auth.sessionStore.Save(r, rw, session); err != nil {
log.Warnf("session save failed: %s", err.Error())
http.Error(rw, err.Error(), http.StatusInternalServerError)
return err
}
return nil
}
func (auth *Authentication) Login(
onsuccess http.Handler,
onfailure func(rw http.ResponseWriter, r *http.Request, loginErr error)) http.Handler {
onfailure func(rw http.ResponseWriter, r *http.Request, loginErr error),
) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
username := r.FormValue("username")
var dbUser *schema.User
@@ -161,22 +197,7 @@ func (auth *Authentication) Login(
return
}
session, err := auth.sessionStore.New(r, "session")
if err != nil {
log.Errorf("session creation failed: %s", err.Error())
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
if auth.SessionMaxAge != 0 {
session.Options.MaxAge = int(auth.SessionMaxAge.Seconds())
}
session.Values["username"] = user.Username
session.Values["projects"] = user.Projects
session.Values["roles"] = user.Roles
if err := auth.sessionStore.Save(r, rw, session); err != nil {
log.Warnf("session save failed: %s", err.Error())
http.Error(rw, err.Error(), http.StatusInternalServerError)
if err := auth.SaveSession(rw, r, user); err != nil {
return
}
@@ -193,10 +214,9 @@ func (auth *Authentication) Login(
func (auth *Authentication) Auth(
onsuccess http.Handler,
onfailure func(rw http.ResponseWriter, r *http.Request, authErr error)) http.Handler {
onfailure func(rw http.ResponseWriter, r *http.Request, authErr error),
) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
user, err := auth.JwtAuth.AuthViaJWT(rw, r)
if err != nil {
log.Infof("authentication failed: %s", err.Error())

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -17,7 +17,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/golang-jwt/jwt/v4"
"github.com/golang-jwt/jwt/v5"
)
type JWTAuthenticator struct {
@@ -49,8 +49,8 @@ func (ja *JWTAuthenticator) Init() error {
func (ja *JWTAuthenticator) AuthViaJWT(
rw http.ResponseWriter,
r *http.Request) (*schema.User, error) {
r *http.Request,
) (*schema.User, error) {
rawtoken := r.Header.Get("X-Auth-Token")
if rawtoken == "" {
rawtoken = r.Header.Get("Authorization")
@@ -73,9 +73,9 @@ func (ja *JWTAuthenticator) AuthViaJWT(
log.Warn("Error while parsing JWT token")
return nil, err
}
if err := token.Claims.Valid(); err != nil {
if !token.Valid {
log.Warn("jwt token claims are not valid")
return nil, err
return nil, errors.New("jwt token claims are not valid")
}
// Token is valid, extract payload
@@ -88,7 +88,6 @@ func (ja *JWTAuthenticator) AuthViaJWT(
if config.Keys.JwtConfig.ValidateUser {
ur := repository.GetUserRepository()
user, err := ur.GetUser(sub)
// Deny any logins for unknown usernames
if err != nil {
log.Warn("Could not find user from JWT in internal database.")
@@ -117,7 +116,6 @@ func (ja *JWTAuthenticator) AuthViaJWT(
// Generate a new JWT that can be used for authentication
func (ja *JWTAuthenticator) ProvideJWT(user *schema.User) (string, error) {
if ja.privateKey == nil {
return "", errors.New("environment variable 'JWT_PRIVATE_KEY' not set")
}

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -17,7 +17,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/golang-jwt/jwt/v4"
"github.com/golang-jwt/jwt/v5"
)
type JWTCookieSessionAuthenticator struct {
@@ -90,8 +90,8 @@ func (ja *JWTCookieSessionAuthenticator) CanLogin(
user *schema.User,
username string,
rw http.ResponseWriter,
r *http.Request) (*schema.User, bool) {
r *http.Request,
) (*schema.User, bool) {
jc := config.Keys.JwtConfig
cookieName := ""
if jc.CookieName != "" {
@@ -113,8 +113,8 @@ func (ja *JWTCookieSessionAuthenticator) CanLogin(
func (ja *JWTCookieSessionAuthenticator) Login(
user *schema.User,
rw http.ResponseWriter,
r *http.Request) (*schema.User, error) {
r *http.Request,
) (*schema.User, error) {
jc := config.Keys.JwtConfig
jwtCookie, err := r.Cookie(jc.CookieName)
var rawtoken string
@@ -144,10 +144,9 @@ func (ja *JWTCookieSessionAuthenticator) Login(
return nil, err
}
// Check token validity and extract paypload
if err := token.Claims.Valid(); err != nil {
if !token.Valid {
log.Warn("jwt token claims are not valid")
return nil, err
return nil, errors.New("jwt token claims are not valid")
}
claims := token.Claims.(jwt.MapClaims)
@@ -200,9 +199,7 @@ func (ja *JWTCookieSessionAuthenticator) Login(
}
if jc.SyncUserOnLogin {
if err := repository.GetUserRepository().AddUser(user); err != nil {
log.Errorf("Error while adding user '%s' to DB", user.Username)
}
persistUser(user)
}
}

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -17,7 +17,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/golang-jwt/jwt/v4"
"github.com/golang-jwt/jwt/v5"
)
type JWTSessionAuthenticator struct {
@@ -44,8 +44,8 @@ func (ja *JWTSessionAuthenticator) CanLogin(
user *schema.User,
username string,
rw http.ResponseWriter,
r *http.Request) (*schema.User, bool) {
r *http.Request,
) (*schema.User, bool) {
return user, r.Header.Get("Authorization") != "" ||
r.URL.Query().Get("login-token") != ""
}
@@ -53,8 +53,8 @@ func (ja *JWTSessionAuthenticator) CanLogin(
func (ja *JWTSessionAuthenticator) Login(
user *schema.User,
rw http.ResponseWriter,
r *http.Request) (*schema.User, error) {
r *http.Request,
) (*schema.User, error) {
rawtoken := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
if rawtoken == "" {
rawtoken = r.URL.Query().Get("login-token")
@@ -71,9 +71,9 @@ func (ja *JWTSessionAuthenticator) Login(
return nil, err
}
if err = token.Claims.Valid(); err != nil {
if !token.Valid {
log.Warn("jwt token claims are not valid")
return nil, err
return nil, errors.New("jwt token claims are not valid")
}
claims := token.Claims.(jwt.MapClaims)
@@ -139,9 +139,7 @@ func (ja *JWTSessionAuthenticator) Login(
}
if config.Keys.JwtConfig.SyncUserOnLogin {
if err := repository.GetUserRepository().AddUser(user); err != nil {
log.Errorf("Error while adding user '%s' to DB", user.Username)
}
persistUser(user)
}
}

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -21,7 +21,7 @@ import (
type LdapAuthenticator struct {
syncPassword string
UserAttr string
UserAttr string
}
var _ Authenticator = (*LdapAuthenticator)(nil)
@@ -74,8 +74,8 @@ func (la *LdapAuthenticator) CanLogin(
user *schema.User,
username string,
rw http.ResponseWriter,
r *http.Request) (*schema.User, bool) {
r *http.Request,
) (*schema.User, bool) {
lc := config.Keys.LdapConfig
if user != nil {
@@ -138,8 +138,8 @@ func (la *LdapAuthenticator) CanLogin(
func (la *LdapAuthenticator) Login(
user *schema.User,
rw http.ResponseWriter,
r *http.Request) (*schema.User, error) {
r *http.Request,
) (*schema.User, error) {
l, err := la.getLdapConnection(false)
if err != nil {
log.Warn("Error while getting ldap connection")
@@ -238,7 +238,6 @@ func (la *LdapAuthenticator) Sync() error {
}
func (la *LdapAuthenticator) getLdapConnection(admin bool) (*ldap.Conn, error) {
lc := config.Keys.LdapConfig
conn, err := ldap.DialURL(lc.Url)
if err != nil {

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

196
internal/auth/oidc.go Normal file
View File

@@ -0,0 +1,196 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package auth
import (
"context"
"crypto/rand"
"encoding/base64"
"io"
"net/http"
"os"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/coreos/go-oidc/v3/oidc"
"github.com/gorilla/mux"
"golang.org/x/oauth2"
)
type OIDC struct {
client *oauth2.Config
provider *oidc.Provider
authentication *Authentication
clientID string
}
func randString(nByte int) (string, error) {
b := make([]byte, nByte)
if _, err := io.ReadFull(rand.Reader, b); err != nil {
return "", err
}
return base64.RawURLEncoding.EncodeToString(b), nil
}
func setCallbackCookie(w http.ResponseWriter, r *http.Request, name, value string) {
c := &http.Cookie{
Name: name,
Value: value,
MaxAge: int(time.Hour.Seconds()),
Secure: r.TLS != nil,
HttpOnly: true,
}
http.SetCookie(w, c)
}
func NewOIDC(a *Authentication) *OIDC {
provider, err := oidc.NewProvider(context.Background(), config.Keys.OpenIDConfig.Provider)
if err != nil {
log.Fatal(err)
}
clientID := os.Getenv("OID_CLIENT_ID")
if clientID == "" {
log.Warn("environment variable 'OID_CLIENT_ID' not set (Open ID connect auth will not work)")
}
clientSecret := os.Getenv("OID_CLIENT_SECRET")
if clientSecret == "" {
log.Warn("environment variable 'OID_CLIENT_SECRET' not set (Open ID connect auth will not work)")
}
client := &oauth2.Config{
ClientID: clientID,
ClientSecret: clientSecret,
Endpoint: provider.Endpoint(),
RedirectURL: "oidc-callback",
Scopes: []string{oidc.ScopeOpenID, "profile", "email"},
}
oa := &OIDC{provider: provider, client: client, clientID: clientID, authentication: a}
return oa
}
func (oa *OIDC) RegisterEndpoints(r *mux.Router) {
r.HandleFunc("/oidc-login", oa.OAuth2Login)
r.HandleFunc("/oidc-callback", oa.OAuth2Callback)
}
func (oa *OIDC) OAuth2Callback(rw http.ResponseWriter, r *http.Request) {
c, err := r.Cookie("state")
if err != nil {
http.Error(rw, "state cookie not found", http.StatusBadRequest)
return
}
state := c.Value
c, err = r.Cookie("verifier")
if err != nil {
http.Error(rw, "verifier cookie not found", http.StatusBadRequest)
return
}
codeVerifier := c.Value
_ = r.ParseForm()
if r.Form.Get("state") != state {
http.Error(rw, "State invalid", http.StatusBadRequest)
return
}
code := r.Form.Get("code")
if code == "" {
http.Error(rw, "Code not found", http.StatusBadRequest)
return
}
token, err := oa.client.Exchange(context.Background(), code, oauth2.VerifierOption(codeVerifier))
if err != nil {
http.Error(rw, "Failed to exchange token: "+err.Error(), http.StatusInternalServerError)
return
}
userInfo, err := oa.provider.UserInfo(context.Background(), oauth2.StaticTokenSource(token))
if err != nil {
http.Error(rw, "Failed to get userinfo: "+err.Error(), http.StatusInternalServerError)
return
}
// // Extract the ID Token from OAuth2 token.
// rawIDToken, ok := token.Extra("id_token").(string)
// if !ok {
// http.Error(rw, "Cannot access idToken", http.StatusInternalServerError)
// }
//
// verifier := oa.provider.Verifier(&oidc.Config{ClientID: oa.clientID})
// // Parse and verify ID Token payload.
// idToken, err := verifier.Verify(context.Background(), rawIDToken)
// if err != nil {
// http.Error(rw, "Failed to extract idToken: "+err.Error(), http.StatusInternalServerError)
// }
projects := make([]string, 0)
// Extract custom claims
var claims struct {
Username string `json:"preferred_username"`
Name string `json:"name"`
Profile struct {
Client struct {
Roles []string `json:"roles"`
} `json:"clustercockpit"`
} `json:"resource_access"`
}
if err := userInfo.Claims(&claims); err != nil {
http.Error(rw, "Failed to extract Claims: "+err.Error(), http.StatusInternalServerError)
}
var roles []string
for _, r := range claims.Profile.Client.Roles {
switch r {
case "user":
roles = append(roles, schema.GetRoleString(schema.RoleUser))
case "admin":
roles = append(roles, schema.GetRoleString(schema.RoleAdmin))
}
}
if len(roles) == 0 {
roles = append(roles, schema.GetRoleString(schema.RoleUser))
}
user := &schema.User{
Username: claims.Username,
Name: claims.Name,
Roles: roles,
Projects: projects,
AuthSource: schema.AuthViaOIDC,
}
if config.Keys.OpenIDConfig.SyncUserOnLogin {
persistUser(user)
}
oa.authentication.SaveSession(rw, r, user)
log.Infof("login successfull: user: %#v (roles: %v, projects: %v)", user.Username, user.Roles, user.Projects)
ctx := context.WithValue(r.Context(), repository.ContextUserKey, user)
http.RedirectHandler("/", http.StatusTemporaryRedirect).ServeHTTP(rw, r.WithContext(ctx))
}
func (oa *OIDC) OAuth2Login(rw http.ResponseWriter, r *http.Request) {
state, err := randString(16)
if err != nil {
http.Error(rw, "Internal error", http.StatusInternalServerError)
return
}
setCallbackCookie(rw, r, "state", state)
// use PKCE to protect against CSRF attacks
codeVerifier := oauth2.GenerateVerifier()
setCallbackCookie(rw, r, "verifier", codeVerifier)
// Redirect user to consent page to ask for permission
url := oa.client.AuthCodeURL(state, oauth2.AccessTypeOffline, oauth2.S256ChallengeOption(codeVerifier))
http.Redirect(rw, r, url, http.StatusFound)
}

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -31,6 +31,8 @@ var Keys schema.ProgramConfig = schema.ProgramConfig{
"job_view_nodestats_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"job_view_polarPlotMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"job_view_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"job_view_showFootprint": true,
"job_list_usePaging": true,
"plot_general_colorBackground": true,
"plot_general_colorscheme": []string{"#00bfff", "#0000ff", "#ff00ff", "#ff0000", "#ff8000", "#ffff00", "#80ff00"},
"plot_general_lineWidth": 3,

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -78,29 +78,31 @@ type JobMetricWithName struct {
}
type JobResultList struct {
Items []*schema.Job `json:"items"`
Offset *int `json:"offset,omitempty"`
Limit *int `json:"limit,omitempty"`
Count *int `json:"count,omitempty"`
Items []*schema.Job `json:"items"`
Offset *int `json:"offset,omitempty"`
Limit *int `json:"limit,omitempty"`
Count *int `json:"count,omitempty"`
HasNextPage *bool `json:"hasNextPage,omitempty"`
}
type JobsStatistics struct {
ID string `json:"id"`
Name string `json:"name"`
TotalJobs int `json:"totalJobs"`
RunningJobs int `json:"runningJobs"`
ShortJobs int `json:"shortJobs"`
TotalWalltime int `json:"totalWalltime"`
TotalNodes int `json:"totalNodes"`
TotalNodeHours int `json:"totalNodeHours"`
TotalCores int `json:"totalCores"`
TotalCoreHours int `json:"totalCoreHours"`
TotalAccs int `json:"totalAccs"`
TotalAccHours int `json:"totalAccHours"`
HistDuration []*HistoPoint `json:"histDuration"`
HistNumNodes []*HistoPoint `json:"histNumNodes"`
HistNumCores []*HistoPoint `json:"histNumCores"`
HistNumAccs []*HistoPoint `json:"histNumAccs"`
ID string `json:"id"`
Name string `json:"name"`
TotalJobs int `json:"totalJobs"`
RunningJobs int `json:"runningJobs"`
ShortJobs int `json:"shortJobs"`
TotalWalltime int `json:"totalWalltime"`
TotalNodes int `json:"totalNodes"`
TotalNodeHours int `json:"totalNodeHours"`
TotalCores int `json:"totalCores"`
TotalCoreHours int `json:"totalCoreHours"`
TotalAccs int `json:"totalAccs"`
TotalAccHours int `json:"totalAccHours"`
HistDuration []*HistoPoint `json:"histDuration"`
HistNumNodes []*HistoPoint `json:"histNumNodes"`
HistNumCores []*HistoPoint `json:"histNumCores"`
HistNumAccs []*HistoPoint `json:"histNumAccs"`
HistMetrics []*MetricHistoPoints `json:"histMetrics"`
}
type MetricFootprints struct {
@@ -108,6 +110,22 @@ type MetricFootprints struct {
Data []schema.Float `json:"data"`
}
type MetricHistoPoint struct {
Bin *int `json:"bin,omitempty"`
Count int `json:"count"`
Min *int `json:"min,omitempty"`
Max *int `json:"max,omitempty"`
}
type MetricHistoPoints struct {
Metric string `json:"metric"`
Unit string `json:"unit"`
Data []*MetricHistoPoint `json:"data,omitempty"`
}
type Mutation struct {
}
type NodeMetrics struct {
Host string `json:"host"`
SubCluster string `json:"subCluster"`
@@ -124,6 +142,9 @@ type PageRequest struct {
Page int `json:"page"`
}
type Query struct {
}
type StringInput struct {
Eq *string `json:"eq,omitempty"`
Neq *string `json:"neq,omitempty"`

View File

@@ -2,7 +2,7 @@ package graph
// This file will be automatically regenerated based on the schema, any resolver implementations
// will be copied through when generating and any unknown code will be moved to the end.
// Code generated by github.com/99designs/gqlgen version v0.17.36
// Code generated by github.com/99designs/gqlgen version v0.17.45
import (
"context"
@@ -11,6 +11,7 @@ import (
"strconv"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
@@ -240,11 +241,27 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
return nil, err
}
return &model.JobResultList{Items: jobs, Count: &count}, nil
if !config.Keys.UiDefaults["job_list_usePaging"].(bool) {
hasNextPage := false
page.Page += 1
nextJobs, err := r.Repo.QueryJobs(ctx, filter, page, order)
if err != nil {
log.Warn("Error while querying next jobs")
return nil, err
}
if len(nextJobs) > 0 {
hasNextPage = true
}
return &model.JobResultList{Items: jobs, Count: &count, HasNextPage: &hasNextPage}, nil
} else {
return &model.JobResultList{Items: jobs, Count: &count}, nil
}
}
// JobsStatistics is the resolver for the jobsStatistics field.
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, metrics []string, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
var err error
var stats []*model.JobsStatistics
@@ -291,6 +308,17 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
}
}
if requireField(ctx, "histMetrics") {
if groupBy == nil {
stats[0], err = r.Repo.AddMetricHistograms(ctx, filter, metrics, stats[0])
if err != nil {
return nil, err
}
} else {
return nil, errors.New("metric histograms only implemented without groupBy argument")
}
}
return stats, nil
}

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -32,32 +32,32 @@ type CCMetricStoreConfig struct {
}
type CCMetricStore struct {
here2there map[string]string
there2here map[string]string
client http.Client
jwt string
url string
queryEndpoint string
client http.Client
here2there map[string]string
there2here map[string]string
}
type ApiQueryRequest struct {
Cluster string `json:"cluster"`
Queries []ApiQuery `json:"queries"`
ForAllNodes []string `json:"for-all-nodes"`
From int64 `json:"from"`
To int64 `json:"to"`
WithStats bool `json:"with-stats"`
WithData bool `json:"with-data"`
Queries []ApiQuery `json:"queries"`
ForAllNodes []string `json:"for-all-nodes"`
}
type ApiQuery struct {
Type *string `json:"type,omitempty"`
SubType *string `json:"subtype,omitempty"`
Metric string `json:"metric"`
Hostname string `json:"host"`
Aggregate bool `json:"aggreg"`
Type *string `json:"type,omitempty"`
TypeIds []string `json:"type-ids,omitempty"`
SubType *string `json:"subtype,omitempty"`
SubTypeIds []string `json:"subtype-ids,omitempty"`
Aggregate bool `json:"aggreg"`
}
type ApiQueryResponse struct {
@@ -67,16 +67,15 @@ type ApiQueryResponse struct {
type ApiMetricData struct {
Error *string `json:"error"`
Data []schema.Float `json:"data"`
From int64 `json:"from"`
To int64 `json:"to"`
Data []schema.Float `json:"data"`
Avg schema.Float `json:"avg"`
Min schema.Float `json:"min"`
Max schema.Float `json:"max"`
}
func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error {
var config CCMetricStoreConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
log.Warn("Error while unmarshaling raw json config")
@@ -122,8 +121,8 @@ func (ccms *CCMetricStore) toLocalName(metric string) string {
func (ccms *CCMetricStore) doRequest(
ctx context.Context,
body *ApiQueryRequest) (*ApiQueryResponse, error) {
body *ApiQueryRequest,
) (*ApiQueryResponse, error) {
buf := &bytes.Buffer{}
if err := json.NewEncoder(buf).Encode(body); err != nil {
log.Warn("Error while encoding request body")
@@ -162,8 +161,8 @@ func (ccms *CCMetricStore) LoadData(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context) (schema.JobData, error) {
ctx context.Context,
) (schema.JobData, error) {
queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes)
if err != nil {
log.Warn("Error while building queries")
@@ -186,7 +185,7 @@ func (ccms *CCMetricStore) LoadData(
}
var errors []string
var jobData schema.JobData = make(schema.JobData)
jobData := make(schema.JobData)
for i, row := range resBody.Results {
query := req.Queries[i]
metric := ccms.toLocalName(query.Metric)
@@ -206,7 +205,7 @@ func (ccms *CCMetricStore) LoadData(
jobData[metric][scope] = jobMetric
}
for _, res := range row {
for ndx, res := range row {
if res.Error != nil {
/* Build list for "partial errors", if any */
errors = append(errors, fmt.Sprintf("failed to fetch '%s' from host '%s': %s", query.Metric, query.Hostname, *res.Error))
@@ -216,7 +215,7 @@ func (ccms *CCMetricStore) LoadData(
id := (*string)(nil)
if query.Type != nil {
id = new(string)
*id = query.TypeIds[0]
*id = query.TypeIds[ndx]
}
if res.Avg.IsNaN() || res.Min.IsNaN() || res.Max.IsNaN() {
@@ -267,8 +266,8 @@ var (
func (ccms *CCMetricStore) buildQueries(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) {
scopes []schema.MetricScope,
) ([]ApiQuery, []schema.MetricScope, error) {
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
assignedScope := []schema.MetricScope{}
@@ -313,6 +312,11 @@ func (ccms *CCMetricStore) buildQueries(
// Accelerator -> Accelerator (Use "accelerator" scope if requested scope is lower than node)
if nativeScope == schema.MetricScopeAccelerator && scope.LT(schema.MetricScopeNode) {
if scope != schema.MetricScopeAccelerator {
// Skip all other catched cases
continue
}
queries = append(queries, ApiQuery{
Metric: remoteName,
Hostname: host.Hostname,
@@ -504,8 +508,8 @@ func (ccms *CCMetricStore) buildQueries(
func (ccms *CCMetricStore) LoadStats(
job *schema.Job,
metrics []string,
ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
ctx context.Context,
) (map[string]map[string]schema.MetricStatistics, error) {
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}) // #166 Add scope shere for analysis view accelerator normalization?
if err != nil {
log.Warn("Error while building query")
@@ -566,8 +570,8 @@ func (ccms *CCMetricStore) LoadNodeData(
metrics, nodes []string,
scopes []schema.MetricScope,
from, to time.Time,
ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
req := ApiQueryRequest{
Cluster: cluster,
From: from.Unix(),
@@ -652,7 +656,6 @@ func (ccms *CCMetricStore) LoadNodeData(
}
func intToStringSlice(is []int) []string {
ss := make([]string, len(is))
for i, x := range is {
ss[i] = strconv.Itoa(x)

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -38,7 +38,6 @@ var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepos
var useArchive bool
func Init(disableArchive bool) error {
useArchive = !disableArchive
for _, cluster := range config.Keys.Clusters {
if cluster.MetricDataRepository != nil {
@@ -80,7 +79,8 @@ var cache *lrucache.Cache = lrucache.New(128 * 1024 * 1024)
func LoadData(job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context) (schema.JobData, error) {
ctx context.Context,
) (schema.JobData, error) {
data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
var jd schema.JobData
var err error
@@ -109,7 +109,8 @@ func LoadData(job *schema.Job,
jd, err = repo.LoadData(job, metrics, scopes, ctx)
if err != nil {
if len(jd) != 0 {
log.Warnf("partial error: %s", err.Error())
log.Errorf("partial error: %s", err.Error())
return err, 0, 0
} else {
log.Error("Error while loading job data from metric repository")
return err, 0, 0
@@ -179,8 +180,8 @@ func LoadAverages(
job *schema.Job,
metrics []string,
data [][]schema.Float,
ctx context.Context) error {
ctx context.Context,
) error {
if job.State != schema.JobStateRunning && useArchive {
return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
}
@@ -219,8 +220,8 @@ func LoadNodeData(
metrics, nodes []string,
scopes []schema.MetricScope,
from, to time.Time,
ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
ctx context.Context,
) (map[string]map[string][]*schema.JobMetric, error) {
repo, ok := metricDataRepos[cluster]
if !ok {
return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
@@ -252,8 +253,8 @@ func LoadNodeData(
func cacheKey(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope) string {
scopes []schema.MetricScope,
) string {
// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
// job.ID and the TTL of the cache entry makes sure it does not stay there forever.
return fmt.Sprintf("%d(%s):[%v],[%v]",
@@ -267,8 +268,8 @@ func cacheKey(
func prepareJobData(
job *schema.Job,
jobData schema.JobData,
scopes []schema.MetricScope) {
scopes []schema.MetricScope,
) {
const maxSeriesSize int = 15
for _, scopes := range jobData {
for _, jm := range scopes {
@@ -295,7 +296,6 @@ func prepareJobData(
// Writes a running job to the job-archive
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
allMetrics := make([]string, 0)
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
for _, mc := range metricConfigs {

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -16,6 +16,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -29,13 +30,11 @@ var (
)
type JobRepository struct {
DB *sqlx.DB
driver string
stmtCache *sq.StmtCache
cache *lrucache.Cache
DB *sqlx.DB
stmtCache *sq.StmtCache
cache *lrucache.Cache
archiveChannel chan *schema.Job
driver string
archivePending sync.WaitGroup
}
@@ -60,7 +59,7 @@ func GetJobRepository() *JobRepository {
var jobColumns []string = []string{
"job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.partition", "job.array_job_id",
"job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state",
"job.duration", "job.walltime", "job.resources", // "job.meta_data",
"job.duration", "job.walltime", "job.resources", "job.mem_used_max", "job.flops_any_avg", "job.mem_bw_avg", "job.load_avg", // "job.meta_data",
}
func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
@@ -68,7 +67,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
if err := row.Scan(
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
&job.Duration, &job.Walltime, &job.RawResources /*&job.RawMetaData*/); err != nil {
&job.Duration, &job.Walltime, &job.RawResources, &job.MemUsedMax, &job.FlopsAnyAvg, &job.MemBwAvg, &job.LoadAvg /*&job.RawMetaData*/); err != nil {
log.Warnf("Error while scanning rows (Job): %v", err)
return nil, err
}
@@ -212,7 +211,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
}
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
return nil
return archive.UpdateMetadata(job, job.MetaData)
}
// Find executes a SQL query to find a specific batch job.
@@ -223,8 +222,8 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
func (r *JobRepository) Find(
jobId *int64,
cluster *string,
startTime *int64) (*schema.Job, error) {
startTime *int64,
) (*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
@@ -248,8 +247,8 @@ func (r *JobRepository) Find(
func (r *JobRepository) FindAll(
jobId *int64,
cluster *string,
startTime *int64) ([]*schema.Job, error) {
startTime *int64,
) ([]*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId)
@@ -292,7 +291,8 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
func (r *JobRepository) FindConcurrentJobs(
ctx context.Context,
job *schema.Job) (*model.JobLinkResultList, error) {
job *schema.Job,
) (*model.JobLinkResultList, error) {
if job == nil {
return nil, nil
}
@@ -420,8 +420,8 @@ func (r *JobRepository) Stop(
jobId int64,
duration int32,
state schema.JobState,
monitoringStatus int32) (err error) {
monitoringStatus int32,
) (err error) {
stmt := sq.Update("job").
Set("job_state", state).
Set("duration", duration).
@@ -434,11 +434,14 @@ func (r *JobRepository) Stop(
func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
var cnt int
qs := fmt.Sprintf("SELECT count(*) FROM job WHERE job.start_time < %d", startTime)
err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
q := sq.Select("count(*)").From("job").Where("job.start_time < ?", startTime)
q.RunWith(r.DB).QueryRow().Scan(cnt)
qd := sq.Delete("job").Where("job.start_time < ?", startTime)
_, err := qd.RunWith(r.DB).Exec()
if err != nil {
log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err)
s, _, _ := qd.ToSql()
log.Errorf(" DeleteJobsBefore(%d) with %s: error %#v", startTime, s, err)
} else {
log.Debugf("DeleteJobsBefore(%d): Deleted %d jobs", startTime, cnt)
}
@@ -446,9 +449,12 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
}
func (r *JobRepository) DeleteJobById(id int64) error {
_, err := r.DB.Exec(`DELETE FROM job WHERE job.id = ?`, id)
qd := sq.Delete("job").Where("job.id = ?", id)
_, err := qd.RunWith(r.DB).Exec()
if err != nil {
log.Errorf("DeleteJobById(%d): error %#v", id, err)
s, _, _ := qd.ToSql()
log.Errorf("DeleteJobById(%d) with %s : error %#v", id, s, err)
} else {
log.Debugf("DeleteJobById(%d): Success", id)
}
@@ -468,8 +474,8 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
func (r *JobRepository) MarkArchived(
jobId int64,
monitoringStatus int32,
metricStats map[string]schema.JobStatistics) error {
metricStats map[string]schema.JobStatistics,
) error {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobId)
@@ -483,6 +489,7 @@ func (r *JobRepository) MarkArchived(
case "mem_bw":
stmt = stmt.Set("mem_bw_avg", stats.Avg)
case "load":
stmt = stmt.Set("load_avg", stats.Avg)
case "cpu_load":
stmt = stmt.Set("load_avg", stats.Avg)
case "net_bw":
@@ -577,8 +584,10 @@ func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm
}
}
var ErrNotFound = errors.New("no such jobname, project or user")
var ErrForbidden = errors.New("not authorized")
var (
ErrNotFound = errors.New("no such jobname, project or user")
ErrForbidden = errors.New("not authorized")
)
func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) {
compareStr := " = ?"
@@ -662,7 +671,6 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
// Hosts with zero jobs running on them will not show up!
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
start := time.Now()
subclusters := make(map[string]map[string]int)
rows, err := sq.Select("resources", "subcluster").From("job").
@@ -705,7 +713,6 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
}
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
start := time.Now()
res, err := sq.Update("job").
Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
@@ -734,7 +741,6 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
}
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {
var query sq.SelectBuilder
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -16,7 +16,7 @@ import (
"github.com/golang-migrate/migrate/v4/source/iofs"
)
const Version uint = 6
const Version uint = 7
//go:embed migrations/*
var migrationFiles embed.FS
@@ -57,7 +57,7 @@ func checkDBVersion(backend string, db *sql.DB) error {
log.Fatalf("unsupported database backend: %s", backend)
}
v, _, err := m.Version()
v, dirty, err := m.Version()
if err != nil {
if err == migrate.ErrNilVersion {
log.Warn("Legacy database without version or missing database file!")
@@ -68,18 +68,18 @@ func checkDBVersion(backend string, db *sql.DB) error {
if v < Version {
return fmt.Errorf("unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend -migrate-db", v, Version)
} else if v > Version {
return fmt.Errorf("unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool", v, Version)
}
if v > Version {
return fmt.Errorf("unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool", v, Version)
if dirty {
return fmt.Errorf("last migration to version %d has failed, please fix the db manually and force version with -force-db flag", Version)
}
return nil
}
func MigrateDB(backend string, db string) error {
var m *migrate.Migrate
func getMigrateInstance(backend string, db string) (m *migrate.Migrate, err error) {
switch backend {
case "sqlite3":
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
@@ -89,22 +89,31 @@ func MigrateDB(backend string, db string) error {
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("sqlite3://%s?_foreign_keys=on", db))
if err != nil {
return err
return m, err
}
case "mysql":
d, err := iofs.New(migrationFiles, "migrations/mysql")
if err != nil {
return err
return m, err
}
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("mysql://%s?multiStatements=true", db))
if err != nil {
return err
return m, err
}
default:
log.Fatalf("unsupported database backend: %s", backend)
}
return m, nil
}
func MigrateDB(backend string, db string) error {
m, err := getMigrateInstance(backend, db)
if err != nil {
return err
}
if err := m.Up(); err != nil {
if err == migrate.ErrNoChange {
log.Info("DB already up to date!")
@@ -116,3 +125,35 @@ func MigrateDB(backend string, db string) error {
m.Close()
return nil
}
func RevertDB(backend string, db string) error {
m, err := getMigrateInstance(backend, db)
if err != nil {
return err
}
if err := m.Migrate(Version - 1); err != nil {
if err == migrate.ErrNoChange {
log.Info("DB already up to date!")
} else {
return err
}
}
m.Close()
return nil
}
func ForceDB(backend string, db string) error {
m, err := getMigrateInstance(backend, db)
if err != nil {
return err
}
if err := m.Force(int(Version)); err != nil {
return err
}
m.Close()
return nil
}

View File

@@ -0,0 +1,3 @@
SET FOREIGN_KEY_CHECKS = 0;
ALTER TABLE tag MODIFY id INTEGER;
SET FOREIGN_KEY_CHECKS = 1;

View File

@@ -0,0 +1,3 @@
SET FOREIGN_KEY_CHECKS = 0;
ALTER TABLE tag MODIFY id INTEGER AUTO_INCREMENT;
SET FOREIGN_KEY_CHECKS = 1;

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -96,7 +96,7 @@ func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilde
user := GetUserFromContext(ctx)
if user == nil {
var qnil sq.SelectBuilder
return qnil, fmt.Errorf("user context is nil!")
return qnil, fmt.Errorf("user context is nil")
} else if user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport, schema.RoleApi}) { // Admin & Co. : All jobs
return query, nil
} else if user.HasRole(schema.RoleManager) { // Manager : Add filter for managed projects' jobs only + personal jobs

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -8,11 +8,15 @@ import (
"context"
"database/sql"
"fmt"
"math"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)
@@ -450,6 +454,39 @@ func (r *JobRepository) AddHistograms(
return stat, nil
}
// Requires thresholds for metric from config for cluster? Of all clusters and use largest? split to 10 + 1 for artifacts?
func (r *JobRepository) AddMetricHistograms(
ctx context.Context,
filter []*model.JobFilter,
metrics []string,
stat *model.JobsStatistics) (*model.JobsStatistics, error) {
start := time.Now()
// Running Jobs Only: First query jobdata from sqlite, then query data and make bins
for _, f := range filter {
if f.State != nil {
if len(f.State) == 1 && f.State[0] == "running" {
stat.HistMetrics = r.runningJobsMetricStatisticsHistogram(ctx, metrics, filter)
log.Debugf("Timer AddMetricHistograms %s", time.Since(start))
return stat, nil
}
}
}
// All other cases: Query and make bins in sqlite directly
for _, m := range metrics {
metricHisto, err := r.jobsMetricStatisticsHistogram(ctx, m, filter)
if err != nil {
log.Warnf("Error while loading job metric statistics histogram: %s", m)
continue
}
stat.HistMetrics = append(stat.HistMetrics, metricHisto)
}
log.Debugf("Timer AddMetricHistograms %s", time.Since(start))
return stat, nil
}
// `value` must be the column grouped by, but renamed to "value"
func (r *JobRepository) jobsStatisticsHistogram(
ctx context.Context,
@@ -487,3 +524,231 @@ func (r *JobRepository) jobsStatisticsHistogram(
log.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
return points, nil
}
func (r *JobRepository) jobsMetricStatisticsHistogram(
ctx context.Context,
metric string,
filters []*model.JobFilter) (*model.MetricHistoPoints, error) {
var dbMetric string
switch metric {
case "cpu_load":
dbMetric = "load_avg"
case "flops_any":
dbMetric = "flops_any_avg"
case "mem_bw":
dbMetric = "mem_bw_avg"
case "mem_used":
dbMetric = "mem_used_max"
case "net_bw":
dbMetric = "net_bw_avg"
case "file_bw":
dbMetric = "file_bw_avg"
default:
return nil, fmt.Errorf("%s not implemented", metric)
}
// Get specific Peak or largest Peak
var metricConfig *schema.MetricConfig
var peak float64 = 0.0
var unit string = ""
for _, f := range filters {
if f.Cluster != nil {
metricConfig = archive.GetMetricConfig(*f.Cluster.Eq, metric)
peak = metricConfig.Peak
unit = metricConfig.Unit.Prefix + metricConfig.Unit.Base
log.Debugf("Cluster %s filter found with peak %f for %s", *f.Cluster.Eq, peak, metric)
}
}
if peak == 0.0 {
for _, c := range archive.Clusters {
for _, m := range c.MetricConfig {
if m.Name == metric {
if m.Peak > peak {
peak = m.Peak
}
if unit == "" {
unit = m.Unit.Prefix + m.Unit.Base
}
}
}
}
}
// log.Debugf("Metric %s: DB %s, Peak %f, Unit %s", metric, dbMetric, peak, unit)
// Make bins, see https://jereze.com/code/sql-histogram/
start := time.Now()
crossJoinQuery := sq.Select(
fmt.Sprintf(`max(%s) as max`, dbMetric),
fmt.Sprintf(`min(%s) as min`, dbMetric),
).From("job").Where(
fmt.Sprintf(`%s is not null`, dbMetric),
).Where(
fmt.Sprintf(`%s <= %f`, dbMetric, peak),
)
crossJoinQuery, cjqerr := SecurityCheck(ctx, crossJoinQuery)
if cjqerr != nil {
return nil, cjqerr
}
for _, f := range filters {
crossJoinQuery = BuildWhereClause(f, crossJoinQuery)
}
crossJoinQuerySql, crossJoinQueryArgs, sqlerr := crossJoinQuery.ToSql()
if sqlerr != nil {
return nil, sqlerr
}
bins := 10
binQuery := fmt.Sprintf(`CAST( (case when job.%s = value.max then value.max*0.999999999 else job.%s end - value.min) / (value.max - value.min) * %d as INTEGER )`, dbMetric, dbMetric, bins)
mainQuery := sq.Select(
fmt.Sprintf(`%s + 1 as bin`, binQuery),
fmt.Sprintf(`count(job.%s) as count`, dbMetric),
fmt.Sprintf(`CAST(((value.max / %d) * (%s )) as INTEGER ) as min`, bins, binQuery),
fmt.Sprintf(`CAST(((value.max / %d) * (%s + 1 )) as INTEGER ) as max`, bins, binQuery),
).From("job").CrossJoin(
fmt.Sprintf(`(%s) as value`, crossJoinQuerySql), crossJoinQueryArgs...,
).Where(fmt.Sprintf(`job.%s is not null and job.%s <= %f`, dbMetric, dbMetric, peak))
mainQuery, qerr := SecurityCheck(ctx, mainQuery)
if qerr != nil {
return nil, qerr
}
for _, f := range filters {
mainQuery = BuildWhereClause(f, mainQuery)
}
// Finalize query with Grouping and Ordering
mainQuery = mainQuery.GroupBy("bin").OrderBy("bin")
rows, err := mainQuery.RunWith(r.DB).Query()
if err != nil {
log.Errorf("Error while running mainQuery: %s", err)
return nil, err
}
points := make([]*model.MetricHistoPoint, 0)
for rows.Next() {
point := model.MetricHistoPoint{}
if err := rows.Scan(&point.Bin, &point.Count, &point.Min, &point.Max); err != nil {
log.Warnf("Error while scanning rows for %s", metric)
return nil, err // Totally bricks cc-backend if returned and if all metrics requested?
}
points = append(points, &point)
}
result := model.MetricHistoPoints{Metric: metric, Unit: unit, Data: points}
log.Debugf("Timer jobsStatisticsHistogram %s", time.Since(start))
return &result, nil
}
func (r *JobRepository) runningJobsMetricStatisticsHistogram(
ctx context.Context,
metrics []string,
filters []*model.JobFilter) []*model.MetricHistoPoints {
// Get Jobs
jobs, err := r.QueryJobs(ctx, filters, &model.PageRequest{Page: 1, ItemsPerPage: 500 + 1}, nil)
if err != nil {
log.Errorf("Error while querying jobs for footprint: %s", err)
return nil
}
if len(jobs) > 500 {
log.Errorf("too many jobs matched (max: %d)", 500)
return nil
}
// Get AVGs from metric repo
avgs := make([][]schema.Float, len(metrics))
for i := range avgs {
avgs[i] = make([]schema.Float, 0, len(jobs))
}
for _, job := range jobs {
if job.MonitoringStatus == schema.MonitoringStatusDisabled || job.MonitoringStatus == schema.MonitoringStatusArchivingFailed {
continue
}
if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
log.Errorf("Error while loading averages for histogram: %s", err)
return nil
}
}
// Iterate metrics to fill endresult
data := make([]*model.MetricHistoPoints, 0)
for idx, metric := range metrics {
// Get specific Peak or largest Peak
var metricConfig *schema.MetricConfig
var peak float64 = 0.0
var unit string = ""
for _, f := range filters {
if f.Cluster != nil {
metricConfig = archive.GetMetricConfig(*f.Cluster.Eq, metric)
peak = metricConfig.Peak
unit = metricConfig.Unit.Prefix + metricConfig.Unit.Base
log.Debugf("Cluster %s filter found with peak %f for %s", *f.Cluster.Eq, peak, metric)
}
}
if peak == 0.0 {
for _, c := range archive.Clusters {
for _, m := range c.MetricConfig {
if m.Name == metric {
if m.Peak > peak {
peak = m.Peak
}
if unit == "" {
unit = m.Unit.Prefix + m.Unit.Base
}
}
}
}
}
// Make and fill bins
bins := 10.0
peakBin := peak / bins
points := make([]*model.MetricHistoPoint, 0)
for b := 0; b < 10; b++ {
count := 0
bindex := b + 1
bmin := math.Round(peakBin * float64(b))
bmax := math.Round(peakBin * (float64(b) + 1.0))
// Iterate AVG values for indexed metric and count for bins
for _, val := range avgs[idx] {
if float64(val) >= bmin && float64(val) < bmax {
count += 1
}
}
bminint := int(bmin)
bmaxint := int(bmax)
// Append Bin to Metric Result Array
point := model.MetricHistoPoint{Bin: &bindex, Count: count, Min: &bminint, Max: &bmaxint}
points = append(points, &point)
}
// Append Metric Result Array to final results array
result := model.MetricHistoPoints{Metric: metric, Unit: unit, Data: points}
data = append(data, &result)
}
return data
}

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
@@ -15,8 +15,11 @@ import (
// Add the tag with id `tagId` to the job with the database id `jobId`.
func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
if _, err := r.stmtCache.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES ($1, $2)`, job, tag); err != nil {
log.Error("Error while running query")
q := sq.Insert("jobtag").Columns("job_id", "tag_id").Values(job, tag)
if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error adding tag with %s: %v", s, err)
return nil, err
}
@@ -37,8 +40,11 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) {
// Removes a tag from a job
func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
if _, err := r.stmtCache.Exec("DELETE FROM jobtag WHERE jobtag.job_id = $1 AND jobtag.tag_id = $2", job, tag); err != nil {
log.Error("Error while running query")
q := sq.Delete("jobtag").Where("jobtag.job_id = ?", job).Where("jobtag.tag_id = ?", tag)
if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error adding tag with %s: %v", s, err)
return nil, err
}
@@ -59,8 +65,12 @@ func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) {
// CreateTag creates a new tag with the specified type and name and returns its database id.
func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64, err error) {
res, err := r.stmtCache.Exec("INSERT INTO tag (tag_type, tag_name) VALUES ($1, $2)", tagType, tagName)
q := sq.Insert("tag").Columns("tag_type", "tag_name").Values(tagType, tagName)
res, err := q.RunWith(r.stmtCache).Exec()
if err != nil {
s, _, _ := q.ToSql()
log.Errorf("Error inserting tag with %s: %v", s, err)
return 0, err
}
@@ -167,7 +177,8 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
s, _, _ := q.ToSql()
log.Errorf("Error get tags with %s: %v", s, err)
return nil, err
}

Binary file not shown.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.