From 1b72b0b5ad320bc86c3dde9837ade05e7c731d7f Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Fri, 5 Jun 2026 10:16:28 +0200
Subject: [PATCH] Fix critical/severe issues in init, startup and shutdown

- auth: do not abort the server when authentication is disabled.
  auth.Init is now always called; with disable-authentication it sets up
  an ephemeral session store (SESSION_KEY not required) and registers no
  authenticators, so the unconditional auth.GetAuthInstance() callers
  (server init, api.New()) always get a valid instance.
- main: run the graceful-shutdown sequence on the startup-error path.
  runServer derives a cancelable context and, on a server-start failure,
  cancels it and waits so the metricstore final checkpoint / WAL
  rotation, archiver flush and taskmanager shutdown actually run before
  exit.
- server: log the :80 HTTP->HTTPS redirect listener error instead of
  dropping it.
- archiver: guard Shutdown against being called when Start never ran
  (avoids close(nil) panic / blocking on a nil workerDone).
- nats API: stop worker goroutines on shutdown via a stop channel +
  idempotent Shutdown(); workers and subscription callbacks select on
  stop and the channels are never closed, so no send-on-closed-channel
  can occur. Wired into Server.Shutdown after the NATS client is closed.
- metricstore: make Shutdown idempotent (nil shutdownFunc, early return)
  and release shutdownFuncMu before the checkpoint write.

Co-Authored-By: Claude Opus 4.8 (1M context) Entire-Checkpoint: 3c179f9caa8f --- cmd/cc-backend/main.go | 46 +++++++++++++++++++++--------- cmd/cc-backend/server.go | 9 +++++- internal/api/nats.go | 44 ++++++++++++++++++++++++---- internal/archiver/archiveWorker.go | 7 +++++ internal/auth/auth.go | 34 ++++++++++++++++------ pkg/metricstore/metricstore.go | 11 +++++-- 6 files changed, 119 insertions(+), 32 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index f8f0a767..b950e473 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -172,14 +172,20 @@ func handleUserCommands() error { return fmt.Errorf("--add-user and --del-user can only be used if authentication is enabled") } - if !config.Keys.DisableAuthentication { - if cfg := ccconf.GetPackageConfig("auth"); cfg != nil { - auth.Init(&cfg) - } else { - cclog.Warn("Authentication disabled due to missing configuration") - auth.Init(nil) + // Always initialize the auth subsystem so the HTTP server and REST API have a + // valid (non-nil) auth instance, even when authentication is disabled. With + // authentication disabled, Init only sets up an ephemeral session store and + // registers no authenticators (see auth.Init). + if cfg := ccconf.GetPackageConfig("auth"); cfg != nil { + auth.Init(&cfg) + } else { + if !config.Keys.DisableAuthentication { + cclog.Warn("Authentication enabled but no auth configuration found") } + auth.Init(nil) + } + if !config.Keys.DisableAuthentication { // Check for default security keys checkDefaultSecurityKeys() @@ -337,6 +343,12 @@ func initSubsystems() error { } func runServer(ctx context.Context) error { + // Derive a cancelable context so the startup-error path below can trigger the + // same graceful-shutdown sequence as a signal (via the signal handler that + // waits on ctx.Done()). 
+ ctx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup // Initialize metric store if configuration is provided @@ -438,26 +450,32 @@ func runServer(ctx context.Context) error { // Wait for either: // 1. An error from server startup // 2. Completion of all goroutines (normal shutdown or crash) + var runErr error select { - case err := <-errChan: + case runErr = <-errChan: // errChan will be closed when waitDone is closed, which happens // when all goroutines complete (either from normal shutdown or error) - if err != nil { - return err - } case <-time.After(100 * time.Millisecond): // Give the server 100ms to start and report any immediate startup errors // After that, just wait for normal shutdown completion select { - case err := <-errChan: - if err != nil { - return err - } + case runErr = <-errChan: case <-waitDone: // Normal shutdown completed } } + if runErr != nil { + // A subsystem failed (e.g. the HTTP server could not bind). Trigger the + // graceful-shutdown path for the subsystems that were already started + // (metricstore checkpoint, archiver flush, taskmanager) by cancelling the + // context the signal handler waits on, then wait for it to finish so we + // don't exit before the final checkpoint is written. 
+ cancel() + <-waitDone + return runErr + } + cclog.Print("Graceful shutdown completed!") return nil } diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index f39cd45d..4a9e71b1 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -415,7 +415,9 @@ func (s *Server) Start(ctx context.Context) error { if !strings.HasSuffix(config.Keys.Addr, ":80") && config.Keys.RedirectHTTPTo != "" { go func() { - http.ListenAndServe(":80", http.RedirectHandler(config.Keys.RedirectHTTPTo, http.StatusMovedPermanently)) + if err := http.ListenAndServe(":80", http.RedirectHandler(config.Keys.RedirectHTTPTo, http.StatusMovedPermanently)); err != nil { + cclog.Errorf("HTTP-to-HTTPS redirect listener on :80 failed: %v", err) + } }() } @@ -460,6 +462,11 @@ func (s *Server) Shutdown(ctx context.Context) { if nc != nil { nc.Close() } + // Stop the NATS API worker goroutines after the client is closed (no more + // subscription callbacks can enqueue once the connection is down). + if s.natsAPIHandle != nil { + s.natsAPIHandle.Shutdown() + } cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart)) httpStart := time.Now() diff --git a/internal/api/nats.go b/internal/api/nats.go index 0c967b83..7de85785 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -78,6 +78,12 @@ type NatsAPI struct { jobCh chan natsMessage // nodeCh receives node state messages for processing by worker goroutines. nodeCh chan natsMessage + // stop signals worker goroutines and subscription callbacks to stop. + // Closing it (via Shutdown) makes workers exit and callbacks drop further + // messages instead of blocking; the channels are never closed so in-flight + // callbacks can never send on a closed channel. + stop chan struct{} + stopOnce sync.Once } // NewNatsAPI creates a new NatsAPI instance with channel-based worker pools. 
@@ -99,6 +105,7 @@ func NewNatsAPI() *NatsAPI { JobRepository: repository.GetJobRepository(), jobCh: make(chan natsMessage, jobConc), nodeCh: make(chan natsMessage, nodeConc), + stop: make(chan struct{}), } // Start worker goroutines @@ -112,17 +119,36 @@ func NewNatsAPI() *NatsAPI { return api } +// Shutdown stops the worker goroutines and tells subscription callbacks to stop +// enqueueing. It is safe to call multiple times. Callers must ensure the NATS +// client is closed first so no new callbacks are invoked. +func (api *NatsAPI) Shutdown() { + api.stopOnce.Do(func() { + close(api.stop) + }) +} + // jobWorker processes job event messages from the job channel. func (api *NatsAPI) jobWorker() { - for msg := range api.jobCh { - api.handleJobEvent(msg.subject, msg.data) + for { + select { + case <-api.stop: + return + case msg := <-api.jobCh: + api.handleJobEvent(msg.subject, msg.data) + } } } // nodeWorker processes node state messages from the node channel. func (api *NatsAPI) nodeWorker() { - for msg := range api.nodeCh { - api.handleNodeState(msg.subject, msg.data) + for { + select { + case <-api.stop: + return + case msg := <-api.nodeCh: + api.handleNodeState(msg.subject, msg.data) + } } } @@ -140,13 +166,19 @@ func (api *NatsAPI) StartSubscriptions() error { s := config.Keys.APISubjects if err := client.Subscribe(s.SubjectJobEvent, func(subject string, data []byte) { - api.jobCh <- natsMessage{subject: subject, data: data} + select { + case api.jobCh <- natsMessage{subject: subject, data: data}: + case <-api.stop: + } }); err != nil { return err } if err := client.Subscribe(s.SubjectNodeState, func(subject string, data []byte) { - api.nodeCh <- natsMessage{subject: subject, data: data} + select { + case api.nodeCh <- natsMessage{subject: subject, data: data}: + case <-api.stop: + } }); err != nil { return err } diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index 0639757d..d97a14f3 100644 --- 
a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -222,6 +222,13 @@ func TriggerArchiving(job *schema.Job) { func Shutdown(timeout time.Duration) error { cclog.Info("Initiating archiver shutdown...") + // Guard against Shutdown being called when Start was never run: closing a nil + // channel and receiving from a nil workerDone would panic/block forever. + if archiveChannel == nil { + cclog.Warn("Archiver shutdown called but archiver was never started") + return nil + } + // Close channel to signal no more jobs will be accepted close(archiveChannel) diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 8d8bc222..4bfd4846 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -9,6 +9,7 @@ package auth import ( "bytes" "context" + "crypto/rand" "database/sql" "encoding/base64" "encoding/json" @@ -187,20 +188,37 @@ func Init(authCfg *json.RawMessage) { sessKey := os.Getenv("SESSION_KEY") if sessKey == "" { - cclog.Fatal("environment variable 'SESSION_KEY' not set: refusing to start with an ephemeral session key. " + - "Set SESSION_KEY in .env (base64-encoded 32 random bytes); a random key would invalidate all sessions on every restart " + - "and prevent sessions from validating across replicas.") + if !config.Keys.DisableAuthentication { + cclog.Fatal("environment variable 'SESSION_KEY' not set: refusing to start with an ephemeral session key. " + + "Set SESSION_KEY in .env (base64-encoded 32 random bytes); a random key would invalidate all sessions on every restart " + + "and prevent sessions from validating across replicas.") + } + // Authentication is disabled: no user sessions are issued, so an + // ephemeral random key is sufficient and SESSION_KEY is not required. 
+ ephemeralKey := make([]byte, 32) + if _, err := rand.Read(ephemeralKey); err != nil { + cclog.Fatalf("Error while initializing authentication -> generating ephemeral session key failed: %v", err) + } + authInstance.sessionStore = sessions.NewCookieStore(ephemeralKey) + } else { + keyBytes, err := base64.StdEncoding.DecodeString(sessKey) + if err != nil { + cclog.Fatal("Error while initializing authentication -> decoding session key failed") + } + authInstance.sessionStore = sessions.NewCookieStore(keyBytes) } - keyBytes, err := base64.StdEncoding.DecodeString(sessKey) - if err != nil { - cclog.Fatal("Error while initializing authentication -> decoding session key failed") - } - authInstance.sessionStore = sessions.NewCookieStore(keyBytes) if d, err := time.ParseDuration(config.Keys.SessionMaxAge); err == nil { authInstance.SessionMaxAge = d } + // When authentication is disabled no authenticators are required; the + // session store created above is enough for the server to run with a + // valid (non-nil) auth instance. + if config.Keys.DisableAuthentication { + return + } + if authCfg == nil { return } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 703c470e..8a73cde9 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -275,10 +275,15 @@ func Shutdown() { totalStart := time.Now() shutdownFuncMu.Lock() - defer shutdownFuncMu.Unlock() - if shutdownFunc != nil { - shutdownFunc() + if shutdownFunc == nil { + // Already shut down (or never initialized): nothing to do. This keeps + // Shutdown idempotent so it is safe to call from more than one path. + shutdownFuncMu.Unlock() + return } + shutdownFunc() + shutdownFunc = nil + shutdownFuncMu.Unlock() cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart)) if Keys.Checkpoints.FileFormat == "wal" {