Fix critical/severe issues in init, startup and shutdown

- auth: do not abort the server when authentication is disabled. auth.Init
  is now always called; with disable-authentication it sets up an ephemeral
  session store (SESSION_KEY not required) and registers no authenticators,
  so the unconditional auth.GetAuthInstance() callers (server init,
  api.New()) always get a valid instance.
- main: run the graceful-shutdown sequence on the startup-error path. runServer
  derives a cancelable context and, on a server-start failure, cancels it and
  waits so the metricstore final checkpoint / WAL rotation, archiver flush and
  taskmanager shutdown actually run before exit.
- server: log the :80 HTTP->HTTPS redirect listener error instead of dropping it.
- archiver: guard Shutdown against being called when Start never ran
  (avoids a panic from close() on the nil archiveChannel and blocking forever
  on a receive from a nil workerDone).
- nats API: stop worker goroutines on shutdown via a stop channel + idempotent
  Shutdown(); workers and subscription callbacks select on stop and the
  channels are never closed, so no send-on-closed-channel can occur. Wired
  into Server.Shutdown after the NATS client is closed.
- metricstore: make Shutdown idempotent (shutdownFunc is set to nil after the
  first call, and later calls return early) and release shutdownFuncMu before
  the checkpoint write.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Entire-Checkpoint: 3c179f9caa8f
This commit is contained in:
2026-06-05 10:16:28 +02:00
parent 56ae1de011
commit 1b72b0b5ad
6 changed files with 119 additions and 32 deletions

View File

@@ -172,14 +172,20 @@ func handleUserCommands() error {
return fmt.Errorf("--add-user and --del-user can only be used if authentication is enabled") return fmt.Errorf("--add-user and --del-user can only be used if authentication is enabled")
} }
if !config.Keys.DisableAuthentication { // Always initialize the auth subsystem so the HTTP server and REST API have a
// valid (non-nil) auth instance, even when authentication is disabled. With
// authentication disabled, Init only sets up an ephemeral session store and
// registers no authenticators (see auth.Init).
if cfg := ccconf.GetPackageConfig("auth"); cfg != nil { if cfg := ccconf.GetPackageConfig("auth"); cfg != nil {
auth.Init(&cfg) auth.Init(&cfg)
} else { } else {
cclog.Warn("Authentication disabled due to missing configuration") if !config.Keys.DisableAuthentication {
cclog.Warn("Authentication enabled but no auth configuration found")
}
auth.Init(nil) auth.Init(nil)
} }
if !config.Keys.DisableAuthentication {
// Check for default security keys // Check for default security keys
checkDefaultSecurityKeys() checkDefaultSecurityKeys()
@@ -337,6 +343,12 @@ func initSubsystems() error {
} }
func runServer(ctx context.Context) error { func runServer(ctx context.Context) error {
// Derive a cancelable context so the startup-error path below can trigger the
// same graceful-shutdown sequence as a signal (via the signal handler that
// waits on ctx.Done()).
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup var wg sync.WaitGroup
// Initialize metric store if configuration is provided // Initialize metric store if configuration is provided
@@ -438,26 +450,32 @@ func runServer(ctx context.Context) error {
// Wait for either: // Wait for either:
// 1. An error from server startup // 1. An error from server startup
// 2. Completion of all goroutines (normal shutdown or crash) // 2. Completion of all goroutines (normal shutdown or crash)
var runErr error
select { select {
case err := <-errChan: case runErr = <-errChan:
// errChan will be closed when waitDone is closed, which happens // errChan will be closed when waitDone is closed, which happens
// when all goroutines complete (either from normal shutdown or error) // when all goroutines complete (either from normal shutdown or error)
if err != nil {
return err
}
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
// Give the server 100ms to start and report any immediate startup errors // Give the server 100ms to start and report any immediate startup errors
// After that, just wait for normal shutdown completion // After that, just wait for normal shutdown completion
select { select {
case err := <-errChan: case runErr = <-errChan:
if err != nil {
return err
}
case <-waitDone: case <-waitDone:
// Normal shutdown completed // Normal shutdown completed
} }
} }
if runErr != nil {
// A subsystem failed (e.g. the HTTP server could not bind). Trigger the
// graceful-shutdown path for the subsystems that were already started
// (metricstore checkpoint, archiver flush, taskmanager) by cancelling the
// context the signal handler waits on, then wait for it to finish so we
// don't exit before the final checkpoint is written.
cancel()
<-waitDone
return runErr
}
cclog.Print("Graceful shutdown completed!") cclog.Print("Graceful shutdown completed!")
return nil return nil
} }

View File

@@ -415,7 +415,9 @@ func (s *Server) Start(ctx context.Context) error {
if !strings.HasSuffix(config.Keys.Addr, ":80") && config.Keys.RedirectHTTPTo != "" { if !strings.HasSuffix(config.Keys.Addr, ":80") && config.Keys.RedirectHTTPTo != "" {
go func() { go func() {
http.ListenAndServe(":80", http.RedirectHandler(config.Keys.RedirectHTTPTo, http.StatusMovedPermanently)) if err := http.ListenAndServe(":80", http.RedirectHandler(config.Keys.RedirectHTTPTo, http.StatusMovedPermanently)); err != nil {
cclog.Errorf("HTTP-to-HTTPS redirect listener on :80 failed: %v", err)
}
}() }()
} }
@@ -460,6 +462,11 @@ func (s *Server) Shutdown(ctx context.Context) {
if nc != nil { if nc != nil {
nc.Close() nc.Close()
} }
// Stop the NATS API worker goroutines after the client is closed (no more
// subscription callbacks can enqueue once the connection is down).
if s.natsAPIHandle != nil {
s.natsAPIHandle.Shutdown()
}
cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart)) cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart))
httpStart := time.Now() httpStart := time.Now()

View File

@@ -78,6 +78,12 @@ type NatsAPI struct {
jobCh chan natsMessage jobCh chan natsMessage
// nodeCh receives node state messages for processing by worker goroutines. // nodeCh receives node state messages for processing by worker goroutines.
nodeCh chan natsMessage nodeCh chan natsMessage
// stop signals worker goroutines and subscription callbacks to stop.
// Closing it (via Shutdown) makes workers exit and callbacks drop further
// messages instead of blocking; the channels are never closed so in-flight
// callbacks can never send on a closed channel.
stop chan struct{}
stopOnce sync.Once
} }
// NewNatsAPI creates a new NatsAPI instance with channel-based worker pools. // NewNatsAPI creates a new NatsAPI instance with channel-based worker pools.
@@ -99,6 +105,7 @@ func NewNatsAPI() *NatsAPI {
JobRepository: repository.GetJobRepository(), JobRepository: repository.GetJobRepository(),
jobCh: make(chan natsMessage, jobConc), jobCh: make(chan natsMessage, jobConc),
nodeCh: make(chan natsMessage, nodeConc), nodeCh: make(chan natsMessage, nodeConc),
stop: make(chan struct{}),
} }
// Start worker goroutines // Start worker goroutines
@@ -112,19 +119,38 @@ func NewNatsAPI() *NatsAPI {
return api return api
} }
// Shutdown stops the worker goroutines and tells subscription callbacks to stop
// enqueueing by closing api.stop. The close is wrapped in stopOnce, so Shutdown
// is safe to call multiple times; calls after the first are no-ops. Callers
// must ensure the NATS client is closed first so no new callbacks are invoked.
//
// Shutdown does not close jobCh or nodeCh, and it returns immediately without
// waiting for the workers to exit.
// NOTE(review): messages still buffered in jobCh/nodeCh when stop is closed may
// be left unprocessed, since workers return once they observe stop — confirm
// that dropping them on shutdown is acceptable.
func (api *NatsAPI) Shutdown() {
api.stopOnce.Do(func() {
// Closing (rather than sending on) stop releases every goroutine that is
// selecting on it at once.
close(api.stop)
})
}
// jobWorker processes job event messages from the job channel. // jobWorker processes job event messages from the job channel.
func (api *NatsAPI) jobWorker() { func (api *NatsAPI) jobWorker() {
for msg := range api.jobCh { for {
select {
case <-api.stop:
return
case msg := <-api.jobCh:
api.handleJobEvent(msg.subject, msg.data) api.handleJobEvent(msg.subject, msg.data)
} }
} }
}
// nodeWorker processes node state messages from the node channel. // nodeWorker processes node state messages from the node channel.
func (api *NatsAPI) nodeWorker() { func (api *NatsAPI) nodeWorker() {
for msg := range api.nodeCh { for {
select {
case <-api.stop:
return
case msg := <-api.nodeCh:
api.handleNodeState(msg.subject, msg.data) api.handleNodeState(msg.subject, msg.data)
} }
} }
}
// StartSubscriptions registers all NATS subscriptions for Job and Node APIs. // StartSubscriptions registers all NATS subscriptions for Job and Node APIs.
// Messages are delivered to buffered channels and processed by worker goroutines. // Messages are delivered to buffered channels and processed by worker goroutines.
@@ -140,13 +166,19 @@ func (api *NatsAPI) StartSubscriptions() error {
s := config.Keys.APISubjects s := config.Keys.APISubjects
if err := client.Subscribe(s.SubjectJobEvent, func(subject string, data []byte) { if err := client.Subscribe(s.SubjectJobEvent, func(subject string, data []byte) {
api.jobCh <- natsMessage{subject: subject, data: data} select {
case api.jobCh <- natsMessage{subject: subject, data: data}:
case <-api.stop:
}
}); err != nil { }); err != nil {
return err return err
} }
if err := client.Subscribe(s.SubjectNodeState, func(subject string, data []byte) { if err := client.Subscribe(s.SubjectNodeState, func(subject string, data []byte) {
api.nodeCh <- natsMessage{subject: subject, data: data} select {
case api.nodeCh <- natsMessage{subject: subject, data: data}:
case <-api.stop:
}
}); err != nil { }); err != nil {
return err return err
} }

View File

@@ -222,6 +222,13 @@ func TriggerArchiving(job *schema.Job) {
func Shutdown(timeout time.Duration) error { func Shutdown(timeout time.Duration) error {
cclog.Info("Initiating archiver shutdown...") cclog.Info("Initiating archiver shutdown...")
// Guard against Shutdown being called when Start was never run: closing a nil
// channel and receiving from a nil workerDone would panic/block forever.
if archiveChannel == nil {
cclog.Warn("Archiver shutdown called but archiver was never started")
return nil
}
// Close channel to signal no more jobs will be accepted // Close channel to signal no more jobs will be accepted
close(archiveChannel) close(archiveChannel)

View File

@@ -9,6 +9,7 @@ package auth
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/rand"
"database/sql" "database/sql"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
@@ -187,20 +188,37 @@ func Init(authCfg *json.RawMessage) {
sessKey := os.Getenv("SESSION_KEY") sessKey := os.Getenv("SESSION_KEY")
if sessKey == "" { if sessKey == "" {
if !config.Keys.DisableAuthentication {
cclog.Fatal("environment variable 'SESSION_KEY' not set: refusing to start with an ephemeral session key. " + cclog.Fatal("environment variable 'SESSION_KEY' not set: refusing to start with an ephemeral session key. " +
"Set SESSION_KEY in .env (base64-encoded 32 random bytes); a random key would invalidate all sessions on every restart " + "Set SESSION_KEY in .env (base64-encoded 32 random bytes); a random key would invalidate all sessions on every restart " +
"and prevent sessions from validating across replicas.") "and prevent sessions from validating across replicas.")
} }
// Authentication is disabled: no user sessions are issued, so an
// ephemeral random key is sufficient and SESSION_KEY is not required.
ephemeralKey := make([]byte, 32)
if _, err := rand.Read(ephemeralKey); err != nil {
cclog.Fatalf("Error while initializing authentication -> generating ephemeral session key failed: %v", err)
}
authInstance.sessionStore = sessions.NewCookieStore(ephemeralKey)
} else {
keyBytes, err := base64.StdEncoding.DecodeString(sessKey) keyBytes, err := base64.StdEncoding.DecodeString(sessKey)
if err != nil { if err != nil {
cclog.Fatal("Error while initializing authentication -> decoding session key failed") cclog.Fatal("Error while initializing authentication -> decoding session key failed")
} }
authInstance.sessionStore = sessions.NewCookieStore(keyBytes) authInstance.sessionStore = sessions.NewCookieStore(keyBytes)
}
if d, err := time.ParseDuration(config.Keys.SessionMaxAge); err == nil { if d, err := time.ParseDuration(config.Keys.SessionMaxAge); err == nil {
authInstance.SessionMaxAge = d authInstance.SessionMaxAge = d
} }
// When authentication is disabled no authenticators are required; the
// session store created above is enough for the server to run with a
// valid (non-nil) auth instance.
if config.Keys.DisableAuthentication {
return
}
if authCfg == nil { if authCfg == nil {
return return
} }

View File

@@ -275,10 +275,15 @@ func Shutdown() {
totalStart := time.Now() totalStart := time.Now()
shutdownFuncMu.Lock() shutdownFuncMu.Lock()
defer shutdownFuncMu.Unlock() if shutdownFunc == nil {
if shutdownFunc != nil { // Already shut down (or never initialized): nothing to do. This keeps
shutdownFunc() // Shutdown idempotent so it is safe to call from more than one path.
shutdownFuncMu.Unlock()
return
} }
shutdownFunc()
shutdownFunc = nil
shutdownFuncMu.Unlock()
cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart)) cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart))
if Keys.Checkpoints.FileFormat == "wal" { if Keys.Checkpoints.FileFormat == "wal" {