Fix configuration issues. Fix shutdown hangs

Always turn on compression
This commit is contained in:
2026-01-15 11:34:06 +01:00
parent d59aa2e855
commit cb219b3c74
3 changed files with 55 additions and 32 deletions

View File

@@ -271,6 +271,7 @@ func initSubsystems() error {
// Initialize job archive // Initialize job archive
archiveCfg := ccconf.GetPackageConfig("archive") archiveCfg := ccconf.GetPackageConfig("archive")
if archiveCfg == nil { if archiveCfg == nil {
cclog.Debug("Archive configuration not found, using default archive configuration")
archiveCfg = json.RawMessage(defaultArchiveConfig) archiveCfg = json.RawMessage(defaultArchiveConfig)
} }
if err := archive.Init(archiveCfg, config.Keys.DisableArchive); err != nil { if err := archive.Init(archiveCfg, config.Keys.DisableArchive); err != nil {
@@ -375,22 +376,32 @@ func runServer(ctx context.Context) error {
} }
runtime.SystemdNotify(true, "running") runtime.SystemdNotify(true, "running")
// Wait for completion or error waitDone := make(chan struct{})
go func() { go func() {
wg.Wait() wg.Wait()
close(waitDone)
}()
go func() {
<-waitDone
close(errChan) close(errChan)
}() }()
// Check for server startup errors
select { select {
case err := <-errChan: case err := <-errChan:
if err != nil { if err != nil {
return err return err
} }
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
// Server started successfully, wait for completion select {
if err := <-errChan; err != nil { case err := <-errChan:
return err if err != nil {
return err
}
case <-waitDone:
case <-time.After(45 * time.Second):
cclog.Error("Shutdown timeout after 45 seconds - forcing exit")
return fmt.Errorf("shutdown timeout exceeded")
} }
} }

View File

@@ -17,6 +17,10 @@ import (
"github.com/go-co-op/gocron/v2" "github.com/go-co-op/gocron/v2"
) )
const (
DefaultCompressOlderThen = 7
)
// Retention defines the configuration for job retention policies. // Retention defines the configuration for job retention policies.
type Retention struct { type Retention struct {
Policy string `json:"policy"` Policy string `json:"policy"`
@@ -60,6 +64,38 @@ func parseDuration(s string) (time.Duration, error) {
return interval, nil return interval, nil
} }
func initArchiveServices(config json.RawMessage) {
var cfg struct {
Retention Retention `json:"retention"`
Compression int `json:"compression"`
}
cfg.Retention.IncludeDB = true
if err := json.Unmarshal(config, &cfg); err != nil {
cclog.Errorf("error while unmarshaling raw config json: %v", err)
}
switch cfg.Retention.Policy {
case "delete":
RegisterRetentionDeleteService(
cfg.Retention.Age,
cfg.Retention.IncludeDB,
cfg.Retention.OmitTagged)
case "move":
RegisterRetentionMoveService(
cfg.Retention.Age,
cfg.Retention.IncludeDB,
cfg.Retention.Location,
cfg.Retention.OmitTagged)
}
if cfg.Compression > 0 {
RegisterCompressionService(cfg.Compression)
} else {
RegisterCompressionService(DefaultCompressOlderThen)
}
}
// Start initializes the task manager, parses configurations, and registers background tasks. // Start initializes the task manager, parses configurations, and registers background tasks.
// It starts the gocron scheduler. // It starts the gocron scheduler.
func Start(cronCfg, archiveConfig json.RawMessage) { func Start(cronCfg, archiveConfig json.RawMessage) {
@@ -80,32 +116,8 @@ func Start(cronCfg, archiveConfig json.RawMessage) {
cclog.Errorf("error while decoding cron config: %v", err) cclog.Errorf("error while decoding cron config: %v", err)
} }
var cfg struct { if archiveConfig != nil {
Retention Retention `json:"retention"` initArchiveServices(archiveConfig)
Compression int `json:"compression"`
}
cfg.Retention.IncludeDB = true
if err := json.Unmarshal(archiveConfig, &cfg); err != nil {
cclog.Warn("Error while unmarshaling raw config json")
}
switch cfg.Retention.Policy {
case "delete":
RegisterRetentionDeleteService(
cfg.Retention.Age,
cfg.Retention.IncludeDB,
cfg.Retention.OmitTagged)
case "move":
RegisterRetentionMoveService(
cfg.Retention.Age,
cfg.Retention.IncludeDB,
cfg.Retention.Location,
cfg.Retention.OmitTagged)
}
if cfg.Compression > 0 {
RegisterCompressionService(cfg.Compression)
} }
lc := auth.Keys.LdapConfig lc := auth.Keys.LdapConfig

View File

@@ -77,7 +77,7 @@ type MessageHandler func(subject string, data []byte)
func Connect() { func Connect() {
clientOnce.Do(func() { clientOnce.Do(func() {
if Keys.Address == "" { if Keys.Address == "" {
cclog.Warn("NATS: no address configured, skipping connection") cclog.Info("NATS: no address configured, skipping connection")
return return
} }