From e7598100518b181b9e2e05f9f1315e2c3409eda9 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Thu, 26 Mar 2026 07:02:37 +0100 Subject: [PATCH] Add shutdown timings. Do not drain WAL buffers on shutdown Entire-Checkpoint: d4b497002f54 --- cmd/cc-backend/server.go | 15 ++++++-- pkg/metricstore/metricstore.go | 20 ++++++---- pkg/metricstore/walCheckpoint.go | 63 ++++++++++++++++++++++---------- 3 files changed, 68 insertions(+), 30 deletions(-) diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index c9ba1e9c..a2d5df25 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -407,21 +407,27 @@ func (s *Server) Start(ctx context.Context) error { } func (s *Server) Shutdown(ctx context.Context) { - shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() + shutdownStart := time.Now() + natsStart := time.Now() nc := nats.GetClient() if nc != nil { nc.Close() } + cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart)) + httpStart := time.Now() + shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() if err := s.server.Shutdown(shutdownCtx); err != nil { cclog.Errorf("Server shutdown error: %v", err) } + cclog.Infof("Shutdown: HTTP server stopped (%v)", time.Since(httpStart)) // Run metricstore and archiver shutdown concurrently. // They are independent: metricstore writes .bin snapshots, // archiver flushes pending job archives. + storeStart := time.Now() done := make(chan struct{}) go func() { defer close(done) @@ -444,7 +450,10 @@ func (s *Server) Shutdown(ctx context.Context) { select { case <-done: + cclog.Infof("Shutdown: metricstore + archiver completed (%v)", time.Since(storeStart)) case <-time.After(60 * time.Second): - cclog.Warn("Shutdown deadline exceeded, forcing exit") + cclog.Warnf("Shutdown deadline exceeded after %v, forcing exit", time.Since(shutdownStart)) } + + cclog.Infof("Shutdown: total time %v", time.Since(shutdownStart)) } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 1c475269..e85116e5 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -271,19 +271,26 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) { // // Note: This function blocks until the final checkpoint is written. func Shutdown() { + totalStart := time.Now() + shutdownFuncMu.Lock() defer shutdownFuncMu.Unlock() if shutdownFunc != nil { shutdownFunc() } + cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart)) if Keys.Checkpoints.FileFormat == "wal" { for _, ch := range walShardChs { close(ch) } + drainStart := time.Now() + WaitForWALStagingDrain() + cclog.Infof("[METRICSTORE]> WAL staging goroutines exited (%v)", time.Since(drainStart)) } - cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) + cclog.Infof("[METRICSTORE]> Writing checkpoint to '%s'...", Keys.Checkpoints.RootDir) + checkpointStart := time.Now() var files int var err error @@ -294,19 +301,16 @@ func Shutdown() { lastCheckpointMu.Unlock() if Keys.Checkpoints.FileFormat == "wal" { - var hostDirs []string - files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) - if err == nil { - RotateWALFilesAfterShutdown(hostDirs) - } + // WAL files are deleted per-host inside ToCheckpointWAL workers. + files, _, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) } else { files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) } if err != nil { - cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error()) + cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s", err.Error()) } - cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files) + cclog.Infof("[METRICSTORE]> Done! (%d files written in %v, total shutdown: %v)", files, time.Since(checkpointStart), time.Since(totalStart)) } // Retention starts a background goroutine that periodically frees old metric data. diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 57064e5f..1736d754 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -92,6 +92,9 @@ var walShardRotateChs []chan walRotateReq // walNumShards stores the number of shards (set during WALStaging init). var walNumShards int +// walStagingWg tracks WALStaging goroutine exits for shutdown synchronization. +var walStagingWg sync.WaitGroup + // WALMessage represents a single metric write to be appended to the WAL. // Cluster and Node are NOT stored in the WAL record (inferred from file path). type WALMessage struct { @@ -171,7 +174,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { msgCh := walShardChs[i] rotateCh := walShardRotateChs[i] + walStagingWg.Add(1) wg.Go(func() { + defer walStagingWg.Done() hostFiles := make(map[string]*walFileState) defer func() { @@ -255,23 +260,6 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { } } - drain := func() { - for { - select { - case msg, ok := <-msgCh: - if !ok { - return - } - processMsg(msg) - case req := <-rotateCh: - processRotate(req) - default: - flushDirty() - return - } - } - } - ticker := time.NewTicker(walFlushInterval) defer ticker.Stop() @@ -298,7 +286,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { for { select { case <-ctx.Done(): - drain() + // On shutdown, skip draining buffered messages — a full binary + // checkpoint will be written from in-memory state, making + // buffered WAL records redundant. + flushDirty() return case msg, ok := <-msgCh: if !ok { @@ -319,6 +310,13 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { } } +// WaitForWALStagingDrain blocks until all WALStaging goroutines have exited. +// Must be called after closing walShardChs to ensure all file handles are +// flushed and closed before checkpoint writes begin. +func WaitForWALStagingDrain() { + walStagingWg.Wait() +} + // RotateWALFiles sends rotation requests for the given host directories // and blocks until all rotations complete. Each request is routed to the // shard that owns the host directory. @@ -655,7 +653,10 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string selector []string } - n, errs := int32(0), int32(0) + totalWork := len(levels) + cclog.Infof("[METRICSTORE]> Starting binary checkpoint for %d hosts with %d workers", totalWork, Keys.NumWorkers) + + n, errs, completed := int32(0), int32(0), int32(0) var successDirs []string var successMu sync.Mutex @@ -663,6 +664,22 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string wg.Add(Keys.NumWorkers) work := make(chan workItem, Keys.NumWorkers*2) + // Progress logging goroutine. + stopProgress := make(chan struct{}) + go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + cclog.Infof("[METRICSTORE]> Checkpoint progress: %d/%d hosts (%d written, %d errors)", + atomic.LoadInt32(&completed), totalWork, atomic.LoadInt32(&n), atomic.LoadInt32(&errs)) + case <-stopProgress: + return + } + } + }() + for range Keys.NumWorkers { go func() { defer wg.Done() @@ -670,16 +687,23 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m) if err != nil { if err == ErrNoNewArchiveData { + atomic.AddInt32(&completed, 1) continue } cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err) atomic.AddInt32(&errs, 1) } else { atomic.AddInt32(&n, 1) + // Delete WAL immediately after successful snapshot. + walPath := path.Join(wi.hostDir, "current.wal") + if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { + cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err) + } successMu.Lock() successDirs = append(successDirs, wi.hostDir) successMu.Unlock() } + atomic.AddInt32(&completed, 1) } }() } @@ -694,6 +718,7 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string } close(work) wg.Wait() + close(stopProgress) if errs > 0 { return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n)