From d59aa2e855716ea172c8933f4d4632b8185a0494 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Jan 2026 11:33:01 +0100 Subject: [PATCH 01/10] Restructure configuration with sensible defaults. Fix shutdown hangs --- internal/metricstore/archive.go | 2 +- internal/metricstore/avroHelper.go | 51 +++++++++- internal/metricstore/checkpoint.go | 2 +- internal/metricstore/config.go | 63 +++++++----- internal/metricstore/configSchema.go | 146 ++++++++++++--------------- internal/metricstore/lineprotocol.go | 17 ++-- internal/metricstore/metricstore.go | 40 +++++--- 7 files changed, 189 insertions(+), 132 deletions(-) diff --git a/internal/metricstore/archive.go b/internal/metricstore/archive.go index 972769fd..78efadfe 100644 --- a/internal/metricstore/archive.go +++ b/internal/metricstore/archive.go @@ -24,7 +24,7 @@ import ( func Archiving(wg *sync.WaitGroup, ctx context.Context) { go func() { defer wg.Done() - d, err := time.ParseDuration(Keys.Archive.Interval) + d, err := time.ParseDuration(Keys.Archive.ArchiveInterval) if err != nil { cclog.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err) } diff --git a/internal/metricstore/avroHelper.go b/internal/metricstore/avroHelper.go index 5587a58d..985fdd78 100644 --- a/internal/metricstore/avroHelper.go +++ b/internal/metricstore/avroHelper.go @@ -30,8 +30,51 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { for { select { case <-ctx.Done(): - return - case val := <-LineProtocolMessages: + // Drain any remaining messages in channel before exiting + for { + select { + case val, ok := <-LineProtocolMessages: + if !ok { + // Channel closed + return + } + // Process remaining message + freq, err := GetMetricFrequency(val.MetricName) + if err != nil { + continue + } + + metricName := "" + for _, selectorName := range val.Selector { + metricName += selectorName + SelectorDelimiter + } + metricName += val.MetricName + + var selector []string + selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10)) + + if !stringSlicesEqual(oldSelector, selector) { + avroLevel = avroStore.root.findAvroLevelOrCreate(selector) + if avroLevel == nil { + cclog.Errorf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName) + } + oldSelector = slices.Clone(selector) + } + + if avroLevel != nil { + avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq)) + } + default: + // No more messages, exit + return + } + } + case val, ok := <-LineProtocolMessages: + if !ok { + // Channel closed, exit gracefully + return + } + // Fetch the frequency of the metric from the global configuration freq, err := GetMetricFrequency(val.MetricName) if err != nil { @@ -65,7 +108,9 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { oldSelector = slices.Clone(selector) } - avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq)) + if avroLevel != nil { + avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq)) + } } } }() diff --git a/internal/metricstore/checkpoint.go b/internal/metricstore/checkpoint.go index 27d611c4..fb72fc6e 100644 --- a/internal/metricstore/checkpoint.go +++ b/internal/metricstore/checkpoint.go @@ -408,7 +408,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { return m.FromCheckpoint(dir, from, altFormat) } - cclog.Print("[METRICSTORE]> No valid checkpoint files found in the directory") + cclog.Info("[METRICSTORE]> No valid checkpoint files found") return 0, nil } diff --git a/internal/metricstore/config.go b/internal/metricstore/config.go index c789f11c..9657453d 100644 --- a/internal/metricstore/config.go +++ b/internal/metricstore/config.go @@ -19,36 +19,49 @@ const ( DefaultAvroCheckpointInterval = time.Minute ) +type Checkpoints struct { + FileFormat string `json:"file-format"` + Interval string `json:"interval"` + RootDir string `json:"directory"` +} + +type Debug struct { + DumpToFile string `json:"dump-to-file"` + EnableGops bool `json:"gops"` +} + +type Archive struct { + ArchiveInterval string `json:"interval"` + RootDir string `json:"directory"` + DeleteInstead bool `json:"delete-instead"` +} + +type Subscriptions []struct { + // Channel name + SubscribeTo string `json:"subscribe-to"` + + // Allow lines without a cluster tag, use this as default, optional + ClusterTag string `json:"cluster-tag"` +} + type MetricStoreConfig struct { // Number of concurrent workers for checkpoint and archive operations. // If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10) - NumWorkers int `json:"num-workers"` - Checkpoints struct { - FileFormat string `json:"file-format"` - Interval string `json:"interval"` - RootDir string `json:"directory"` - Restore string `json:"restore"` - } `json:"checkpoints"` - Debug struct { - DumpToFile string `json:"dump-to-file"` - EnableGops bool `json:"gops"` - } `json:"debug"` - RetentionInMemory string `json:"retention-in-memory"` - Archive struct { - Interval string `json:"interval"` - RootDir string `json:"directory"` - DeleteInstead bool `json:"delete-instead"` - } `json:"archive"` - Subscriptions []struct { - // Channel name - SubscribeTo string `json:"subscribe-to"` - - // Allow lines without a cluster tag, use this as default, optional - ClusterTag string `json:"cluster-tag"` - } `json:"subscriptions"` + NumWorkers int `json:"num-workers"` + RetentionInMemory string `json:"retention-in-memory"` + MemoryCap int `json:"memory-cap"` + Checkpoints Checkpoints `json:"checkpoints"` + Debug *Debug `json:"debug"` + Archive *Archive `json:"archive"` + Subscriptions *Subscriptions `json:"nats-subscriptions"` } -var Keys MetricStoreConfig +var Keys MetricStoreConfig = MetricStoreConfig{ + Checkpoints: Checkpoints{ + FileFormat: "avro", + RootDir: "./var/checkpoints", + }, +} // AggregationStrategy for aggregation over multiple values at different cpus/sockets/..., not time! type AggregationStrategy int diff --git a/internal/metricstore/configSchema.go b/internal/metricstore/configSchema.go index f1a20a73..c06bc6d9 100644 --- a/internal/metricstore/configSchema.go +++ b/internal/metricstore/configSchema.go @@ -6,90 +6,72 @@ package metricstore const configSchema = `{ - "type": "object", - "description": "Configuration specific to built-in metric-store.", - "properties": { - "checkpoints": { - "description": "Configuration for checkpointing the metrics within metric-store", - "type": "object", - "properties": { - "file-format": { - "description": "Specify the type of checkpoint file. There are 2 variants: 'avro' and 'json'. If nothing is specified, 'avro' is default.", - "type": "string" - }, - "interval": { - "description": "Interval at which the metrics should be checkpointed.", - "type": "string" - }, - "directory": { - "description": "Specify the parent directy in which the checkpointed files should be placed.", - "type": "string" - }, - "restore": { - "description": "When cc-backend starts up, look for checkpointed files that are less than X hours old and load metrics from these selected checkpoint files.", - "type": "string" - } - } + "type": "object", + "description": "Configuration specific to built-in metric-store.", + "properties": { + "num-workers": { + "description": "Number of concurrent workers for checkpoint and archive operations", + "type": "integer" + }, + "checkpoints": { + "description": "Configuration for checkpointing the metrics within metric-store", + "type": "object", + "properties": { + "file-format": { + "description": "Specify the type of checkpoint file. There are 2 variants: 'avro' and 'json'. If nothing is specified, 'avro' is default.", + "type": "string" }, - "archive": { - "description": "Configuration for archiving the already checkpointed files.", - "type": "object", - "properties": { - "interval": { - "description": "Interval at which the checkpointed files should be archived.", - "type": "string" - }, - "directory": { - "description": "Specify the parent directy in which the archived files should be placed.", - "type": "string" - } - } + "interval": { + "description": "Interval at which the metrics should be checkpointed.", + "type": "string" }, - "retention-in-memory": { - "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.", - "type": "string" - }, - "nats": { - "description": "Configuration for accepting published data through NATS.", - "type": "array", - "items": { - "type": "object", - "properties": { - "address": { - "description": "Address of the NATS server.", - "type": "string" - }, - "username": { - "description": "Optional: If configured with username/password method.", - "type": "string" - }, - "password": { - "description": "Optional: If configured with username/password method.", - "type": "string" - }, - "creds-file-path": { - "description": "Optional: If configured with Credential File method. Path to your NATS cred file.", - "type": "string" - }, - "subscriptions": { - "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.", - "type": "array", - "items": { - "type": "object", - "properties": { - "subscribe-to": { - "description": "Channel name", - "type": "string" - }, - "cluster-tag": { - "description": "Optional: Allow lines without a cluster tag, use this as default", - "type": "string" - } - } - } - } - } - } + "directory": { + "description": "Specify the parent directy in which the checkpointed files should be placed.", + "type": "string" } + }, + "required": ["interval"] + }, + "archive": { + "description": "Configuration for archiving the already checkpointed files.", + "type": "object", + "properties": { + "interval": { + "description": "Interval at which the checkpointed files should be archived.", + "type": "string" + }, + "directory": { + "description": "Specify the directy in which the archived files should be placed.", + "type": "string" + } + }, + "required": ["interval", "directory"] + }, + "retention-in-memory": { + "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.", + "type": "string" + }, + "memory-cap": { + "description": "Upper memory capacity limit used by metricstore in GB", + "type": "integer" + }, + "nats-subscriptions": { + "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.", + "type": "array", + "items": { + "type": "object", + "properties": { + "subscribe-to": { + "description": "Channel name", + "type": "string" + }, + "cluster-tag": { + "description": "Optional: Allow lines without a cluster tag, use this as default", + "type": "string" + } + } + } } + }, + "required": ["checkpoints", "retention-in-memory"] }` diff --git a/internal/metricstore/lineprotocol.go b/internal/metricstore/lineprotocol.go index cc59e213..ca8ea138 100644 --- a/internal/metricstore/lineprotocol.go +++ b/internal/metricstore/lineprotocol.go @@ -29,29 +29,30 @@ func ReceiveNats(ms *MemoryStore, } var wg sync.WaitGroup - msgs := make(chan []byte, workers*2) - for _, sc := range Keys.Subscriptions { + for _, sc := range *Keys.Subscriptions { clusterTag := sc.ClusterTag if workers > 1 { wg.Add(workers) for range workers { go func() { + defer wg.Done() for m := range msgs { dec := lineprotocol.NewDecoderWithBytes(m) if err := DecodeLine(dec, ms, clusterTag); err != nil { cclog.Errorf("error: %s", err.Error()) } } - - wg.Done() }() } nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) { - msgs <- data + select { + case msgs <- data: + case <-ctx.Done(): + } }) } else { nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) { @@ -64,7 +65,11 @@ func ReceiveNats(ms *MemoryStore, cclog.Infof("NATS subscription to '%s' established", sc.SubscribeTo) } - close(msgs) + go func() { + <-ctx.Done() + close(msgs) + }() + wg.Wait() return nil diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index d75c9ef8..7c17cd88 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -25,6 +25,7 @@ import ( "encoding/json" "errors" "runtime" + "slices" "sync" "time" @@ -61,7 +62,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { if rawConfig != nil { config.Validate(configSchema, rawConfig) dec := json.NewDecoder(bytes.NewReader(rawConfig)) - // dec.DisallowUnknownFields() + dec.DisallowUnknownFields() if err := dec.Decode(&Keys); err != nil { cclog.Abortf("[METRICSTORE]> Metric Store Config Init: Could not decode config file '%s'.\nError: %s\n", rawConfig, err.Error()) } @@ -103,7 +104,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { ms := GetMemoryStore() - d, err := time.ParseDuration(Keys.Checkpoints.Restore) + d, err := time.ParseDuration(Keys.RetentionInMemory) if err != nil { cclog.Fatal(err) } @@ -128,11 +129,21 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { ctx, shutdown := context.WithCancel(context.Background()) - wg.Add(4) + retentionGoroutines := 1 + checkpointingGoroutines := 1 + dataStagingGoroutines := 1 + archivingGoroutines := 0 + if Keys.Archive != nil { + archivingGoroutines = 1 + } + totalGoroutines := retentionGoroutines + checkpointingGoroutines + dataStagingGoroutines + archivingGoroutines + wg.Add(totalGoroutines) Retention(wg, ctx) Checkpointing(wg, ctx) - Archiving(wg, ctx) + if Keys.Archive != nil { + Archiving(wg, ctx) + } DataStaging(wg, ctx) // Note: Signal handling has been removed from this function. @@ -141,9 +152,11 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { // Store the shutdown function for later use by Shutdown() shutdownFunc = shutdown - err = ReceiveNats(ms, 1, ctx) - if err != nil { - cclog.Fatal(err) + if Keys.Subscriptions != nil { + err = ReceiveNats(ms, 1, ctx) + if err != nil { + cclog.Fatal(err) + } } } @@ -184,11 +197,14 @@ func GetMemoryStore() *MemoryStore { } func Shutdown() { - // Cancel the context to signal all background goroutines to stop if shutdownFunc != nil { shutdownFunc() } + if Keys.Checkpoints.FileFormat != "json" { + close(LineProtocolMessages) + } + cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) var files int var err error @@ -199,7 +215,6 @@ func Shutdown() { files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) } else { files, err = GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, true) - close(LineProtocolMessages) } if err != nil { @@ -314,11 +329,8 @@ func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]str // Check if the key exists in our exclusion map if excludedValues, exists := excludeSelectors[key]; exists { // The key exists, now check if the specific value is in the exclusion list - for _, ev := range excludedValues { - if ev == value { - exclude = true - break - } + if slices.Contains(excludedValues, value) { + exclude = true } } From cb219b3c748d569e32d4b657f79885eba5bd8def Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Jan 2026 11:34:06 +0100 Subject: [PATCH 02/10] Fix configuration issues. Fix shutdown hangs Always turn on compression --- cmd/cc-backend/main.go | 21 +++++++--- internal/taskmanager/taskManager.go | 64 +++++++++++++++++------------ pkg/nats/client.go | 2 +- 3 files changed, 55 insertions(+), 32 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 9f98ccbf..12aa9104 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -271,6 +271,7 @@ func initSubsystems() error { // Initialize job archive archiveCfg := ccconf.GetPackageConfig("archive") if archiveCfg == nil { + cclog.Debug("Archive configuration not found, using default archive configuration") archiveCfg = json.RawMessage(defaultArchiveConfig) } if err := archive.Init(archiveCfg, config.Keys.DisableArchive); err != nil { @@ -375,22 +376,32 @@ func runServer(ctx context.Context) error { } runtime.SystemdNotify(true, "running") - // Wait for completion or error + waitDone := make(chan struct{}) go func() { wg.Wait() + close(waitDone) + }() + + go func() { + <-waitDone close(errChan) }() - // Check for server startup errors select { case err := <-errChan: if err != nil { return err } case <-time.After(100 * time.Millisecond): - // Server started successfully, wait for completion - if err := <-errChan; err != nil { - return err + select { + case err := <-errChan: + if err != nil { + return err + } + case <-waitDone: + case <-time.After(45 * time.Second): + cclog.Error("Shutdown timeout after 45 seconds - forcing exit") + return fmt.Errorf("shutdown timeout exceeded") } } diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index 06e4f28f..e1388c86 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -17,6 +17,10 @@ import ( "github.com/go-co-op/gocron/v2" ) +const ( + DefaultCompressOlderThen = 7 +) + // Retention defines the configuration for job retention policies. type Retention struct { Policy string `json:"policy"` @@ -60,6 +64,38 @@ func parseDuration(s string) (time.Duration, error) { return interval, nil } +func initArchiveServices(config json.RawMessage) { + var cfg struct { + Retention Retention `json:"retention"` + Compression int `json:"compression"` + } + cfg.Retention.IncludeDB = true + + if err := json.Unmarshal(config, &cfg); err != nil { + cclog.Errorf("error while unmarshaling raw config json: %v", err) + } + + switch cfg.Retention.Policy { + case "delete": + RegisterRetentionDeleteService( + cfg.Retention.Age, + cfg.Retention.IncludeDB, + cfg.Retention.OmitTagged) + case "move": + RegisterRetentionMoveService( + cfg.Retention.Age, + cfg.Retention.IncludeDB, + cfg.Retention.Location, + cfg.Retention.OmitTagged) + } + + if cfg.Compression > 0 { + RegisterCompressionService(cfg.Compression) + } else { + RegisterCompressionService(DefaultCompressOlderThen) + } +} + // Start initializes the task manager, parses configurations, and registers background tasks. // It starts the gocron scheduler. func Start(cronCfg, archiveConfig json.RawMessage) { @@ -80,32 +116,8 @@ func Start(cronCfg, archiveConfig json.RawMessage) { cclog.Errorf("error while decoding cron config: %v", err) } - var cfg struct { - Retention Retention `json:"retention"` - Compression int `json:"compression"` - } - cfg.Retention.IncludeDB = true - - if err := json.Unmarshal(archiveConfig, &cfg); err != nil { - cclog.Warn("Error while unmarshaling raw config json") - } - - switch cfg.Retention.Policy { - case "delete": - RegisterRetentionDeleteService( - cfg.Retention.Age, - cfg.Retention.IncludeDB, - cfg.Retention.OmitTagged) - case "move": - RegisterRetentionMoveService( - cfg.Retention.Age, - cfg.Retention.IncludeDB, - cfg.Retention.Location, - cfg.Retention.OmitTagged) - } - - if cfg.Compression > 0 { - RegisterCompressionService(cfg.Compression) + if archiveConfig != nil { + initArchiveServices(archiveConfig) } lc := auth.Keys.LdapConfig diff --git a/pkg/nats/client.go b/pkg/nats/client.go index a32ebdca..3222a525 100644 --- a/pkg/nats/client.go +++ b/pkg/nats/client.go @@ -77,7 +77,7 @@ type MessageHandler func(subject string, data []byte) func Connect() { clientOnce.Do(func() { if Keys.Address == "" { - cclog.Warn("NATS: no address configured, skipping connection") + cclog.Info("NATS: no address configured, skipping connection") return } From 9c92a7796bff7989b760867329a3dd74363b8308 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Jan 2026 12:20:11 +0100 Subject: [PATCH 03/10] Introduce nodeprovider interface to break import cycle --- cmd/cc-backend/main.go | 7 +++++- internal/metricstore/metricstore.go | 38 +++++++++++++++++++++++------ 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 12aa9104..9b8cd1b0 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -320,8 +320,13 @@ func runServer(ctx context.Context) error { mscfg := ccconf.GetPackageConfig("metric-store") if mscfg != nil { metricstore.Init(mscfg, &wg) + + // Inject repository as NodeProvider to break import cycle + ms := metricstore.GetMemoryStore() + jobRepo := repository.GetJobRepository() + ms.SetNodeProvider(jobRepo) } else { - cclog.Debug("Metric store configuration not found, skipping metricstore initialization") + return fmt.Errorf("missing metricstore configuration") } // Start archiver and task manager diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index 7c17cd88..37028654 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -45,6 +45,15 @@ var ( shutdownFunc context.CancelFunc ) +// NodeProvider provides information about nodes currently in use by running jobs. +// This interface allows metricstore to query job information without directly +// depending on the repository package, breaking the import cycle. +type NodeProvider interface { + // GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames + // that are currently in use by jobs that started before the given timestamp. + GetUsedNodes(ts int64) (map[string][]string, error) +} + type Metric struct { Name string Value schema.Float @@ -52,8 +61,9 @@ type Metric struct { } type MemoryStore struct { - Metrics map[string]MetricConfig - root Level + Metrics map[string]MetricConfig + root Level + nodeProvider NodeProvider // Injected dependency for querying running jobs } func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { @@ -196,6 +206,14 @@ func GetMemoryStore() *MemoryStore { return msInstance } +// SetNodeProvider sets the NodeProvider implementation for the MemoryStore. +// This must be called during initialization to provide job state information +// for selective buffer retention during Free operations. +// If not set, the Free function will fall back to freeing all buffers. +func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) { + ms.nodeProvider = provider +} + func Shutdown() { if shutdownFunc != nil { shutdownFunc() @@ -263,13 +281,17 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { } func Free(ms *MemoryStore, t time.Time) (int, error) { - // jobRepo := repository.GetJobRepository() - // excludeSelectors, err := jobRepo.GetUsedNodes(t.Unix()) - // if err != nil { - // return 0, err - // } + // If no NodeProvider is configured, free all buffers older than t + if ms.nodeProvider == nil { + return ms.Free(nil, t.Unix()) + } - excludeSelectors := make(map[string][]string, 0) + excludeSelectors, err := ms.nodeProvider.GetUsedNodes(t.Unix()) + if err != nil { + return 0, err + } + + // excludeSelectors := make(map[string][]string, 0) // excludeSelectors := map[string][]string{ // "alex": {"a0122", "a0123", "a0225"}, From 155e05495e3f0f91d0fa6bef5a202cfa709c8f8d Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Jan 2026 13:29:19 +0100 Subject: [PATCH 04/10] Fix shutdown timout bug --- cmd/cc-backend/main.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 9b8cd1b0..1cc4c8cd 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -392,21 +392,26 @@ func runServer(ctx context.Context) error { close(errChan) }() + // Wait for either: + // 1. An error from server startup + // 2. Completion of all goroutines (normal shutdown or crash) select { case err := <-errChan: + // errChan will be closed when waitDone is closed, which happens + // when all goroutines complete (either from normal shutdown or error) if err != nil { return err } case <-time.After(100 * time.Millisecond): + // Give the server 100ms to start and report any immediate startup errors + // After that, just wait for normal shutdown completion select { case err := <-errChan: if err != nil { return err } case <-waitDone: - case <-time.After(45 * time.Second): - cclog.Error("Shutdown timeout after 45 seconds - forcing exit") - return fmt.Errorf("shutdown timeout exceeded") + // Normal shutdown completed } } From 72248defbfa2da4dec7a54ba2fed992c4df01332 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Jan 2026 13:39:22 +0100 Subject: [PATCH 05/10] Cleanup print statements. Always enable Compression --- internal/metricstore/checkpoint.go | 1 - internal/taskmanager/ldapSyncService.go | 4 ++-- internal/taskmanager/taskManager.go | 7 +++++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/metricstore/checkpoint.go b/internal/metricstore/checkpoint.go index fb72fc6e..b5511221 100644 --- a/internal/metricstore/checkpoint.go +++ b/internal/metricstore/checkpoint.go @@ -408,7 +408,6 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { return m.FromCheckpoint(dir, from, altFormat) } - cclog.Info("[METRICSTORE]> No valid checkpoint files found") return 0, nil } diff --git a/internal/taskmanager/ldapSyncService.go b/internal/taskmanager/ldapSyncService.go index 9e99a261..55a99bab 100644 --- a/internal/taskmanager/ldapSyncService.go +++ b/internal/taskmanager/ldapSyncService.go @@ -23,7 +23,8 @@ func RegisterLdapSyncService(ds string) { auth := auth.GetAuthInstance() - cclog.Info("Register LDAP sync service") + cclog.Infof("register ldap sync service with %s interval", ds) + s.NewJob(gocron.DurationJob(interval), gocron.NewTask( func() { @@ -32,6 +33,5 @@ func RegisterLdapSyncService(ds string) { if err := auth.LdapAuth.Sync(); err != nil { cclog.Errorf("ldap sync failed: %s", err.Error()) } - cclog.Print("ldap sync done") })) } diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index e1388c86..cbc4120f 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -18,7 +18,7 @@ import ( ) const ( - DefaultCompressOlderThen = 7 + DefaultCompressOlderThan = 7 ) // Retention defines the configuration for job retention policies. @@ -92,7 +92,7 @@ func initArchiveServices(config json.RawMessage) { if cfg.Compression > 0 { RegisterCompressionService(cfg.Compression) } else { - RegisterCompressionService(DefaultCompressOlderThen) + RegisterCompressionService(DefaultCompressOlderThan) } } @@ -118,6 +118,9 @@ func Start(cronCfg, archiveConfig json.RawMessage) { if archiveConfig != nil { initArchiveServices(archiveConfig) + } else { + // Always enable compression + RegisterCompressionService(DefaultCompressOlderThan) } lc := auth.Keys.LdapConfig From a3fffa8e8b17a28ed66badcae5f7c3ab3d5bdcaa Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Jan 2026 13:57:15 +0100 Subject: [PATCH 06/10] Update example and demo config --- configs/config-demo.json | 45 ++++++---------------------------------- configs/config.json | 42 +++++++++++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 43 deletions(-) diff --git a/configs/config-demo.json b/configs/config-demo.json index e512c9de..79670f91 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -1,55 +1,22 @@ { "main": { "addr": "127.0.0.1:8080", - "short-running-jobs-duration": 300, - "resampling": { - "minimumPoints": 600, - "trigger": 300, - "resolutions": [240, 60] - }, - "apiAllowedIPs": ["*"], - "emission-constant": 317 + "apiAllowedIPs": ["*"] }, "cron": { - "commit-job-worker": "2m", - "duration-worker": "5m", - "footprint-worker": "10m" - }, - "archive": { - "kind": "file", - "path": "./var/job-archive" + "commit-job-worker": "1m", + "duration-worker": "3m", + "footprint-worker": "5m" }, "auth": { "jwts": { "max-age": "2000h" } }, - "nats": { - "address": "nats://0.0.0.0:4222", - "username": "root", - "password": "root" - }, "metric-store": { "checkpoints": { - "file-format": "avro", - "interval": "1h", - "directory": "./var/checkpoints", - "restore": "48h" + "interval": "1h" }, - "archive": { - "interval": "1h", - "directory": "./var/archive" - }, - "retention-in-memory": "48h", - "subscriptions": [ - { - "subscribe-to": "hpc-nats", - "cluster-tag": "fritz" - }, - { - "subscribe-to": "hpc-nats", - "cluster-tag": "alex" - } - ] + "retention-in-memory": "12h" } } diff --git a/configs/config.json b/configs/config.json index 44961c85..291df813 100644 --- a/configs/config.json +++ b/configs/config.json @@ -5,7 +5,6 @@ "https-key-file": "/etc/letsencrypt/live/url/privkey.pem", "user": "clustercockpit", "group": "clustercockpit", - "validate": false, "apiAllowedIPs": ["*"], "short-running-jobs-duration": 300, "resampling": { @@ -18,13 +17,48 @@ "subjectNodeState": "cc.node.state" } }, + "nats": { + "address": "nats://0.0.0.0:4222", + "username": "root", + "password": "root" + }, + "auth": { + "jwts": { + "max-age": "2000h" + } + }, "cron": { "commit-job-worker": "1m", "duration-worker": "5m", "footprint-worker": "10m" }, "archive": { - "kind": "file", - "path": "./var/job-archive" - } + "kind": "s3", + "endpoint": "http://x.x.x.x", + "bucket": "jobarchive", + "accessKey": "xx", + "secretKey": "xx", + "retention": { + "policy": "move", + "age": 365, + "location": "./var/archive" + } + }, + "metric-store": { + "checkpoints": { + "interval": "1h" + }, + "retention-in-memory": "12h", + "subscriptions": [ + { + "subscribe-to": "hpc-nats", + "cluster-tag": "fritz" + }, + { + "subscribe-to": "hpc-nats", + "cluster-tag": "alex" + } + ] + }, + "ui-file": "ui-config.json" } From b6f0faa97fdd1073c5f143d7ea5407d699c3077f Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Jan 2026 15:47:40 +0100 Subject: [PATCH 07/10] Make polarplot default in Jobview --- web/web.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/web/web.go b/web/web.go index b4c3749a..66df18b5 100644 --- a/web/web.go +++ b/web/web.go @@ -77,14 +77,14 @@ type PlotConfiguration struct { var UIDefaults = WebConfig{ JobList: JobListConfig{ UsePaging: false, - ShowFootprint: true, + ShowFootprint: false, }, NodeList: NodeListConfig{ UsePaging: true, }, JobView: JobViewConfig{ ShowPolarPlot: true, - ShowFootprint: true, + ShowFootprint: false, ShowRoofline: true, ShowStatTable: true, }, From 7db2bbe6b0bc1688baa6f6cf828a485e4e1fe4c4 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 15 Jan 2026 15:53:54 +0100 Subject: [PATCH 08/10] Add job tagger option to example config --- configs/config.json | 1 + 1 file changed, 1 insertion(+) diff --git a/configs/config.json b/configs/config.json index 291df813..f91d9661 100644 --- a/configs/config.json +++ b/configs/config.json @@ -7,6 +7,7 @@ "group": "clustercockpit", "apiAllowedIPs": ["*"], "short-running-jobs-duration": 300, + "enable-job-taggers": true, "resampling": { "minimumPoints": 600, "trigger": 180, From 489ad44b9f53c92df9f0f7b0da935489041acfc1 Mon Sep 17 00:00:00 2001 From: Michael Panzlaff Date: Thu, 15 Jan 2026 15:58:14 +0100 Subject: [PATCH 09/10] Make apiAllowedIPs optional If our test and production instance just use *, one might as well make that the default value. This should ease configuration for minimal setups. --- internal/auth/auth.go | 4 ++-- internal/config/schema.go | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 41691d00..cd89369c 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -616,9 +616,9 @@ func securedCheck(user *schema.User, r *http.Request) error { } // If SplitHostPort fails, IPAddress is already just a host (no port) - // If nothing declared in config: deny all request to this api endpoint + // If nothing declared in config: Continue if len(config.Keys.APIAllowedIPs) == 0 { - return fmt.Errorf("missing configuration key ApiAllowedIPs") + return nil } // If wildcard declared in config: Continue if config.Keys.APIAllowedIPs[0] == "*" { diff --git a/internal/config/schema.go b/internal/config/schema.go index 2d068140..39c7a63c 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -6,7 +6,7 @@ package config var configSchema = ` - { +{ "type": "object", "properties": { "addr": { @@ -135,6 +135,5 @@ var configSchema = ` }, "required": ["subjectJobEvent", "subjectNodeState"] } - }, - "required": ["apiAllowedIPs"] - }` + } +}` From 7cd98c4f255f5a5d0934a7da33de9955c0f4aae3 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Thu, 15 Jan 2026 17:48:59 +0100 Subject: [PATCH 10/10] Test and update files for dynamic retention --- configs/config-demo.json | 4 ++-- configs/config.json | 6 +++--- internal/metricstore/metricstore.go | 20 +++----------------- 3 files changed, 8 insertions(+), 22 deletions(-) diff --git a/configs/config-demo.json b/configs/config-demo.json index 79670f91..e53fa8bd 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -15,8 +15,8 @@ }, "metric-store": { "checkpoints": { - "interval": "1h" + "interval": "12h" }, - "retention-in-memory": "12h" + "retention-in-memory": "48h" } } diff --git a/configs/config.json b/configs/config.json index f91d9661..90a6ba79 100644 --- a/configs/config.json +++ b/configs/config.json @@ -47,10 +47,10 @@ }, "metric-store": { "checkpoints": { - "interval": "1h" + "interval": "12h" }, - "retention-in-memory": "12h", - "subscriptions": [ + "retention-in-memory": "48h", + "nats-subscriptions": [ { "subscribe-to": "hpc-nats", "cluster-tag": "fritz" diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index 37028654..0d7e5f45 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -142,18 +142,14 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { retentionGoroutines := 1 checkpointingGoroutines := 1 dataStagingGoroutines := 1 - archivingGoroutines := 0 - if Keys.Archive != nil { - archivingGoroutines = 1 - } + archivingGoroutines := 1 + totalGoroutines := retentionGoroutines + checkpointingGoroutines + dataStagingGoroutines + archivingGoroutines wg.Add(totalGoroutines) Retention(wg, ctx) Checkpointing(wg, ctx) - if Keys.Archive != nil { - Archiving(wg, ctx) - } + Archiving(wg, ctx) DataStaging(wg, ctx) // Note: Signal handling has been removed from this function. @@ -291,13 +287,6 @@ func Free(ms *MemoryStore, t time.Time) (int, error) { return 0, err } - // excludeSelectors := make(map[string][]string, 0) - - // excludeSelectors := map[string][]string{ - // "alex": {"a0122", "a0123", "a0225"}, - // "fritz": {"f0201", "f0202"}, - // } - switch lenMap := len(excludeSelectors); lenMap { // If the length of the map returned by GetUsedNodes() is 0, @@ -361,9 +350,6 @@ func GetSelectors(ms *MemoryStore, excludeSelectors map[string][]string) [][]str } } - // fmt.Printf("All selectors: %#v\n\n", allSelectors) - // fmt.Printf("filteredSelectors: %#v\n\n", filteredSelectors) - return filteredSelectors }