Refactor main

Convert components to Singletons
Restructure main package
Reduce dependencies
This commit is contained in:
2024-07-16 12:08:10 +02:00
parent 01a4d33514
commit 801607fc16
14 changed files with 774 additions and 613 deletions

View File

@@ -19,6 +19,7 @@ import (
"testing"
"github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
@@ -144,7 +145,6 @@ func setup(t *testing.T) *api.RestApi {
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive)
repository.Connect("sqlite3", dbfilepath)
db := repository.GetConnection()
if err := archive.Init(json.RawMessage(archiveCfg), config.Keys.DisableArchive); err != nil {
t.Fatal(err)
@@ -154,13 +154,10 @@ func setup(t *testing.T) *api.RestApi {
t.Fatal(err)
}
jobRepo := repository.GetJobRepository()
resolver := &graph.Resolver{DB: db.DB, Repo: jobRepo}
auth.Init()
graph.Init()
return &api.RestApi{
JobRepository: resolver.Repo,
Resolver: resolver,
}
return api.New()
}
func cleanup() {
@@ -253,12 +250,13 @@ func TestRestApi(t *testing.T) {
t.Fatal(err)
}
job, err := restapi.Resolver.Query().Job(ctx, strconv.Itoa(int(res.DBID)))
resolver := graph.GetResolverInstance()
job, err := resolver.Query().Job(ctx, strconv.Itoa(int(res.DBID)))
if err != nil {
t.Fatal(err)
}
job.Tags, err = restapi.Resolver.Job().Tags(ctx, job)
job.Tags, err = resolver.Job().Tags(ctx, job)
if err != nil {
t.Fatal(err)
}
@@ -314,7 +312,8 @@ func TestRestApi(t *testing.T) {
}
restapi.JobRepository.WaitForArchiving()
job, err := restapi.Resolver.Query().Job(ctx, strconv.Itoa(int(dbid)))
resolver := graph.GetResolverInstance()
job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid)))
if err != nil {
t.Fatal(err)
}

View File

@@ -53,12 +53,19 @@ import (
type RestApi struct {
JobRepository *repository.JobRepository
Resolver *graph.Resolver
Authentication *auth.Authentication
MachineStateDir string
RepositoryMutex sync.Mutex
}
func New() *RestApi {
return &RestApi{
JobRepository: repository.GetJobRepository(),
MachineStateDir: config.Keys.MachineStateDir,
Authentication: auth.GetAuthInstance(),
}
}
func (api *RestApi) MountApiRoutes(r *mux.Router) {
r.StrictSlash(true)
@@ -893,7 +900,6 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) {
}
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
if err != nil {
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
return
@@ -977,7 +983,6 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
}
job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
if err != nil {
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw)
return
@@ -1105,7 +1110,8 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
} `json:"error"`
}
data, err := api.Resolver.Query().JobMetrics(r.Context(), id, metrics, scopes)
resolver := graph.GetResolverInstance()
data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes)
if err != nil {
json.NewEncoder(rw).Encode(Respone{
Error: &struct {

View File

@@ -12,6 +12,7 @@ import (
"errors"
"net/http"
"os"
"sync"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
@@ -26,6 +27,11 @@ type Authenticator interface {
Login(user *schema.User, rw http.ResponseWriter, r *http.Request) (*schema.User, error)
}
var (
initOnce sync.Once
authInstance *Authentication
)
type Authentication struct {
sessionStore *sessions.CookieStore
LdapAuth *LdapAuthenticator
@@ -62,71 +68,79 @@ func (auth *Authentication) AuthViaSession(
}, nil
}
func Init() (*Authentication, error) {
auth := &Authentication{}
func Init() {
initOnce.Do(func() {
authInstance = &Authentication{}
sessKey := os.Getenv("SESSION_KEY")
if sessKey == "" {
log.Warn("environment variable 'SESSION_KEY' not set (will use non-persistent random key)")
bytes := make([]byte, 32)
if _, err := rand.Read(bytes); err != nil {
log.Error("Error while initializing authentication -> failed to generate random bytes for session key")
return nil, err
}
auth.sessionStore = sessions.NewCookieStore(bytes)
} else {
bytes, err := base64.StdEncoding.DecodeString(sessKey)
if err != nil {
log.Error("Error while initializing authentication -> decoding session key failed")
return nil, err
}
auth.sessionStore = sessions.NewCookieStore(bytes)
}
if config.Keys.LdapConfig != nil {
ldapAuth := &LdapAuthenticator{}
if err := ldapAuth.Init(); err != nil {
log.Warn("Error while initializing authentication -> ldapAuth init failed")
sessKey := os.Getenv("SESSION_KEY")
if sessKey == "" {
log.Warn("environment variable 'SESSION_KEY' not set (will use non-persistent random key)")
bytes := make([]byte, 32)
if _, err := rand.Read(bytes); err != nil {
log.Fatal("Error while initializing authentication -> failed to generate random bytes for session key")
}
authInstance.sessionStore = sessions.NewCookieStore(bytes)
} else {
auth.LdapAuth = ldapAuth
auth.authenticators = append(auth.authenticators, auth.LdapAuth)
}
} else {
log.Info("Missing LDAP configuration: No LDAP support!")
}
if config.Keys.JwtConfig != nil {
auth.JwtAuth = &JWTAuthenticator{}
if err := auth.JwtAuth.Init(); err != nil {
log.Error("Error while initializing authentication -> jwtAuth init failed")
return nil, err
bytes, err := base64.StdEncoding.DecodeString(sessKey)
if err != nil {
log.Fatal("Error while initializing authentication -> decoding session key failed")
}
authInstance.sessionStore = sessions.NewCookieStore(bytes)
}
jwtSessionAuth := &JWTSessionAuthenticator{}
if err := jwtSessionAuth.Init(); err != nil {
log.Info("jwtSessionAuth init failed: No JWT login support!")
if d, err := time.ParseDuration(config.Keys.SessionMaxAge); err != nil {
authInstance.SessionMaxAge = d
}
if config.Keys.LdapConfig != nil {
ldapAuth := &LdapAuthenticator{}
if err := ldapAuth.Init(); err != nil {
log.Warn("Error while initializing authentication -> ldapAuth init failed")
} else {
authInstance.LdapAuth = ldapAuth
authInstance.authenticators = append(authInstance.authenticators, authInstance.LdapAuth)
}
} else {
auth.authenticators = append(auth.authenticators, jwtSessionAuth)
log.Info("Missing LDAP configuration: No LDAP support!")
}
jwtCookieSessionAuth := &JWTCookieSessionAuthenticator{}
if err := jwtCookieSessionAuth.Init(); err != nil {
log.Info("jwtCookieSessionAuth init failed: No JWT cookie login support!")
if config.Keys.JwtConfig != nil {
authInstance.JwtAuth = &JWTAuthenticator{}
if err := authInstance.JwtAuth.Init(); err != nil {
log.Fatal("Error while initializing authentication -> jwtAuth init failed")
}
jwtSessionAuth := &JWTSessionAuthenticator{}
if err := jwtSessionAuth.Init(); err != nil {
log.Info("jwtSessionAuth init failed: No JWT login support!")
} else {
authInstance.authenticators = append(authInstance.authenticators, jwtSessionAuth)
}
jwtCookieSessionAuth := &JWTCookieSessionAuthenticator{}
if err := jwtCookieSessionAuth.Init(); err != nil {
log.Info("jwtCookieSessionAuth init failed: No JWT cookie login support!")
} else {
authInstance.authenticators = append(authInstance.authenticators, jwtCookieSessionAuth)
}
} else {
auth.authenticators = append(auth.authenticators, jwtCookieSessionAuth)
log.Info("Missing JWT configuration: No JWT token support!")
}
} else {
log.Info("Missing JWT configuration: No JWT token support!")
authInstance.LocalAuth = &LocalAuthenticator{}
if err := authInstance.LocalAuth.Init(); err != nil {
log.Fatal("Error while initializing authentication -> localAuth init failed")
}
authInstance.authenticators = append(authInstance.authenticators, authInstance.LocalAuth)
})
}
func GetAuthInstance() *Authentication {
if authInstance == nil {
log.Fatal("Authentication module not initialized!")
}
auth.LocalAuth = &LocalAuthenticator{}
if err := auth.LocalAuth.Init(); err != nil {
log.Error("Error while initializing authentication -> localAuth init failed")
return nil, err
}
auth.authenticators = append(auth.authenticators, auth.LocalAuth)
return auth, nil
return authInstance
}
func persistUser(user *schema.User) {

View File

@@ -10,7 +10,6 @@ import (
"net/http"
"os"
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/repository"
@@ -34,33 +33,6 @@ func (la *LdapAuthenticator) Init() error {
lc := config.Keys.LdapConfig
if lc.SyncInterval != "" {
interval, err := time.ParseDuration(lc.SyncInterval)
if err != nil {
log.Warnf("Could not parse duration for sync interval: %v",
lc.SyncInterval)
return err
}
if interval == 0 {
log.Info("Sync interval is zero")
return nil
}
go func() {
ticker := time.NewTicker(interval)
for t := range ticker.C {
log.Printf("sync started at %s", t.Format(time.RFC3339))
if err := la.Sync(); err != nil {
log.Errorf("sync failed: %s", err.Error())
}
log.Print("sync done")
}
}()
} else {
log.Info("LDAP configuration key sync_interval invalid")
}
if lc.UserAttr != "" {
la.UserAttr = lc.UserAttr
} else {

View File

@@ -1,15 +1,39 @@
package graph
import (
"sync"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/jmoiron/sqlx"
)
// This file will not be regenerated automatically.
//
// It serves as dependency injection for your app, add any dependencies you require here.
var (
initOnce sync.Once
resolverInstance *Resolver
)
type Resolver struct {
DB *sqlx.DB
Repo *repository.JobRepository
}
func Init() {
initOnce.Do(func() {
db := repository.GetConnection()
resolverInstance = &Resolver{
DB: db.DB, Repo: repository.GetJobRepository(),
}
})
}
func GetResolverInstance() *Resolver {
if resolverInstance == nil {
log.Fatal("Authentication module not initialized!")
}
return resolverInstance
}

View File

@@ -0,0 +1,41 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskManager
import (
"time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/go-co-op/gocron/v2"
)
func RegisterCompressionService(compressOlderThan int) {
log.Info("Register compression service")
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(05, 0, 0))),
gocron.NewTask(
func() {
var jobs []*schema.Job
var err error
ar := archive.GetHandle()
startTime := time.Now().Unix() - int64(compressOlderThan*24*3600)
lastTime := ar.CompressLast(startTime)
if startTime == lastTime {
log.Info("Compression Service - Complete archive run")
jobs, err = jobRepo.FindJobsBetween(0, startTime)
} else {
jobs, err = jobRepo.FindJobsBetween(lastTime, startTime)
}
if err != nil {
log.Warnf("Error while looking for compression jobs: %v", err)
}
ar.Compress(jobs)
}))
}

View File

@@ -0,0 +1,36 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package taskManager
import (
"time"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/go-co-op/gocron/v2"
)
func RegisterLdapSyncService(ds string) {
interval, err := parseDuration(ds)
if err != nil {
log.Warnf("Could not parse duration for sync interval: %v",
ds)
return
}
auth := auth.GetAuthInstance()
log.Info("Register LDAP sync service")
s.NewJob(gocron.DurationJob(interval),
gocron.NewTask(
func() {
t := time.Now()
log.Printf("ldap sync started at %s", t.Format(time.RFC3339))
if err := auth.LdapAuth.Sync(); err != nil {
log.Errorf("ldap sync failed: %s", err.Error())
}
log.Print("ldap sync done")
}))
}

View File

@@ -9,62 +9,59 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/go-co-op/gocron/v2"
)
func RegisterRetentionService(cfg schema.Retention) {
switch cfg.Policy {
case "delete":
func RegisterRetentionDeleteService(age int, includeDB bool) {
log.Info("Register retention delete service")
log.Info("Register retention delete service")
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))),
gocron.NewTask(
func() {
startTime := time.Now().Unix() - int64(age*24*3600)
jobs, err := jobRepo.FindJobsBetween(0, startTime)
if err != nil {
log.Warnf("Error while looking for retention jobs: %s", err.Error())
}
archive.GetHandle().CleanUp(jobs)
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))),
gocron.NewTask(
func() {
startTime := time.Now().Unix() - int64(cfg.Age*24*3600)
jobs, err := jobRepo.FindJobsBetween(0, startTime)
if includeDB {
cnt, err := jobRepo.DeleteJobsBefore(startTime)
if err != nil {
log.Warnf("Error while looking for retention jobs: %s", err.Error())
log.Errorf("Error while deleting retention jobs from db: %s", err.Error())
} else {
log.Infof("Retention: Removed %d jobs from db", cnt)
}
archive.GetHandle().CleanUp(jobs)
if cfg.IncludeDB {
cnt, err := jobRepo.DeleteJobsBefore(startTime)
if err != nil {
log.Errorf("Error while deleting retention jobs from db: %s", err.Error())
} else {
log.Infof("Retention: Removed %d jobs from db", cnt)
}
if err = jobRepo.Optimize(); err != nil {
log.Errorf("Error occured in db optimization: %s", err.Error())
}
if err = jobRepo.Optimize(); err != nil {
log.Errorf("Error occured in db optimization: %s", err.Error())
}
}))
case "move":
log.Info("Register retention move service")
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))),
gocron.NewTask(
func() {
startTime := time.Now().Unix() - int64(cfg.Age*24*3600)
jobs, err := jobRepo.FindJobsBetween(0, startTime)
if err != nil {
log.Warnf("Error while looking for retention jobs: %s", err.Error())
}
archive.GetHandle().Move(jobs, cfg.Location)
if cfg.IncludeDB {
cnt, err := jobRepo.DeleteJobsBefore(startTime)
if err != nil {
log.Errorf("Error while deleting retention jobs from db: %v", err)
} else {
log.Infof("Retention: Removed %d jobs from db", cnt)
}
if err = jobRepo.Optimize(); err != nil {
log.Errorf("Error occured in db optimization: %v", err)
}
}
}))
}
}
}))
}
func RegisterRetentionMoveService(age int, includeDB bool, location string) {
log.Info("Register retention move service")
s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))),
gocron.NewTask(
func() {
startTime := time.Now().Unix() - int64(age*24*3600)
jobs, err := jobRepo.FindJobsBetween(0, startTime)
if err != nil {
log.Warnf("Error while looking for retention jobs: %s", err.Error())
}
archive.GetHandle().Move(jobs, location)
if includeDB {
cnt, err := jobRepo.DeleteJobsBefore(startTime)
if err != nil {
log.Errorf("Error while deleting retention jobs from db: %v", err)
} else {
log.Infof("Retention: Removed %d jobs from db", cnt)
}
if err = jobRepo.Optimize(); err != nil {
log.Errorf("Error occured in db optimization: %v", err)
}
}
}))
}

View File

@@ -5,8 +5,13 @@
package taskManager
import (
"encoding/json"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/go-co-op/gocron/v2"
)
@@ -15,13 +20,66 @@ var (
jobRepo *repository.JobRepository
)
func init() {
func parseDuration(s string) (time.Duration, error) {
interval, err := time.ParseDuration(s)
if err != nil {
log.Warnf("Could not parse duration for sync interval: %v",
s)
return 0, err
}
if interval == 0 {
log.Info("TaskManager: Sync interval is zero")
}
return interval, nil
}
func Start() {
var err error
jobRepo = repository.GetJobRepository()
s, err = gocron.NewScheduler()
if err != nil {
log.Fatalf("Error while creating gocron scheduler: %s", err.Error())
}
if config.Keys.StopJobsExceedingWalltime > 0 {
RegisterStopJobsExceedTime()
}
var cfg struct {
Retention schema.Retention `json:"retention"`
Compression int `json:"compression"`
}
cfg.Retention.IncludeDB = true
if err := json.Unmarshal(config.Keys.Archive, &cfg); err != nil {
log.Warn("Error while unmarshaling raw config json")
}
switch cfg.Retention.Policy {
case "delete":
RegisterRetentionDeleteService(
cfg.Retention.Age,
cfg.Retention.IncludeDB)
case "move":
RegisterRetentionMoveService(
cfg.Retention.Age,
cfg.Retention.IncludeDB,
cfg.Retention.Location)
}
if cfg.Compression > 0 {
RegisterCompressionService(cfg.Compression)
}
lc := config.Keys.LdapConfig
if lc.SyncInterval != "" {
RegisterLdapSyncService(lc.SyncInterval)
}
s.Start()
}
func Shutdown() {