mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-08-24 11:02:59 +02:00
Merge branch 'dev' into rework_status_view
This commit is contained in:
@@ -27,6 +27,7 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
ccconf "github.com/ClusterCockpit/cc-lib/ccConfig"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
"github.com/gorilla/mux"
|
||||
@@ -36,18 +37,22 @@ import (
|
||||
|
||||
func setup(t *testing.T) *api.RestApi {
|
||||
const testconfig = `{
|
||||
"main": {
|
||||
"addr": "0.0.0.0:8080",
|
||||
"validate": false,
|
||||
"apiAllowedIPs": [
|
||||
"*"
|
||||
]
|
||||
},
|
||||
"archive": {
|
||||
"kind": "file",
|
||||
"path": "./var/job-archive"
|
||||
},
|
||||
"jwts": {
|
||||
"max-age": "2m"
|
||||
},
|
||||
"apiAllowedIPs": [
|
||||
"*"
|
||||
],
|
||||
"auth": {
|
||||
"jwts": {
|
||||
"max-age": "2m"
|
||||
}
|
||||
},
|
||||
"clusters": [
|
||||
{
|
||||
"name": "testcluster",
|
||||
@@ -146,7 +151,18 @@ func setup(t *testing.T) *api.RestApi {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
config.Init(cfgFilePath)
|
||||
ccconf.Init(cfgFilePath)
|
||||
|
||||
// Load and check main configuration
|
||||
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
|
||||
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
|
||||
config.Init(cfg, clustercfg)
|
||||
} else {
|
||||
cclog.Abort("Cluster configuration must be present")
|
||||
}
|
||||
} else {
|
||||
cclog.Abort("Main configuration must be present")
|
||||
}
|
||||
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive)
|
||||
|
||||
repository.Connect("sqlite3", dbfilepath)
|
||||
@@ -160,7 +176,14 @@ func setup(t *testing.T) *api.RestApi {
|
||||
}
|
||||
|
||||
archiver.Start(repository.GetJobRepository())
|
||||
auth.Init()
|
||||
|
||||
if cfg := ccconf.GetPackageConfig("auth"); cfg != nil {
|
||||
auth.Init(&cfg)
|
||||
} else {
|
||||
cclog.Warn("Authentication disabled due to missing configuration")
|
||||
auth.Init(nil)
|
||||
}
|
||||
|
||||
graph.Init()
|
||||
|
||||
return api.New()
|
||||
@@ -274,7 +297,6 @@ func TestRestApi(t *testing.T) {
|
||||
job.NumNodes != 1 ||
|
||||
job.NumHWThreads != 8 ||
|
||||
job.NumAcc != 0 ||
|
||||
job.Exclusive != 1 ||
|
||||
job.MonitoringStatus != 1 ||
|
||||
job.SMT != 1 ||
|
||||
!reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "host123", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) ||
|
||||
|
@@ -1261,9 +1261,27 @@ const docTemplate = `{
|
||||
"api.Node": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"cpusAllocated": {
|
||||
"type": "integer"
|
||||
},
|
||||
"cpusTotal": {
|
||||
"type": "integer"
|
||||
},
|
||||
"gpusAllocated": {
|
||||
"type": "integer"
|
||||
},
|
||||
"gpusTotal": {
|
||||
"type": "integer"
|
||||
},
|
||||
"hostname": {
|
||||
"type": "string"
|
||||
},
|
||||
"memoryAllocated": {
|
||||
"type": "integer"
|
||||
},
|
||||
"memoryTotal": {
|
||||
"type": "integer"
|
||||
},
|
||||
"states": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
@@ -1379,7 +1397,8 @@ const docTemplate = `{
|
||||
"energyFootprint": {
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
"type": "number"
|
||||
"type": "number",
|
||||
"format": "float64"
|
||||
}
|
||||
},
|
||||
"exclusive": {
|
||||
@@ -1391,7 +1410,8 @@ const docTemplate = `{
|
||||
"footprint": {
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
"type": "number"
|
||||
"type": "number",
|
||||
"format": "float64"
|
||||
}
|
||||
},
|
||||
"id": {
|
||||
@@ -1482,6 +1502,10 @@ const docTemplate = `{
|
||||
"type": "string",
|
||||
"example": "main"
|
||||
},
|
||||
"submitTime": {
|
||||
"type": "integer",
|
||||
"example": 1649723812
|
||||
},
|
||||
"tags": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
@@ -1547,24 +1571,32 @@ const docTemplate = `{
|
||||
"schema.JobState": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"running",
|
||||
"completed",
|
||||
"failed",
|
||||
"boot_fail",
|
||||
"cancelled",
|
||||
"stopped",
|
||||
"timeout",
|
||||
"completed",
|
||||
"deadline",
|
||||
"failed",
|
||||
"node_fail",
|
||||
"out_of_memory",
|
||||
"pending",
|
||||
"preempted",
|
||||
"out_of_memory"
|
||||
"running",
|
||||
"suspended",
|
||||
"timeout"
|
||||
],
|
||||
"x-enum-varnames": [
|
||||
"JobStateRunning",
|
||||
"JobStateCompleted",
|
||||
"JobStateFailed",
|
||||
"JobStateBootFail",
|
||||
"JobStateCancelled",
|
||||
"JobStateStopped",
|
||||
"JobStateTimeout",
|
||||
"JobStateCompleted",
|
||||
"JobStateDeadline",
|
||||
"JobStateFailed",
|
||||
"JobStateNodeFail",
|
||||
"JobStateOutOfMemory",
|
||||
"JobStatePending",
|
||||
"JobStatePreempted",
|
||||
"JobStateOutOfMemory"
|
||||
"JobStateRunning",
|
||||
"JobStateSuspended",
|
||||
"JobStateTimeout"
|
||||
]
|
||||
},
|
||||
"schema.JobStatistics": {
|
||||
@@ -1763,7 +1795,8 @@ const docTemplate = `{
|
||||
"additionalProperties": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "number"
|
||||
"type": "number",
|
||||
"format": "float64"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -17,6 +17,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/archiver"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
||||
@@ -143,7 +144,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
ufrom, uto := time.Unix(from, 0), time.Unix(to, 0)
|
||||
filter.StartTime = &schema.TimeRange{From: &ufrom, To: &uto}
|
||||
filter.StartTime = &config.TimeRange{From: &ufrom, To: &uto}
|
||||
case "page":
|
||||
x, err := strconv.Atoi(vals[0])
|
||||
if err != nil {
|
||||
@@ -647,7 +648,7 @@ func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) {
|
||||
// @router /api/jobs/start_job/ [post]
|
||||
func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
|
||||
req := schema.Job{
|
||||
Exclusive: 1,
|
||||
Shared: "none",
|
||||
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
|
||||
}
|
||||
if err := decode(r.Body, &req); err != nil {
|
||||
|
@@ -5,10 +5,12 @@
|
||||
package auth
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"database/sql"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
@@ -51,6 +53,14 @@ func getIPUserLimiter(ip, username string) *rate.Limiter {
|
||||
return limiter.(*rate.Limiter)
|
||||
}
|
||||
|
||||
type AuthConfig struct {
|
||||
LdapConfig *LdapConfig `json:"ldap"`
|
||||
JwtConfig *JWTAuthConfig `json:"jwts"`
|
||||
OpenIDConfig *OpenIDConfig `json:"oidc"`
|
||||
}
|
||||
|
||||
var Keys AuthConfig
|
||||
|
||||
type Authentication struct {
|
||||
sessionStore *sessions.CookieStore
|
||||
LdapAuth *LdapAuthenticator
|
||||
@@ -87,7 +97,7 @@ func (auth *Authentication) AuthViaSession(
|
||||
}, nil
|
||||
}
|
||||
|
||||
func Init() {
|
||||
func Init(authCfg *json.RawMessage) {
|
||||
initOnce.Do(func() {
|
||||
authInstance = &Authentication{}
|
||||
|
||||
@@ -111,7 +121,18 @@ func Init() {
|
||||
authInstance.SessionMaxAge = d
|
||||
}
|
||||
|
||||
if config.Keys.LdapConfig != nil {
|
||||
if authCfg == nil {
|
||||
return
|
||||
}
|
||||
|
||||
config.Validate(configSchema, *authCfg)
|
||||
dec := json.NewDecoder(bytes.NewReader(*authCfg))
|
||||
dec.DisallowUnknownFields()
|
||||
if err := dec.Decode(&Keys); err != nil {
|
||||
cclog.Errorf("error while decoding ldap config: %v", err)
|
||||
}
|
||||
|
||||
if Keys.LdapConfig != nil {
|
||||
ldapAuth := &LdapAuthenticator{}
|
||||
if err := ldapAuth.Init(); err != nil {
|
||||
cclog.Warn("Error while initializing authentication -> ldapAuth init failed")
|
||||
@@ -123,7 +144,7 @@ func Init() {
|
||||
cclog.Info("Missing LDAP configuration: No LDAP support!")
|
||||
}
|
||||
|
||||
if config.Keys.JwtConfig != nil {
|
||||
if Keys.JwtConfig != nil {
|
||||
authInstance.JwtAuth = &JWTAuthenticator{}
|
||||
if err := authInstance.JwtAuth.Init(); err != nil {
|
||||
cclog.Fatal("Error while initializing authentication -> jwtAuth init failed")
|
||||
@@ -168,11 +189,11 @@ func handleTokenUser(tokenUser *schema.User) {
|
||||
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
cclog.Errorf("Error while loading user '%s': %v", tokenUser.Username, err)
|
||||
} else if err == sql.ErrNoRows && config.Keys.JwtConfig.SyncUserOnLogin { // Adds New User
|
||||
} else if err == sql.ErrNoRows && Keys.JwtConfig.SyncUserOnLogin { // Adds New User
|
||||
if err := r.AddUser(tokenUser); err != nil {
|
||||
cclog.Errorf("Error while adding user '%s' to DB: %v", tokenUser.Username, err)
|
||||
}
|
||||
} else if err == nil && config.Keys.JwtConfig.UpdateUserOnLogin { // Update Existing User
|
||||
} else if err == nil && Keys.JwtConfig.UpdateUserOnLogin { // Update Existing User
|
||||
if err := r.UpdateUser(dbUser, tokenUser); err != nil {
|
||||
cclog.Errorf("Error while updating user '%s' to DB: %v", dbUser.Username, err)
|
||||
}
|
||||
@@ -185,11 +206,11 @@ func handleOIDCUser(OIDCUser *schema.User) {
|
||||
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
cclog.Errorf("Error while loading user '%s': %v", OIDCUser.Username, err)
|
||||
} else if err == sql.ErrNoRows && config.Keys.OpenIDConfig.SyncUserOnLogin { // Adds New User
|
||||
} else if err == sql.ErrNoRows && Keys.OpenIDConfig.SyncUserOnLogin { // Adds New User
|
||||
if err := r.AddUser(OIDCUser); err != nil {
|
||||
cclog.Errorf("Error while adding user '%s' to DB: %v", OIDCUser.Username, err)
|
||||
}
|
||||
} else if err == nil && config.Keys.OpenIDConfig.UpdateUserOnLogin { // Update Existing User
|
||||
} else if err == nil && Keys.OpenIDConfig.UpdateUserOnLogin { // Update Existing User
|
||||
if err := r.UpdateUser(dbUser, OIDCUser); err != nil {
|
||||
cclog.Errorf("Error while updating user '%s' to DB: %v", dbUser.Username, err)
|
||||
}
|
||||
|
@@ -13,13 +13,34 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
"github.com/golang-jwt/jwt/v5"
|
||||
)
|
||||
|
||||
type JWTAuthConfig struct {
|
||||
// Specifies for how long a JWT token shall be valid
|
||||
// as a string parsable by time.ParseDuration().
|
||||
MaxAge string `json:"max-age"`
|
||||
|
||||
// Specifies which cookie should be checked for a JWT token (if no authorization header is present)
|
||||
CookieName string `json:"cookieName"`
|
||||
|
||||
// Deny login for users not in database (but defined in JWT).
|
||||
// Ignore user roles defined in JWTs ('roles' claim), get them from db.
|
||||
ValidateUser bool `json:"validateUser"`
|
||||
|
||||
// Specifies which issuer should be accepted when validating external JWTs ('iss' claim)
|
||||
TrustedIssuer string `json:"trustedIssuer"`
|
||||
|
||||
// Should an non-existent user be added to the DB based on the information in the token
|
||||
SyncUserOnLogin bool `json:"syncUserOnLogin"`
|
||||
|
||||
// Should an existent user be updated in the DB based on the information in the token
|
||||
UpdateUserOnLogin bool `json:"updateUserOnLogin"`
|
||||
}
|
||||
|
||||
type JWTAuthenticator struct {
|
||||
publicKey ed25519.PublicKey
|
||||
privateKey ed25519.PrivateKey
|
||||
@@ -62,7 +83,7 @@ func (ja *JWTAuthenticator) AuthViaJWT(
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
token, err := jwt.Parse(rawtoken, func(t *jwt.Token) (interface{}, error) {
|
||||
token, err := jwt.Parse(rawtoken, func(t *jwt.Token) (any, error) {
|
||||
if t.Method != jwt.SigningMethodEdDSA {
|
||||
return nil, errors.New("only Ed25519/EdDSA supported")
|
||||
}
|
||||
@@ -85,7 +106,7 @@ func (ja *JWTAuthenticator) AuthViaJWT(
|
||||
var roles []string
|
||||
|
||||
// Validate user + roles from JWT against database?
|
||||
if config.Keys.JwtConfig.ValidateUser {
|
||||
if Keys.JwtConfig.ValidateUser {
|
||||
ur := repository.GetUserRepository()
|
||||
user, err := ur.GetUser(sub)
|
||||
// Deny any logins for unknown usernames
|
||||
@@ -97,7 +118,7 @@ func (ja *JWTAuthenticator) AuthViaJWT(
|
||||
roles = user.Roles
|
||||
} else {
|
||||
// Extract roles from JWT (if present)
|
||||
if rawroles, ok := claims["roles"].([]interface{}); ok {
|
||||
if rawroles, ok := claims["roles"].([]any); ok {
|
||||
for _, rr := range rawroles {
|
||||
if r, ok := rr.(string); ok {
|
||||
roles = append(roles, r)
|
||||
@@ -126,8 +147,8 @@ func (ja *JWTAuthenticator) ProvideJWT(user *schema.User) (string, error) {
|
||||
"roles": user.Roles,
|
||||
"iat": now.Unix(),
|
||||
}
|
||||
if config.Keys.JwtConfig.MaxAge != "" {
|
||||
d, err := time.ParseDuration(config.Keys.JwtConfig.MaxAge)
|
||||
if Keys.JwtConfig.MaxAge != "" {
|
||||
d, err := time.ParseDuration(Keys.JwtConfig.MaxAge)
|
||||
if err != nil {
|
||||
return "", errors.New("cannot parse max-age config key")
|
||||
}
|
||||
|
@@ -13,7 +13,6 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
@@ -63,17 +62,16 @@ func (ja *JWTCookieSessionAuthenticator) Init() error {
|
||||
return errors.New("environment variable 'CROSS_LOGIN_JWT_PUBLIC_KEY' not set (cross login token based authentication will not work)")
|
||||
}
|
||||
|
||||
jc := config.Keys.JwtConfig
|
||||
// Warn if other necessary settings are not configured
|
||||
if jc != nil {
|
||||
if jc.CookieName == "" {
|
||||
if Keys.JwtConfig != nil {
|
||||
if Keys.JwtConfig.CookieName == "" {
|
||||
cclog.Info("cookieName for JWTs not configured (cross login via JWT cookie will fail)")
|
||||
return errors.New("cookieName for JWTs not configured (cross login via JWT cookie will fail)")
|
||||
}
|
||||
if !jc.ValidateUser {
|
||||
if !Keys.JwtConfig.ValidateUser {
|
||||
cclog.Info("forceJWTValidationViaDatabase not set to true: CC will accept users and roles defined in JWTs regardless of its own database!")
|
||||
}
|
||||
if jc.TrustedIssuer == "" {
|
||||
if Keys.JwtConfig.TrustedIssuer == "" {
|
||||
cclog.Info("trustedExternalIssuer for JWTs not configured (cross login via JWT cookie will fail)")
|
||||
return errors.New("trustedExternalIssuer for JWTs not configured (cross login via JWT cookie will fail)")
|
||||
}
|
||||
@@ -92,7 +90,7 @@ func (ja *JWTCookieSessionAuthenticator) CanLogin(
|
||||
rw http.ResponseWriter,
|
||||
r *http.Request,
|
||||
) (*schema.User, bool) {
|
||||
jc := config.Keys.JwtConfig
|
||||
jc := Keys.JwtConfig
|
||||
cookieName := ""
|
||||
if jc.CookieName != "" {
|
||||
cookieName = jc.CookieName
|
||||
@@ -115,7 +113,7 @@ func (ja *JWTCookieSessionAuthenticator) Login(
|
||||
rw http.ResponseWriter,
|
||||
r *http.Request,
|
||||
) (*schema.User, error) {
|
||||
jc := config.Keys.JwtConfig
|
||||
jc := Keys.JwtConfig
|
||||
jwtCookie, err := r.Cookie(jc.CookieName)
|
||||
var rawtoken string
|
||||
|
||||
@@ -123,7 +121,7 @@ func (ja *JWTCookieSessionAuthenticator) Login(
|
||||
rawtoken = jwtCookie.Value
|
||||
}
|
||||
|
||||
token, err := jwt.Parse(rawtoken, func(t *jwt.Token) (interface{}, error) {
|
||||
token, err := jwt.Parse(rawtoken, func(t *jwt.Token) (any, error) {
|
||||
if t.Method != jwt.SigningMethodEdDSA {
|
||||
return nil, errors.New("only Ed25519/EdDSA supported")
|
||||
}
|
||||
@@ -169,8 +167,8 @@ func (ja *JWTCookieSessionAuthenticator) Login(
|
||||
}
|
||||
} else {
|
||||
var name string
|
||||
if wrap, ok := claims["name"].(map[string]interface{}); ok {
|
||||
if vals, ok := wrap["values"].([]interface{}); ok {
|
||||
if wrap, ok := claims["name"].(map[string]any); ok {
|
||||
if vals, ok := wrap["values"].([]any); ok {
|
||||
if len(vals) != 0 {
|
||||
name = fmt.Sprintf("%v", vals[0])
|
||||
|
||||
@@ -182,7 +180,7 @@ func (ja *JWTCookieSessionAuthenticator) Login(
|
||||
}
|
||||
|
||||
// Extract roles from JWT (if present)
|
||||
if rawroles, ok := claims["roles"].([]interface{}); ok {
|
||||
if rawroles, ok := claims["roles"].([]any); ok {
|
||||
for _, rr := range rawroles {
|
||||
if r, ok := rr.(string); ok {
|
||||
roles = append(roles, r)
|
||||
|
@@ -13,7 +13,6 @@ import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
@@ -60,7 +59,7 @@ func (ja *JWTSessionAuthenticator) Login(
|
||||
rawtoken = r.URL.Query().Get("login-token")
|
||||
}
|
||||
|
||||
token, err := jwt.Parse(rawtoken, func(t *jwt.Token) (interface{}, error) {
|
||||
token, err := jwt.Parse(rawtoken, func(t *jwt.Token) (any, error) {
|
||||
if t.Method == jwt.SigningMethodHS256 || t.Method == jwt.SigningMethodHS512 {
|
||||
return ja.loginTokenKey, nil
|
||||
}
|
||||
@@ -82,7 +81,7 @@ func (ja *JWTSessionAuthenticator) Login(
|
||||
var roles []string
|
||||
projects := make([]string, 0)
|
||||
|
||||
if config.Keys.JwtConfig.ValidateUser {
|
||||
if Keys.JwtConfig.ValidateUser {
|
||||
var err error
|
||||
user, err = repository.GetUserRepository().GetUser(sub)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
@@ -96,8 +95,8 @@ func (ja *JWTSessionAuthenticator) Login(
|
||||
}
|
||||
} else {
|
||||
var name string
|
||||
if wrap, ok := claims["name"].(map[string]interface{}); ok {
|
||||
if vals, ok := wrap["values"].([]interface{}); ok {
|
||||
if wrap, ok := claims["name"].(map[string]any); ok {
|
||||
if vals, ok := wrap["values"].([]any); ok {
|
||||
if len(vals) != 0 {
|
||||
name = fmt.Sprintf("%v", vals[0])
|
||||
|
||||
@@ -109,7 +108,7 @@ func (ja *JWTSessionAuthenticator) Login(
|
||||
}
|
||||
|
||||
// Extract roles from JWT (if present)
|
||||
if rawroles, ok := claims["roles"].([]interface{}); ok {
|
||||
if rawroles, ok := claims["roles"].([]any); ok {
|
||||
for _, rr := range rawroles {
|
||||
if r, ok := rr.(string); ok {
|
||||
if schema.IsValidRole(r) {
|
||||
@@ -119,7 +118,7 @@ func (ja *JWTSessionAuthenticator) Login(
|
||||
}
|
||||
}
|
||||
|
||||
if rawprojs, ok := claims["projects"].([]interface{}); ok {
|
||||
if rawprojs, ok := claims["projects"].([]any); ok {
|
||||
for _, pp := range rawprojs {
|
||||
if p, ok := pp.(string); ok {
|
||||
projects = append(projects, p)
|
||||
@@ -138,7 +137,7 @@ func (ja *JWTSessionAuthenticator) Login(
|
||||
AuthSource: schema.AuthViaToken,
|
||||
}
|
||||
|
||||
if config.Keys.JwtConfig.SyncUserOnLogin || config.Keys.JwtConfig.UpdateUserOnLogin {
|
||||
if Keys.JwtConfig.SyncUserOnLogin || Keys.JwtConfig.UpdateUserOnLogin {
|
||||
handleTokenUser(user)
|
||||
}
|
||||
}
|
||||
|
@@ -11,13 +11,26 @@ import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
"github.com/go-ldap/ldap/v3"
|
||||
)
|
||||
|
||||
type LdapConfig struct {
|
||||
Url string `json:"url"`
|
||||
UserBase string `json:"user_base"`
|
||||
SearchDN string `json:"search_dn"`
|
||||
UserBind string `json:"user_bind"`
|
||||
UserFilter string `json:"user_filter"`
|
||||
UserAttr string `json:"username_attr"`
|
||||
SyncInterval string `json:"sync_interval"` // Parsed using time.ParseDuration.
|
||||
SyncDelOldUsers bool `json:"sync_del_old_users"`
|
||||
|
||||
// Should an non-existent user be added to the DB if user exists in ldap directory
|
||||
SyncUserOnLogin bool `json:"syncUserOnLogin"`
|
||||
}
|
||||
|
||||
type LdapAuthenticator struct {
|
||||
syncPassword string
|
||||
UserAttr string
|
||||
@@ -31,10 +44,8 @@ func (la *LdapAuthenticator) Init() error {
|
||||
cclog.Warn("environment variable 'LDAP_ADMIN_PASSWORD' not set (ldap sync will not work)")
|
||||
}
|
||||
|
||||
lc := config.Keys.LdapConfig
|
||||
|
||||
if lc.UserAttr != "" {
|
||||
la.UserAttr = lc.UserAttr
|
||||
if Keys.LdapConfig.UserAttr != "" {
|
||||
la.UserAttr = Keys.LdapConfig.UserAttr
|
||||
} else {
|
||||
la.UserAttr = "gecos"
|
||||
}
|
||||
@@ -48,7 +59,7 @@ func (la *LdapAuthenticator) CanLogin(
|
||||
rw http.ResponseWriter,
|
||||
r *http.Request,
|
||||
) (*schema.User, bool) {
|
||||
lc := config.Keys.LdapConfig
|
||||
lc := Keys.LdapConfig
|
||||
|
||||
if user != nil {
|
||||
if user.AuthSource == schema.AuthViaLDAP {
|
||||
@@ -119,7 +130,7 @@ func (la *LdapAuthenticator) Login(
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
userDn := strings.Replace(config.Keys.LdapConfig.UserBind, "{username}", user.Username, -1)
|
||||
userDn := strings.Replace(Keys.LdapConfig.UserBind, "{username}", user.Username, -1)
|
||||
if err := l.Bind(userDn, r.FormValue("password")); err != nil {
|
||||
cclog.Errorf("AUTH/LDAP > Authentication for user %s failed: %v",
|
||||
user.Username, err)
|
||||
@@ -134,7 +145,7 @@ func (la *LdapAuthenticator) Sync() error {
|
||||
const IN_LDAP int = 2
|
||||
const IN_BOTH int = 3
|
||||
ur := repository.GetUserRepository()
|
||||
lc := config.Keys.LdapConfig
|
||||
lc := Keys.LdapConfig
|
||||
|
||||
users := map[string]int{}
|
||||
usernames, err := ur.GetLdapUsernames()
|
||||
@@ -210,7 +221,7 @@ func (la *LdapAuthenticator) Sync() error {
|
||||
}
|
||||
|
||||
func (la *LdapAuthenticator) getLdapConnection(admin bool) (*ldap.Conn, error) {
|
||||
lc := config.Keys.LdapConfig
|
||||
lc := Keys.LdapConfig
|
||||
conn, err := ldap.DialURL(lc.Url)
|
||||
if err != nil {
|
||||
cclog.Warn("LDAP URL dial failed")
|
||||
|
@@ -13,7 +13,6 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
@@ -22,6 +21,12 @@ import (
|
||||
"golang.org/x/oauth2"
|
||||
)
|
||||
|
||||
type OpenIDConfig struct {
|
||||
Provider string `json:"provider"`
|
||||
SyncUserOnLogin bool `json:"syncUserOnLogin"`
|
||||
UpdateUserOnLogin bool `json:"updateUserOnLogin"`
|
||||
}
|
||||
|
||||
type OIDC struct {
|
||||
client *oauth2.Config
|
||||
provider *oidc.Provider
|
||||
@@ -49,7 +54,7 @@ func setCallbackCookie(w http.ResponseWriter, r *http.Request, name, value strin
|
||||
}
|
||||
|
||||
func NewOIDC(a *Authentication) *OIDC {
|
||||
provider, err := oidc.NewProvider(context.Background(), config.Keys.OpenIDConfig.Provider)
|
||||
provider, err := oidc.NewProvider(context.Background(), Keys.OpenIDConfig.Provider)
|
||||
if err != nil {
|
||||
cclog.Fatal(err)
|
||||
}
|
||||
@@ -168,7 +173,7 @@ func (oa *OIDC) OAuth2Callback(rw http.ResponseWriter, r *http.Request) {
|
||||
AuthSource: schema.AuthViaOIDC,
|
||||
}
|
||||
|
||||
if config.Keys.OpenIDConfig.SyncUserOnLogin || config.Keys.OpenIDConfig.UpdateUserOnLogin {
|
||||
if Keys.OpenIDConfig.SyncUserOnLogin || Keys.OpenIDConfig.UpdateUserOnLogin {
|
||||
handleOIDCUser(user)
|
||||
}
|
||||
|
||||
|
95
internal/auth/schema.go
Normal file
95
internal/auth/schema.go
Normal file
@@ -0,0 +1,95 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved. This file is part of cc-backend.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package auth
|
||||
|
||||
var configSchema = `
|
||||
{
|
||||
"jwts": {
|
||||
"description": "For JWT token authentication.",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"max-age": {
|
||||
"description": "Configure how long a token is valid. As string parsable by time.ParseDuration()",
|
||||
"type": "string"
|
||||
},
|
||||
"cookieName": {
|
||||
"description": "Cookie that should be checked for a JWT token.",
|
||||
"type": "string"
|
||||
},
|
||||
"validateUser": {
|
||||
"description": "Deny login for users not in database (but defined in JWT). Overwrite roles in JWT with database roles.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"trustedIssuer": {
|
||||
"description": "Issuer that should be accepted when validating external JWTs ",
|
||||
"type": "string"
|
||||
},
|
||||
"syncUserOnLogin": {
|
||||
"description": "Add non-existent user to DB at login attempt with values provided in JWT.",
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"required": ["max-age"]
|
||||
},
|
||||
"oidc": {
|
||||
"provider": {
|
||||
"description": "",
|
||||
"type": "string"
|
||||
},
|
||||
"syncUserOnLogin": {
|
||||
"description": "",
|
||||
"type": "boolean"
|
||||
},
|
||||
"updateUserOnLogin": {
|
||||
"description": "",
|
||||
"type": "boolean"
|
||||
},
|
||||
"required": ["provider"]
|
||||
},
|
||||
"ldap": {
|
||||
"description": "For LDAP Authentication and user synchronisation.",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"url": {
|
||||
"description": "URL of LDAP directory server.",
|
||||
"type": "string"
|
||||
},
|
||||
"user_base": {
|
||||
"description": "Base DN of user tree root.",
|
||||
"type": "string"
|
||||
},
|
||||
"search_dn": {
|
||||
"description": "DN for authenticating LDAP admin account with general read rights.",
|
||||
"type": "string"
|
||||
},
|
||||
"user_bind": {
|
||||
"description": "Expression used to authenticate users via LDAP bind. Must contain uid={username}.",
|
||||
"type": "string"
|
||||
},
|
||||
"user_filter": {
|
||||
"description": "Filter to extract users for syncing.",
|
||||
"type": "string"
|
||||
},
|
||||
"username_attr": {
|
||||
"description": "Attribute with full username. Default: gecos",
|
||||
"type": "string"
|
||||
},
|
||||
"sync_interval": {
|
||||
"description": "Interval used for syncing local user table with LDAP directory. Parsed using time.ParseDuration.",
|
||||
"type": "string"
|
||||
},
|
||||
"sync_del_old_users": {
|
||||
"description": "Delete obsolete users in database.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"syncUserOnLogin": {
|
||||
"description": "Add non-existent user to DB at login attempt if user exists in Ldap directory",
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"required": ["url", "user_base", "search_dn", "user_bind", "user_filter"]
|
||||
},
|
||||
"required": ["jwts"]
|
||||
}`
|
@@ -7,25 +7,123 @@ package config
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
)
|
||||
|
||||
var Keys schema.ProgramConfig = schema.ProgramConfig{
|
||||
type ResampleConfig struct {
|
||||
// Array of resampling target resolutions, in seconds; Example: [600,300,60]
|
||||
Resolutions []int `json:"resolutions"`
|
||||
// Trigger next zoom level at less than this many visible datapoints
|
||||
Trigger int `json:"trigger"`
|
||||
}
|
||||
|
||||
// Format of the configuration (file). See below for the defaults.
|
||||
type ProgramConfig struct {
|
||||
// Address where the http (or https) server will listen on (for example: 'localhost:80').
|
||||
Addr string `json:"addr"`
|
||||
|
||||
// Addresses from which secured admin API endpoints can be reached, can be wildcard "*"
|
||||
ApiAllowedIPs []string `json:"apiAllowedIPs"`
|
||||
|
||||
// Drop root permissions once .env was read and the port was taken.
|
||||
User string `json:"user"`
|
||||
Group string `json:"group"`
|
||||
|
||||
// Disable authentication (for everything: API, Web-UI, ...)
|
||||
DisableAuthentication bool `json:"disable-authentication"`
|
||||
|
||||
// If `embed-static-files` is true (default), the frontend files are directly
|
||||
// embeded into the go binary and expected to be in web/frontend. Only if
|
||||
// it is false the files in `static-files` are served instead.
|
||||
EmbedStaticFiles bool `json:"embed-static-files"`
|
||||
StaticFiles string `json:"static-files"`
|
||||
|
||||
// 'sqlite3' or 'mysql' (mysql will work for mariadb as well)
|
||||
DBDriver string `json:"db-driver"`
|
||||
|
||||
// For sqlite3 a filename, for mysql a DSN in this format: https://github.com/go-sql-driver/mysql#dsn-data-source-name (Without query parameters!).
|
||||
DB string `json:"db"`
|
||||
|
||||
// Keep all metric data in the metric data repositories,
|
||||
// do not write to the job-archive.
|
||||
DisableArchive bool `json:"disable-archive"`
|
||||
|
||||
EnableJobTaggers bool `json:"enable-job-taggers"`
|
||||
|
||||
// Validate json input against schema
|
||||
Validate bool `json:"validate"`
|
||||
|
||||
// If 0 or empty, the session does not expire!
|
||||
SessionMaxAge string `json:"session-max-age"`
|
||||
|
||||
// If both those options are not empty, use HTTPS using those certificates.
|
||||
HttpsCertFile string `json:"https-cert-file"`
|
||||
HttpsKeyFile string `json:"https-key-file"`
|
||||
|
||||
// If not the empty string and `addr` does not end in ":80",
|
||||
// redirect every request incoming at port 80 to that url.
|
||||
RedirectHttpTo string `json:"redirect-http-to"`
|
||||
|
||||
// If overwritten, at least all the options in the defaults below must
|
||||
// be provided! Most options here can be overwritten by the user.
|
||||
UiDefaults map[string]any `json:"ui-defaults"`
|
||||
|
||||
// Where to store MachineState files
|
||||
MachineStateDir string `json:"machine-state-dir"`
|
||||
|
||||
// If not zero, automatically mark jobs as stopped running X seconds longer than their walltime.
|
||||
StopJobsExceedingWalltime int `json:"stop-jobs-exceeding-walltime"`
|
||||
|
||||
// Defines time X in seconds in which jobs are considered to be "short" and will be filtered in specific views.
|
||||
ShortRunningJobsDuration int `json:"short-running-jobs-duration"`
|
||||
|
||||
// Energy Mix CO2 Emission Constant [g/kWh]
|
||||
// If entered, displays estimated CO2 emission for job based on jobs totalEnergy
|
||||
EmissionConstant int `json:"emission-constant"`
|
||||
|
||||
// If exists, will enable dynamic zoom in frontend metric plots using the configured values
|
||||
EnableResampling *ResampleConfig `json:"resampling"`
|
||||
}
|
||||
|
||||
type IntRange struct {
|
||||
From int `json:"from"`
|
||||
To int `json:"to"`
|
||||
}
|
||||
|
||||
type TimeRange struct {
|
||||
From *time.Time `json:"from"`
|
||||
To *time.Time `json:"to"`
|
||||
Range string `json:"range,omitempty"`
|
||||
}
|
||||
|
||||
type FilterRanges struct {
|
||||
Duration *IntRange `json:"duration"`
|
||||
NumNodes *IntRange `json:"numNodes"`
|
||||
StartTime *TimeRange `json:"startTime"`
|
||||
}
|
||||
|
||||
type ClusterConfig struct {
|
||||
Name string `json:"name"`
|
||||
FilterRanges *FilterRanges `json:"filterRanges"`
|
||||
MetricDataRepository json.RawMessage `json:"metricDataRepository"`
|
||||
}
|
||||
|
||||
var Clusters []*ClusterConfig
|
||||
|
||||
var Keys ProgramConfig = ProgramConfig{
|
||||
Addr: "localhost:8080",
|
||||
DisableAuthentication: false,
|
||||
EmbedStaticFiles: true,
|
||||
DBDriver: "sqlite3",
|
||||
DB: "./var/job.db",
|
||||
Archive: json.RawMessage(`{\"kind\":\"file\",\"path\":\"./var/job-archive\"}`),
|
||||
DisableArchive: false,
|
||||
Validate: false,
|
||||
SessionMaxAge: "168h",
|
||||
StopJobsExceedingWalltime: 0,
|
||||
ShortRunningJobsDuration: 5 * 60,
|
||||
UiDefaults: map[string]interface{}{
|
||||
UiDefaults: map[string]any{
|
||||
"analysis_view_histogramMetrics": []string{"flops_any", "mem_bw", "mem_used"},
|
||||
"analysis_view_scatterPlotMetrics": [][]string{{"flops_any", "mem_bw"}, {"flops_any", "cpu_load"}, {"cpu_load", "mem_bw"}},
|
||||
"job_view_nodestats_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
|
||||
@@ -49,24 +147,22 @@ var Keys schema.ProgramConfig = schema.ProgramConfig{
|
||||
},
|
||||
}
|
||||
|
||||
func Init(flagConfigFile string) {
|
||||
raw, err := os.ReadFile(flagConfigFile)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
cclog.Abortf("Config Init: Could not read config file '%s'.\nError: %s\n", flagConfigFile, err.Error())
|
||||
}
|
||||
} else {
|
||||
if err := schema.Validate(schema.Config, bytes.NewReader(raw)); err != nil {
|
||||
cclog.Abortf("Config Init: Could not validate config file '%s'.\nError: %s\n", flagConfigFile, err.Error())
|
||||
}
|
||||
dec := json.NewDecoder(bytes.NewReader(raw))
|
||||
dec.DisallowUnknownFields()
|
||||
if err := dec.Decode(&Keys); err != nil {
|
||||
cclog.Abortf("Config Init: Could not decode config file '%s'.\nError: %s\n", flagConfigFile, err.Error())
|
||||
}
|
||||
func Init(mainConfig json.RawMessage, clusterConfig json.RawMessage) {
|
||||
Validate(configSchema, mainConfig)
|
||||
dec := json.NewDecoder(bytes.NewReader(mainConfig))
|
||||
dec.DisallowUnknownFields()
|
||||
if err := dec.Decode(&Keys); err != nil {
|
||||
cclog.Abortf("Config Init: Could not decode config file '%s'.\nError: %s\n", mainConfig, err.Error())
|
||||
}
|
||||
|
||||
if Keys.Clusters == nil || len(Keys.Clusters) < 1 {
|
||||
cclog.Abort("Config Init: At least one cluster required in config. Exited with error.")
|
||||
}
|
||||
Validate(clustersSchema, clusterConfig)
|
||||
dec = json.NewDecoder(bytes.NewReader(clusterConfig))
|
||||
dec.DisallowUnknownFields()
|
||||
if err := dec.Decode(&Clusters); err != nil {
|
||||
cclog.Abortf("Config Init: Could not decode config file '%s'.\nError: %s\n", mainConfig, err.Error())
|
||||
}
|
||||
|
||||
if Clusters == nil || len(Clusters) < 1 {
|
||||
cclog.Abort("Config Init: At least one cluster required in config. Exited with error.")
|
||||
}
|
||||
}
|
||||
|
@@ -6,11 +6,24 @@ package config
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
ccconf "github.com/ClusterCockpit/cc-lib/ccConfig"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
)
|
||||
|
||||
func TestInit(t *testing.T) {
|
||||
fp := "../../configs/config.json"
|
||||
Init(fp)
|
||||
ccconf.Init(fp)
|
||||
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
|
||||
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
|
||||
Init(cfg, clustercfg)
|
||||
} else {
|
||||
cclog.Abort("Cluster configuration must be present")
|
||||
}
|
||||
} else {
|
||||
cclog.Abort("Main configuration must be present")
|
||||
}
|
||||
|
||||
if Keys.Addr != "0.0.0.0:443" {
|
||||
t.Errorf("wrong addr\ngot: %s \nwant: 0.0.0.0:443", Keys.Addr)
|
||||
}
|
||||
@@ -18,7 +31,17 @@ func TestInit(t *testing.T) {
|
||||
|
||||
func TestInitMinimal(t *testing.T) {
|
||||
fp := "../../configs/config-demo.json"
|
||||
Init(fp)
|
||||
ccconf.Init(fp)
|
||||
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
|
||||
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
|
||||
Init(cfg, clustercfg)
|
||||
} else {
|
||||
cclog.Abort("Cluster configuration must be present")
|
||||
}
|
||||
} else {
|
||||
cclog.Abort("Main configuration must be present")
|
||||
}
|
||||
|
||||
if Keys.Addr != "127.0.0.1:8080" {
|
||||
t.Errorf("wrong addr\ngot: %s \nwant: 127.0.0.1:8080", Keys.Addr)
|
||||
}
|
||||
|
199
internal/config/schema.go
Normal file
199
internal/config/schema.go
Normal file
@@ -0,0 +1,199 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved. This file is part of cc-backend.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package config
|
||||
|
||||
var configSchema = `
|
||||
{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"addr": {
|
||||
"description": "Address where the http (or https) server will listen on (for example: 'localhost:80').",
|
||||
"type": "string"
|
||||
},
|
||||
"apiAllowedIPs": {
|
||||
"description": "Addresses from which secured API endpoints can be reached",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"user": {
|
||||
"description": "Drop root permissions once .env was read and the port was taken. Only applicable if using privileged port.",
|
||||
"type": "string"
|
||||
},
|
||||
"group": {
|
||||
"description": "Drop root permissions once .env was read and the port was taken. Only applicable if using privileged port.",
|
||||
"type": "string"
|
||||
},
|
||||
"disable-authentication": {
|
||||
"description": "Disable authentication (for everything: API, Web-UI, ...).",
|
||||
"type": "boolean"
|
||||
},
|
||||
"embed-static-files": {
|
||||
"description": "If all files in web/frontend/public should be served from within the binary itself (they are embedded) or not.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"static-files": {
|
||||
"description": "Folder where static assets can be found, if embed-static-files is false.",
|
||||
"type": "string"
|
||||
},
|
||||
"db": {
|
||||
"description": "For sqlite3 a filename, for mysql a DSN in this format: https://github.com/go-sql-driver/mysql#dsn-data-source-name (Without query parameters!).",
|
||||
"type": "string"
|
||||
},
|
||||
"disable-archive": {
|
||||
"description": "Keep all metric data in the metric data repositories, do not write to the job-archive.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"enable-job-taggers": {
|
||||
"description": "Turn on automatic application and jobclass taggers",
|
||||
"type": "boolean"
|
||||
},
|
||||
"validate": {
|
||||
"description": "Validate all input json documents against json schema.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"session-max-age": {
|
||||
"description": "Specifies for how long a session shall be valid as a string parsable by time.ParseDuration(). If 0 or empty, the session/token does not expire!",
|
||||
"type": "string"
|
||||
},
|
||||
"https-cert-file": {
|
||||
"description": "Filepath to SSL certificate. If also https-key-file is set use HTTPS using those certificates.",
|
||||
"type": "string"
|
||||
},
|
||||
"https-key-file": {
|
||||
"description": "Filepath to SSL key file. If also https-cert-file is set use HTTPS using those certificates.",
|
||||
"type": "string"
|
||||
},
|
||||
"redirect-http-to": {
|
||||
"description": "If not the empty string and addr does not end in :80, redirect every request incoming at port 80 to that url.",
|
||||
"type": "string"
|
||||
},
|
||||
"stop-jobs-exceeding-walltime": {
|
||||
"description": "If not zero, automatically mark jobs as stopped running X seconds longer than their walltime. Only applies if walltime is set for job.",
|
||||
"type": "integer"
|
||||
},
|
||||
"short-running-jobs-duration": {
|
||||
"description": "Do not show running jobs shorter than X seconds.",
|
||||
"type": "integer"
|
||||
},
|
||||
"emission-constant": {
|
||||
"description": ".",
|
||||
"type": "integer"
|
||||
},
|
||||
"cron-frequency": {
|
||||
"description": "Frequency of cron job workers.",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"duration-worker": {
|
||||
"description": "Duration Update Worker [Defaults to '5m']",
|
||||
"type": "string"
|
||||
},
|
||||
"footprint-worker": {
|
||||
"description": "Metric-Footprint Update Worker [Defaults to '10m']",
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"enable-resampling": {
|
||||
"description": "Enable dynamic zoom in frontend metric plots.",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"trigger": {
|
||||
"description": "Trigger next zoom level at less than this many visible datapoints.",
|
||||
"type": "integer"
|
||||
},
|
||||
"resolutions": {
|
||||
"description": "Array of resampling target resolutions, in seconds.",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["trigger", "resolutions"]
|
||||
}
|
||||
},
|
||||
"required": ["apiAllowedIPs"]
|
||||
}`
|
||||
|
||||
var clustersSchema = `
|
||||
{
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"description": "The name of the cluster.",
|
||||
"type": "string"
|
||||
},
|
||||
"metricDataRepository": {
|
||||
"description": "Type of the metric data repository for this cluster",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"kind": {
|
||||
"type": "string",
|
||||
"enum": ["influxdb", "prometheus", "cc-metric-store", "test"]
|
||||
},
|
||||
"url": {
|
||||
"type": "string"
|
||||
},
|
||||
"token": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": ["kind", "url"]
|
||||
},
|
||||
"filterRanges": {
|
||||
"description": "This option controls the slider ranges for the UI controls of numNodes, duration, and startTime.",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"numNodes": {
|
||||
"description": "UI slider range for number of nodes",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"from": {
|
||||
"type": "integer"
|
||||
},
|
||||
"to": {
|
||||
"type": "integer"
|
||||
}
|
||||
},
|
||||
"required": ["from", "to"]
|
||||
},
|
||||
"duration": {
|
||||
"description": "UI slider range for duration",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"from": {
|
||||
"type": "integer"
|
||||
},
|
||||
"to": {
|
||||
"type": "integer"
|
||||
}
|
||||
},
|
||||
"required": ["from", "to"]
|
||||
},
|
||||
"startTime": {
|
||||
"description": "UI slider range for start time",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"from": {
|
||||
"type": "string",
|
||||
"format": "date-time"
|
||||
},
|
||||
"to": {
|
||||
"type": "null"
|
||||
}
|
||||
},
|
||||
"required": ["from", "to"]
|
||||
}
|
||||
},
|
||||
"required": ["numNodes", "duration", "startTime"]
|
||||
}
|
||||
},
|
||||
"required": ["name", "metricDataRepository", "filterRanges"],
|
||||
"minItems": 1
|
||||
}}`
|
28
internal/config/validate.go
Normal file
28
internal/config/validate.go
Normal file
@@ -0,0 +1,28 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved. This file is part of cc-backend.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/santhosh-tekuri/jsonschema/v5"
|
||||
)
|
||||
|
||||
func Validate(schema string, instance json.RawMessage) {
|
||||
sch, err := jsonschema.CompileString("schema.json", schema)
|
||||
if err != nil {
|
||||
cclog.Fatalf("%#v", err)
|
||||
}
|
||||
|
||||
var v any
|
||||
if err := json.Unmarshal([]byte(instance), &v); err != nil {
|
||||
cclog.Fatal(err)
|
||||
}
|
||||
|
||||
if err = sch.Validate(v); err != nil {
|
||||
cclog.Fatalf("%#v", err)
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
@@ -3,11 +3,13 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
)
|
||||
|
||||
@@ -58,13 +60,13 @@ type JobFilter struct {
|
||||
JobName *StringInput `json:"jobName,omitempty"`
|
||||
Cluster *StringInput `json:"cluster,omitempty"`
|
||||
Partition *StringInput `json:"partition,omitempty"`
|
||||
Duration *schema.IntRange `json:"duration,omitempty"`
|
||||
Duration *config.IntRange `json:"duration,omitempty"`
|
||||
Energy *FloatRange `json:"energy,omitempty"`
|
||||
MinRunningFor *int `json:"minRunningFor,omitempty"`
|
||||
NumNodes *schema.IntRange `json:"numNodes,omitempty"`
|
||||
NumAccelerators *schema.IntRange `json:"numAccelerators,omitempty"`
|
||||
NumHWThreads *schema.IntRange `json:"numHWThreads,omitempty"`
|
||||
StartTime *schema.TimeRange `json:"startTime,omitempty"`
|
||||
NumNodes *config.IntRange `json:"numNodes,omitempty"`
|
||||
NumAccelerators *config.IntRange `json:"numAccelerators,omitempty"`
|
||||
NumHWThreads *config.IntRange `json:"numHWThreads,omitempty"`
|
||||
StartTime *config.TimeRange `json:"startTime,omitempty"`
|
||||
State []schema.JobState `json:"state,omitempty"`
|
||||
MetricStats []*MetricStatItem `json:"metricStats,omitempty"`
|
||||
Exclusive *int `json:"exclusive,omitempty"`
|
||||
@@ -290,6 +292,20 @@ func (e Aggregate) MarshalGQL(w io.Writer) {
|
||||
fmt.Fprint(w, strconv.Quote(e.String()))
|
||||
}
|
||||
|
||||
func (e *Aggregate) UnmarshalJSON(b []byte) error {
|
||||
s, err := strconv.Unquote(string(b))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return e.UnmarshalGQL(s)
|
||||
}
|
||||
|
||||
func (e Aggregate) MarshalJSON() ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
e.MarshalGQL(&buf)
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
type SortByAggregate string
|
||||
|
||||
const (
|
||||
@@ -345,6 +361,20 @@ func (e SortByAggregate) MarshalGQL(w io.Writer) {
|
||||
fmt.Fprint(w, strconv.Quote(e.String()))
|
||||
}
|
||||
|
||||
func (e *SortByAggregate) UnmarshalJSON(b []byte) error {
|
||||
s, err := strconv.Unquote(string(b))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return e.UnmarshalGQL(s)
|
||||
}
|
||||
|
||||
func (e SortByAggregate) MarshalJSON() ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
e.MarshalGQL(&buf)
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
type SortDirectionEnum string
|
||||
|
||||
const (
|
||||
@@ -385,3 +415,17 @@ func (e *SortDirectionEnum) UnmarshalGQL(v any) error {
|
||||
func (e SortDirectionEnum) MarshalGQL(w io.Writer) {
|
||||
fmt.Fprint(w, strconv.Quote(e.String()))
|
||||
}
|
||||
|
||||
func (e *SortDirectionEnum) UnmarshalJSON(b []byte) error {
|
||||
s, err := strconv.Unquote(string(b))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return e.UnmarshalGQL(s)
|
||||
}
|
||||
|
||||
func (e SortDirectionEnum) MarshalJSON() ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
e.MarshalGQL(&buf)
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
@@ -2,7 +2,7 @@ package graph
|
||||
|
||||
// This file will be automatically regenerated based on the schema, any resolver implementations
|
||||
// will be copied through when generating and any unknown code will be moved to the end.
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.66
|
||||
// Code generated by github.com/99designs/gqlgen version v0.17.78
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -35,6 +35,11 @@ func (r *jobResolver) StartTime(ctx context.Context, obj *schema.Job) (*time.Tim
|
||||
return ×tamp, nil
|
||||
}
|
||||
|
||||
// Exclusive is the resolver for the exclusive field.
|
||||
func (r *jobResolver) Exclusive(ctx context.Context, obj *schema.Job) (int, error) {
|
||||
panic(fmt.Errorf("not implemented: Exclusive - exclusive"))
|
||||
}
|
||||
|
||||
// Tags is the resolver for the tags field.
|
||||
func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) {
|
||||
return r.Repo.GetTags(repository.GetUserFromContext(ctx), obj.ID)
|
||||
@@ -43,7 +48,7 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag,
|
||||
// ConcurrentJobs is the resolver for the concurrentJobs field.
|
||||
func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) {
|
||||
// FIXME: Make the hardcoded duration configurable
|
||||
if obj.Exclusive != 1 && obj.Duration > 600 {
|
||||
if obj.Shared != "none" && obj.Duration > 600 {
|
||||
return r.Repo.FindConcurrentJobs(ctx, obj)
|
||||
}
|
||||
|
||||
|
@@ -43,7 +43,7 @@ func HandleImportFlag(flag string) error {
|
||||
dec := json.NewDecoder(bytes.NewReader(raw))
|
||||
dec.DisallowUnknownFields()
|
||||
job := schema.Job{
|
||||
Exclusive: 1,
|
||||
Shared: "none",
|
||||
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
|
||||
}
|
||||
if err = dec.Decode(&job); err != nil {
|
||||
|
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
ccconf "github.com/ClusterCockpit/cc-lib/ccConfig"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
)
|
||||
|
||||
@@ -36,18 +37,16 @@ func copyFile(s string, d string) error {
|
||||
|
||||
func setup(t *testing.T) *repository.JobRepository {
|
||||
const testconfig = `{
|
||||
"main": {
|
||||
"addr": "0.0.0.0:8080",
|
||||
"validate": false,
|
||||
"apiAllowedIPs": [
|
||||
"*"
|
||||
]},
|
||||
"archive": {
|
||||
"kind": "file",
|
||||
"path": "./var/job-archive"
|
||||
},
|
||||
"jwts": {
|
||||
"max-age": "2m"
|
||||
},
|
||||
"apiAllowedIPs": [
|
||||
"*"
|
||||
],
|
||||
"clusters": [
|
||||
{
|
||||
"name": "testcluster",
|
||||
@@ -108,7 +107,19 @@ func setup(t *testing.T) *repository.JobRepository {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
config.Init(cfgFilePath)
|
||||
ccconf.Init(cfgFilePath)
|
||||
|
||||
// Load and check main configuration
|
||||
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
|
||||
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
|
||||
config.Init(cfg, clustercfg)
|
||||
} else {
|
||||
t.Fatal("Cluster configuration must be present")
|
||||
}
|
||||
} else {
|
||||
t.Fatal("Main configuration must be present")
|
||||
}
|
||||
|
||||
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive)
|
||||
|
||||
if err := archive.Init(json.RawMessage(archiveCfg), config.Keys.DisableArchive); err != nil {
|
||||
|
@@ -41,7 +41,7 @@ func LoadData(job *schema.Job,
|
||||
ctx context.Context,
|
||||
resolution int,
|
||||
) (schema.JobData, error) {
|
||||
data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ interface{}, ttl time.Duration, size int) {
|
||||
data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ any, ttl time.Duration, size int) {
|
||||
var jd schema.JobData
|
||||
var err error
|
||||
|
||||
|
@@ -40,7 +40,7 @@ type MetricDataRepository interface {
|
||||
var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}
|
||||
|
||||
func Init() error {
|
||||
for _, cluster := range config.Keys.Clusters {
|
||||
for _, cluster := range config.Clusters {
|
||||
if cluster.MetricDataRepository != nil {
|
||||
var kind struct {
|
||||
Kind string `json:"kind"`
|
||||
|
@@ -74,7 +74,7 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
|
||||
if err := row.Scan(
|
||||
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster,
|
||||
&job.StartTime, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads,
|
||||
&job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
|
||||
&job.NumAcc, &job.Shared, &job.MonitoringStatus, &job.SMT, &job.State,
|
||||
&job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil {
|
||||
cclog.Warnf("Error while scanning rows (Job): %v", err)
|
||||
return nil, err
|
||||
|
@@ -12,6 +12,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
@@ -216,7 +217,7 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
|
||||
return query
|
||||
}
|
||||
|
||||
func buildIntCondition(field string, cond *schema.IntRange, query sq.SelectBuilder) sq.SelectBuilder {
|
||||
func buildIntCondition(field string, cond *config.IntRange, query sq.SelectBuilder) sq.SelectBuilder {
|
||||
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
|
||||
}
|
||||
|
||||
@@ -224,7 +225,7 @@ func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBu
|
||||
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
|
||||
}
|
||||
|
||||
func buildTimeCondition(field string, cond *schema.TimeRange, query sq.SelectBuilder) sq.SelectBuilder {
|
||||
func buildTimeCondition(field string, cond *config.TimeRange, query sq.SelectBuilder) sq.SelectBuilder {
|
||||
if cond.From != nil && cond.To != nil {
|
||||
return query.Where(field+" BETWEEN ? AND ?", cond.From.Unix(), cond.To.Unix())
|
||||
} else if cond.From != nil {
|
||||
|
@@ -1,9 +1,10 @@
|
||||
CREATE TABLE "job_cache" (
|
||||
id INTEGER PRIMARY KEY,
|
||||
job_id BIGINT NOT NULL,
|
||||
cluster VARCHAR(255) NOT NULL,
|
||||
hpc_cluster VARCHAR(255) NOT NULL,
|
||||
subcluster VARCHAR(255) NOT NULL,
|
||||
start_time BIGINT NOT NULL, -- Unix timestamp
|
||||
submit_time BIGINT NOT NULL, -- Unix timestamp
|
||||
start_time BIGINT NOT NULL DEFAULT 0, -- Unix timestamp
|
||||
hpc_user VARCHAR(255) NOT NULL,
|
||||
project VARCHAR(255) NOT NULL,
|
||||
cluster_partition VARCHAR(255),
|
||||
@@ -12,8 +13,9 @@ CREATE TABLE "job_cache" (
|
||||
walltime INT NOT NULL,
|
||||
job_state VARCHAR(255) NOT NULL
|
||||
CHECK (job_state IN (
|
||||
'running', 'completed', 'failed', 'cancelled',
|
||||
'stopped', 'timeout', 'preempted', 'out_of_memory'
|
||||
'boot_fail', 'cancelled', 'completed', 'deadline',
|
||||
'failed', 'node_fail', 'out-of-memory', 'pending',
|
||||
'preempted', 'running', 'suspended', 'timeout'
|
||||
)),
|
||||
meta_data TEXT, -- JSON
|
||||
resources TEXT NOT NULL, -- JSON
|
||||
@@ -21,7 +23,8 @@ CREATE TABLE "job_cache" (
|
||||
num_hwthreads INT,
|
||||
num_acc INT,
|
||||
smt TINYINT NOT NULL DEFAULT 1 CHECK (smt IN (0, 1)),
|
||||
exclusive TINYINT NOT NULL DEFAULT 1 CHECK (exclusive IN (0, 1, 2)),
|
||||
shared TEXT NOT NULL
|
||||
CHECK (shared IN ("none", "single_user", "multi_user")),
|
||||
monitoring_status TINYINT NOT NULL DEFAULT 1
|
||||
CHECK (monitoring_status IN (0, 1, 2, 3)),
|
||||
energy REAL NOT NULL DEFAULT 0.0,
|
||||
@@ -29,3 +32,43 @@ CREATE TABLE "job_cache" (
|
||||
footprint TEXT DEFAULT NULL,
|
||||
UNIQUE (job_id, cluster, start_time)
|
||||
);
|
||||
|
||||
CREATE TABLE "job_new" (
|
||||
id INTEGER PRIMARY KEY,
|
||||
job_id BIGINT NOT NULL,
|
||||
hpc_cluster TEXT NOT NULL,
|
||||
subcluster TEXT NOT NULL,
|
||||
submit_time BIGINT NOT NULL DEFAULT 0, -- Unix timestamp
|
||||
start_time BIGINT NOT NULL DEFAULT 0, -- Unix timestamp
|
||||
hpc_user TEXT NOT NULL,
|
||||
project TEXT NOT NULL,
|
||||
cluster_partition TEXT,
|
||||
array_job_id BIGINT,
|
||||
duration INT NOT NULL,
|
||||
walltime INT NOT NULL,
|
||||
job_state TEXT NOT NULL
|
||||
CHECK (job_state IN (
|
||||
'boot_fail', 'cancelled', 'completed', 'deadline',
|
||||
'failed', 'node_fail', 'out-of-memory', 'pending',
|
||||
'preempted', 'running', 'suspended', 'timeout'
|
||||
)),
|
||||
meta_data TEXT, -- JSON
|
||||
resources TEXT NOT NULL, -- JSON
|
||||
num_nodes INT NOT NULL,
|
||||
num_hwthreads INT,
|
||||
num_acc INT,
|
||||
smt INT NOT NULL DEFAULT 1,
|
||||
shared TEXT NOT NULL
|
||||
CHECK (shared IN ("none", "single_user", "multi_user")),
|
||||
monitoring_status TINYINT NOT NULL DEFAULT 1
|
||||
CHECK (monitoring_status IN (0, 1, 2, 3)),
|
||||
energy REAL NOT NULL DEFAULT 0.0,
|
||||
energy_footprint TEXT DEFAULT NULL,
|
||||
footprint TEXT DEFAULT NULL,
|
||||
UNIQUE (job_id, cluster, start_time)
|
||||
);
|
||||
|
||||
ALTER TABLE job RENAME COLUMN cluster TO hpc_cluster;
|
||||
INSERT INTO job_new SELECT * FROM job;
|
||||
DROP TABLE job;
|
||||
ALTER TABLE job_new RENAME TO job;
|
||||
|
@@ -1,5 +1,6 @@
|
||||
CREATE TABLE "node" (
|
||||
id INTEGER PRIMARY KEY,
|
||||
time_stamp INTEGER NOT NULL,
|
||||
hostname VARCHAR(255) NOT NULL,
|
||||
cluster VARCHAR(255) NOT NULL,
|
||||
subcluster VARCHAR(255) NOT NULL,
|
||||
@@ -33,4 +34,4 @@ CREATE INDEX IF NOT EXISTS nodes_cluster_health ON node (cluster, health_state);
|
||||
|
||||
-- Add Indices For Increased Amounts of Tags
|
||||
CREATE INDEX IF NOT EXISTS tags_jobid ON jobtag (job_id);
|
||||
CREATE INDEX IF NOT EXISTS tags_tagid ON jobtag (tag_id);
|
||||
CREATE INDEX IF NOT EXISTS tags_tagid ON jobtag (tag_id);
|
||||
|
@@ -6,6 +6,7 @@ package repository
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"maps"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -24,7 +25,7 @@ var (
|
||||
type UserCfgRepo struct {
|
||||
DB *sqlx.DB
|
||||
Lookup *sqlx.Stmt
|
||||
uiDefaults map[string]interface{}
|
||||
uiDefaults map[string]any
|
||||
cache *lrucache.Cache
|
||||
lock sync.RWMutex
|
||||
}
|
||||
@@ -51,22 +52,18 @@ func GetUserCfgRepo() *UserCfgRepo {
|
||||
|
||||
// Return the personalised UI config for the currently authenticated
|
||||
// user or return the plain default config.
|
||||
func (uCfg *UserCfgRepo) GetUIConfig(user *schema.User) (map[string]interface{}, error) {
|
||||
func (uCfg *UserCfgRepo) GetUIConfig(user *schema.User) (map[string]any, error) {
|
||||
if user == nil {
|
||||
uCfg.lock.RLock()
|
||||
copy := make(map[string]interface{}, len(uCfg.uiDefaults))
|
||||
for k, v := range uCfg.uiDefaults {
|
||||
copy[k] = v
|
||||
}
|
||||
copy := make(map[string]any, len(uCfg.uiDefaults))
|
||||
maps.Copy(copy, uCfg.uiDefaults)
|
||||
uCfg.lock.RUnlock()
|
||||
return copy, nil
|
||||
}
|
||||
|
||||
data := uCfg.cache.Get(user.Username, func() (interface{}, time.Duration, int) {
|
||||
uiconfig := make(map[string]interface{}, len(uCfg.uiDefaults))
|
||||
for k, v := range uCfg.uiDefaults {
|
||||
uiconfig[k] = v
|
||||
}
|
||||
data := uCfg.cache.Get(user.Username, func() (any, time.Duration, int) {
|
||||
uiconfig := make(map[string]any, len(uCfg.uiDefaults))
|
||||
maps.Copy(uiconfig, uCfg.uiDefaults)
|
||||
|
||||
rows, err := uCfg.Lookup.Query(user.Username)
|
||||
if err != nil {
|
||||
@@ -83,7 +80,7 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *schema.User) (map[string]interface{},
|
||||
return err, 0, 0
|
||||
}
|
||||
|
||||
var val interface{}
|
||||
var val any
|
||||
if err := json.Unmarshal([]byte(rawval), &val); err != nil {
|
||||
cclog.Warn("Error while unmarshaling raw user uiconfig json")
|
||||
return err, 0, 0
|
||||
@@ -104,7 +101,7 @@ func (uCfg *UserCfgRepo) GetUIConfig(user *schema.User) (map[string]interface{},
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return data.(map[string]interface{}), nil
|
||||
return data.(map[string]any), nil
|
||||
}
|
||||
|
||||
// If the context does not have a user, update the global ui configuration
|
||||
@@ -115,7 +112,7 @@ func (uCfg *UserCfgRepo) UpdateConfig(
|
||||
user *schema.User,
|
||||
) error {
|
||||
if user == nil {
|
||||
var val interface{}
|
||||
var val any
|
||||
if err := json.Unmarshal([]byte(value), &val); err != nil {
|
||||
cclog.Warn("Error while unmarshaling raw user config json")
|
||||
return err
|
||||
|
@@ -10,6 +10,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
ccconf "github.com/ClusterCockpit/cc-lib/ccConfig"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
@@ -17,17 +18,16 @@ import (
|
||||
|
||||
func setupUserTest(t *testing.T) *UserCfgRepo {
|
||||
const testconfig = `{
|
||||
"addr": "0.0.0.0:8080",
|
||||
"main": {
|
||||
"addr": "0.0.0.0:8080",
|
||||
"apiAllowedIPs": [
|
||||
"*"
|
||||
]
|
||||
},
|
||||
"archive": {
|
||||
"kind": "file",
|
||||
"path": "./var/job-archive"
|
||||
},
|
||||
"jwts": {
|
||||
"max-age": "2m"
|
||||
},
|
||||
"apiAllowedIPs": [
|
||||
"*"
|
||||
],
|
||||
"clusters": [
|
||||
{
|
||||
"name": "testcluster",
|
||||
@@ -36,7 +36,8 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
|
||||
"numNodes": { "from": 1, "to": 64 },
|
||||
"duration": { "from": 0, "to": 86400 },
|
||||
"startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
|
||||
} } ]
|
||||
}
|
||||
}]
|
||||
}`
|
||||
|
||||
cclog.Init("info", true)
|
||||
@@ -53,7 +54,19 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
config.Init(cfgFilePath)
|
||||
ccconf.Init(cfgFilePath)
|
||||
|
||||
// Load and check main configuration
|
||||
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
|
||||
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
|
||||
config.Init(cfg, clustercfg)
|
||||
} else {
|
||||
t.Fatal("Cluster configuration must be present")
|
||||
}
|
||||
} else {
|
||||
t.Fatal("Main configuration must be present")
|
||||
}
|
||||
|
||||
return GetUserCfgRepo()
|
||||
}
|
||||
|
||||
|
@@ -240,13 +240,13 @@ func (t *JobClassTagger) Match(job *schema.Job) {
|
||||
|
||||
// Initialize environment
|
||||
env["job"] = map[string]any{
|
||||
"exclusive": job.Exclusive,
|
||||
"duration": job.Duration,
|
||||
"numCores": job.NumHWThreads,
|
||||
"numNodes": job.NumNodes,
|
||||
"jobState": job.State,
|
||||
"numAcc": job.NumAcc,
|
||||
"smt": job.SMT,
|
||||
"shared": job.Shared,
|
||||
"duration": job.Duration,
|
||||
"numCores": job.NumHWThreads,
|
||||
"numNodes": job.NumNodes,
|
||||
"jobState": job.State,
|
||||
"numAcc": job.NumAcc,
|
||||
"smt": job.SMT,
|
||||
}
|
||||
|
||||
// add metrics to env
|
||||
|
@@ -7,7 +7,6 @@ package taskManager
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
@@ -15,8 +14,8 @@ import (
|
||||
|
||||
func RegisterCommitJobService() {
|
||||
var frequency string
|
||||
if config.Keys.CronFrequency != nil && config.Keys.CronFrequency.CommitJobWorker != "" {
|
||||
frequency = config.Keys.CronFrequency.CommitJobWorker
|
||||
if Keys.CommitJobWorker != "" {
|
||||
frequency = Keys.CommitJobWorker
|
||||
} else {
|
||||
frequency = "2m"
|
||||
}
|
||||
|
@@ -5,19 +5,37 @@
|
||||
package taskManager
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
)
|
||||
|
||||
type Retention struct {
|
||||
Policy string `json:"policy"`
|
||||
Location string `json:"location"`
|
||||
Age int `json:"age"`
|
||||
IncludeDB bool `json:"includeDB"`
|
||||
}
|
||||
|
||||
type CronFrequency struct {
|
||||
// Duration Update Worker [Defaults to '2m']
|
||||
CommitJobWorker string `json:"commit-job-worker"`
|
||||
// Duration Update Worker [Defaults to '5m']
|
||||
DurationWorker string `json:"duration-worker"`
|
||||
// Metric-Footprint Update Worker [Defaults to '10m']
|
||||
FootprintWorker string `json:"footprint-worker"`
|
||||
}
|
||||
|
||||
var (
|
||||
s gocron.Scheduler
|
||||
jobRepo *repository.JobRepository
|
||||
Keys CronFrequency
|
||||
)
|
||||
|
||||
func parseDuration(s string) (time.Duration, error) {
|
||||
@@ -35,7 +53,7 @@ func parseDuration(s string) (time.Duration, error) {
|
||||
return interval, nil
|
||||
}
|
||||
|
||||
func Start() {
|
||||
func Start(cronCfg, archiveConfig json.RawMessage) {
|
||||
var err error
|
||||
jobRepo = repository.GetJobRepository()
|
||||
s, err = gocron.NewScheduler()
|
||||
@@ -47,13 +65,19 @@ func Start() {
|
||||
RegisterStopJobsExceedTime()
|
||||
}
|
||||
|
||||
dec := json.NewDecoder(bytes.NewReader(cronCfg))
|
||||
dec.DisallowUnknownFields()
|
||||
if err := dec.Decode(&Keys); err != nil {
|
||||
cclog.Errorf("error while decoding ldap config: %v", err)
|
||||
}
|
||||
|
||||
var cfg struct {
|
||||
Retention schema.Retention `json:"retention"`
|
||||
Compression int `json:"compression"`
|
||||
Retention Retention `json:"retention"`
|
||||
Compression int `json:"compression"`
|
||||
}
|
||||
cfg.Retention.IncludeDB = true
|
||||
|
||||
if err := json.Unmarshal(config.Keys.Archive, &cfg); err != nil {
|
||||
if err := json.Unmarshal(archiveConfig, &cfg); err != nil {
|
||||
cclog.Warn("Error while unmarshaling raw config json")
|
||||
}
|
||||
|
||||
@@ -73,7 +97,7 @@ func Start() {
|
||||
RegisterCompressionService(cfg.Compression)
|
||||
}
|
||||
|
||||
lc := config.Keys.LdapConfig
|
||||
lc := auth.Keys.LdapConfig
|
||||
|
||||
if lc != nil && lc.SyncInterval != "" {
|
||||
RegisterLdapSyncService(lc.SyncInterval)
|
||||
|
@@ -7,15 +7,14 @@ package taskManager
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
)
|
||||
|
||||
func RegisterUpdateDurationWorker() {
|
||||
var frequency string
|
||||
if config.Keys.CronFrequency != nil && config.Keys.CronFrequency.DurationWorker != "" {
|
||||
frequency = config.Keys.CronFrequency.DurationWorker
|
||||
if Keys.DurationWorker != "" {
|
||||
frequency = Keys.DurationWorker
|
||||
} else {
|
||||
frequency = "5m"
|
||||
}
|
||||
|
@@ -9,7 +9,6 @@ import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
@@ -20,8 +19,8 @@ import (
|
||||
|
||||
func RegisterFootprintWorker() {
|
||||
var frequency string
|
||||
if config.Keys.CronFrequency != nil && config.Keys.CronFrequency.FootprintWorker != "" {
|
||||
frequency = config.Keys.CronFrequency.FootprintWorker
|
||||
if Keys.FootprintWorker != "" {
|
||||
frequency = Keys.FootprintWorker
|
||||
} else {
|
||||
frequency = "10m"
|
||||
}
|
||||
|
Reference in New Issue
Block a user