mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-26 09:37:30 +01:00
Add shutdown timings. Do not drain WAL buffers on shutdown
Entire-Checkpoint: d4b497002f54
This commit is contained in:
@@ -407,21 +407,27 @@ func (s *Server) Start(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) Shutdown(ctx context.Context) {
|
func (s *Server) Shutdown(ctx context.Context) {
|
||||||
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
shutdownStart := time.Now()
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
|
natsStart := time.Now()
|
||||||
nc := nats.GetClient()
|
nc := nats.GetClient()
|
||||||
if nc != nil {
|
if nc != nil {
|
||||||
nc.Close()
|
nc.Close()
|
||||||
}
|
}
|
||||||
|
cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart))
|
||||||
|
|
||||||
|
httpStart := time.Now()
|
||||||
|
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
if err := s.server.Shutdown(shutdownCtx); err != nil {
|
if err := s.server.Shutdown(shutdownCtx); err != nil {
|
||||||
cclog.Errorf("Server shutdown error: %v", err)
|
cclog.Errorf("Server shutdown error: %v", err)
|
||||||
}
|
}
|
||||||
|
cclog.Infof("Shutdown: HTTP server stopped (%v)", time.Since(httpStart))
|
||||||
|
|
||||||
// Run metricstore and archiver shutdown concurrently.
|
// Run metricstore and archiver shutdown concurrently.
|
||||||
// They are independent: metricstore writes .bin snapshots,
|
// They are independent: metricstore writes .bin snapshots,
|
||||||
// archiver flushes pending job archives.
|
// archiver flushes pending job archives.
|
||||||
|
storeStart := time.Now()
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer close(done)
|
defer close(done)
|
||||||
@@ -444,7 +450,10 @@ func (s *Server) Shutdown(ctx context.Context) {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
|
cclog.Infof("Shutdown: metricstore + archiver completed (%v)", time.Since(storeStart))
|
||||||
case <-time.After(60 * time.Second):
|
case <-time.After(60 * time.Second):
|
||||||
cclog.Warn("Shutdown deadline exceeded, forcing exit")
|
cclog.Warnf("Shutdown deadline exceeded after %v, forcing exit", time.Since(shutdownStart))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cclog.Infof("Shutdown: total time %v", time.Since(shutdownStart))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -271,19 +271,26 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) {
|
|||||||
//
|
//
|
||||||
// Note: This function blocks until the final checkpoint is written.
|
// Note: This function blocks until the final checkpoint is written.
|
||||||
func Shutdown() {
|
func Shutdown() {
|
||||||
|
totalStart := time.Now()
|
||||||
|
|
||||||
shutdownFuncMu.Lock()
|
shutdownFuncMu.Lock()
|
||||||
defer shutdownFuncMu.Unlock()
|
defer shutdownFuncMu.Unlock()
|
||||||
if shutdownFunc != nil {
|
if shutdownFunc != nil {
|
||||||
shutdownFunc()
|
shutdownFunc()
|
||||||
}
|
}
|
||||||
|
cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart))
|
||||||
|
|
||||||
if Keys.Checkpoints.FileFormat == "wal" {
|
if Keys.Checkpoints.FileFormat == "wal" {
|
||||||
for _, ch := range walShardChs {
|
for _, ch := range walShardChs {
|
||||||
close(ch)
|
close(ch)
|
||||||
}
|
}
|
||||||
|
drainStart := time.Now()
|
||||||
|
WaitForWALStagingDrain()
|
||||||
|
cclog.Infof("[METRICSTORE]> WAL staging goroutines exited (%v)", time.Since(drainStart))
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)
|
cclog.Infof("[METRICSTORE]> Writing checkpoint to '%s'...", Keys.Checkpoints.RootDir)
|
||||||
|
checkpointStart := time.Now()
|
||||||
var files int
|
var files int
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@@ -294,19 +301,16 @@ func Shutdown() {
|
|||||||
lastCheckpointMu.Unlock()
|
lastCheckpointMu.Unlock()
|
||||||
|
|
||||||
if Keys.Checkpoints.FileFormat == "wal" {
|
if Keys.Checkpoints.FileFormat == "wal" {
|
||||||
var hostDirs []string
|
// WAL files are deleted per-host inside ToCheckpointWAL workers.
|
||||||
files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
|
files, _, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
|
||||||
if err == nil {
|
|
||||||
RotateWALFilesAfterShutdown(hostDirs)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
|
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error())
|
cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s", err.Error())
|
||||||
}
|
}
|
||||||
cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files)
|
cclog.Infof("[METRICSTORE]> Done! (%d files written in %v, total shutdown: %v)", files, time.Since(checkpointStart), time.Since(totalStart))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retention starts a background goroutine that periodically frees old metric data.
|
// Retention starts a background goroutine that periodically frees old metric data.
|
||||||
|
|||||||
@@ -92,6 +92,9 @@ var walShardRotateChs []chan walRotateReq
|
|||||||
// walNumShards stores the number of shards (set during WALStaging init).
|
// walNumShards stores the number of shards (set during WALStaging init).
|
||||||
var walNumShards int
|
var walNumShards int
|
||||||
|
|
||||||
|
// walStagingWg tracks WALStaging goroutine exits for shutdown synchronization.
|
||||||
|
var walStagingWg sync.WaitGroup
|
||||||
|
|
||||||
// WALMessage represents a single metric write to be appended to the WAL.
|
// WALMessage represents a single metric write to be appended to the WAL.
|
||||||
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
||||||
type WALMessage struct {
|
type WALMessage struct {
|
||||||
@@ -171,7 +174,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
msgCh := walShardChs[i]
|
msgCh := walShardChs[i]
|
||||||
rotateCh := walShardRotateChs[i]
|
rotateCh := walShardRotateChs[i]
|
||||||
|
|
||||||
|
walStagingWg.Add(1)
|
||||||
wg.Go(func() {
|
wg.Go(func() {
|
||||||
|
defer walStagingWg.Done()
|
||||||
hostFiles := make(map[string]*walFileState)
|
hostFiles := make(map[string]*walFileState)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -255,23 +260,6 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
drain := func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case msg, ok := <-msgCh:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
processMsg(msg)
|
|
||||||
case req := <-rotateCh:
|
|
||||||
processRotate(req)
|
|
||||||
default:
|
|
||||||
flushDirty()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ticker := time.NewTicker(walFlushInterval)
|
ticker := time.NewTicker(walFlushInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
@@ -298,7 +286,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
drain()
|
// On shutdown, skip draining buffered messages — a full binary
|
||||||
|
// checkpoint will be written from in-memory state, making
|
||||||
|
// buffered WAL records redundant.
|
||||||
|
flushDirty()
|
||||||
return
|
return
|
||||||
case msg, ok := <-msgCh:
|
case msg, ok := <-msgCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -319,6 +310,13 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitForWALStagingDrain blocks until all WALStaging goroutines have exited.
|
||||||
|
// Must be called after closing walShardChs to ensure all file handles are
|
||||||
|
// flushed and closed before checkpoint writes begin.
|
||||||
|
func WaitForWALStagingDrain() {
|
||||||
|
walStagingWg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// RotateWALFiles sends rotation requests for the given host directories
|
// RotateWALFiles sends rotation requests for the given host directories
|
||||||
// and blocks until all rotations complete. Each request is routed to the
|
// and blocks until all rotations complete. Each request is routed to the
|
||||||
// shard that owns the host directory.
|
// shard that owns the host directory.
|
||||||
@@ -655,7 +653,10 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
|||||||
selector []string
|
selector []string
|
||||||
}
|
}
|
||||||
|
|
||||||
n, errs := int32(0), int32(0)
|
totalWork := len(levels)
|
||||||
|
cclog.Infof("[METRICSTORE]> Starting binary checkpoint for %d hosts with %d workers", totalWork, Keys.NumWorkers)
|
||||||
|
|
||||||
|
n, errs, completed := int32(0), int32(0), int32(0)
|
||||||
var successDirs []string
|
var successDirs []string
|
||||||
var successMu sync.Mutex
|
var successMu sync.Mutex
|
||||||
|
|
||||||
@@ -663,6 +664,22 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
|||||||
wg.Add(Keys.NumWorkers)
|
wg.Add(Keys.NumWorkers)
|
||||||
work := make(chan workItem, Keys.NumWorkers*2)
|
work := make(chan workItem, Keys.NumWorkers*2)
|
||||||
|
|
||||||
|
// Progress logging goroutine.
|
||||||
|
stopProgress := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(10 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
cclog.Infof("[METRICSTORE]> Checkpoint progress: %d/%d hosts (%d written, %d errors)",
|
||||||
|
atomic.LoadInt32(&completed), totalWork, atomic.LoadInt32(&n), atomic.LoadInt32(&errs))
|
||||||
|
case <-stopProgress:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for range Keys.NumWorkers {
|
for range Keys.NumWorkers {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
@@ -670,16 +687,23 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
|||||||
err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m)
|
err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == ErrNoNewArchiveData {
|
if err == ErrNoNewArchiveData {
|
||||||
|
atomic.AddInt32(&completed, 1)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err)
|
cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err)
|
||||||
atomic.AddInt32(&errs, 1)
|
atomic.AddInt32(&errs, 1)
|
||||||
} else {
|
} else {
|
||||||
atomic.AddInt32(&n, 1)
|
atomic.AddInt32(&n, 1)
|
||||||
|
// Delete WAL immediately after successful snapshot.
|
||||||
|
walPath := path.Join(wi.hostDir, "current.wal")
|
||||||
|
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
|
||||||
|
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
|
||||||
|
}
|
||||||
successMu.Lock()
|
successMu.Lock()
|
||||||
successDirs = append(successDirs, wi.hostDir)
|
successDirs = append(successDirs, wi.hostDir)
|
||||||
successMu.Unlock()
|
successMu.Unlock()
|
||||||
}
|
}
|
||||||
|
atomic.AddInt32(&completed, 1)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@@ -694,6 +718,7 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
|||||||
}
|
}
|
||||||
close(work)
|
close(work)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
close(stopProgress)
|
||||||
|
|
||||||
if errs > 0 {
|
if errs > 0 {
|
||||||
return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n)
|
return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n)
|
||||||
|
|||||||
Reference in New Issue
Block a user