6 Commits

Author SHA1 Message Date
Jan Eitzinger
5398246a61 Merge pull request #540 from ClusterCockpit/hotfix
Hotfix
2026-03-27 10:00:32 +01:00
ac0a4cc39a Increase shutdown timeouts and WAL flush interval
Entire-Checkpoint: 94ee2fb97830
2026-03-27 09:56:34 +01:00
Jan Eitzinger
b43c52f5b5 Merge pull request #538 from ClusterCockpit/hotfix
Hotfix
2026-03-26 08:01:31 +01:00
Jan Eitzinger
b7e133fbaf Merge pull request #536 from ClusterCockpit/hotfix
Hotfix
2026-03-25 07:07:49 +01:00
Jan Eitzinger
c3b6d93941 Merge pull request #535 from ClusterCockpit/hotfix
Hotfix
2026-03-24 19:01:49 +01:00
Jan Eitzinger
c13fd68aa9 Merge pull request #534 from ClusterCockpit/hotfix
feat: Add command line switch to trigger manual metricstore checkpoin…
2026-03-23 19:28:06 +01:00
5 changed files with 14 additions and 36 deletions

View File

@@ -417,7 +417,7 @@ func (s *Server) Shutdown(ctx context.Context) {
 	cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart))
 	httpStart := time.Now()
-	shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+	shutdownCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
 	defer cancel()
 	if err := s.server.Shutdown(shutdownCtx); err != nil {
 		cclog.Errorf("Server shutdown error: %v", err)
@@ -440,7 +440,7 @@ func (s *Server) Shutdown(ctx context.Context) {
 	}
 	wg.Go(func() {
-		if err := archiver.Shutdown(10 * time.Second); err != nil {
+		if err := archiver.Shutdown(60 * time.Second); err != nil {
 			cclog.Warnf("Archiver shutdown: %v", err)
 		}
 	})

View File

@@ -53,16 +53,6 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
 		return
 	}
-	// Account for checkpoint span: files named {from}.bin contain data up to
-	// from+checkpointInterval. Subtract the checkpoint interval so we don't
-	// delete files whose data still falls within the retention window.
-	checkpointSpan := 12 * time.Hour
-	if Keys.CheckpointInterval != "" {
-		if parsed, err := time.ParseDuration(Keys.CheckpointInterval); err == nil {
-			checkpointSpan = parsed
-		}
-	}
 	ticker := time.NewTicker(d)
 	defer ticker.Stop()
@@ -71,7 +61,7 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			t := time.Now().Add(-d).Add(-checkpointSpan)
+			t := time.Now().Add(-d)
 			cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339))
 			n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete)

View File

@@ -86,16 +86,14 @@ var (
 // Checkpointing starts a background worker that periodically saves metric data to disk.
 //
-// restoreFrom is the earliest timestamp of data loaded from checkpoint files at startup.
-// The first periodic checkpoint after restart will cover [restoreFrom, now], ensuring that
-// loaded data is re-persisted before old checkpoint files are cleaned up.
+// Checkpoints are written every 12 hours (hardcoded).
 //
 // Format behaviour:
 //   - "json": Periodic checkpointing every checkpointInterval
 //   - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
-func Checkpointing(wg *sync.WaitGroup, ctx context.Context, restoreFrom time.Time) {
+func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
 	lastCheckpointMu.Lock()
-	lastCheckpoint = restoreFrom
+	lastCheckpoint = time.Now()
 	lastCheckpointMu.Unlock()
 	ms := GetMemoryStore()
@@ -339,35 +337,25 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
 		return ErrNoNewArchiveData
 	}
-	finalPath := path.Join(dir, fmt.Sprintf("%d.json", from))
-	tmpPath := finalPath + ".tmp"
-	f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
+	filepath := path.Join(dir, fmt.Sprintf("%d.json", from))
+	f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
 	if err != nil && os.IsNotExist(err) {
 		err = os.MkdirAll(dir, CheckpointDirPerms)
 		if err == nil {
-			f, err = os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
+			f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
 		}
 	}
 	if err != nil {
 		return err
 	}
+	defer f.Close()
 	bw := bufio.NewWriter(f)
 	if err = json.NewEncoder(bw).Encode(cf); err != nil {
-		f.Close()
-		os.Remove(tmpPath)
 		return err
 	}
-	if err = bw.Flush(); err != nil {
-		f.Close()
-		os.Remove(tmpPath)
-		return err
-	}
-	f.Close()
-	return os.Rename(tmpPath, finalPath)
+	return bw.Flush()
 }

 // enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs.
@@ -482,7 +470,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
 			data: metric.Data[0:n:n],
 			prev: nil,
 			next: nil,
-			archived: false,
+			archived: true,
 		}
 		minfo, ok := m.Metrics[name]

View File

@@ -170,7 +170,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
 	ctx, shutdown := context.WithCancel(context.Background())
 	Retention(wg, ctx)
-	Checkpointing(wg, ctx, restoreFrom)
+	Checkpointing(wg, ctx)
 	CleanUp(wg, ctx)
 	WALStaging(wg, ctx)
 	MemoryUsageTracker(wg, ctx)

View File

@@ -127,7 +127,7 @@ type walFileState struct {
 // walFlushInterval controls how often dirty WAL files are flushed to disk.
 // Decoupling flushes from message processing lets the consumer run at memory
 // speed, amortizing syscall overhead across many writes.
-const walFlushInterval = 5 * time.Second
+const walFlushInterval = 1 * time.Second

 // walShardIndex computes which shard a message belongs to based on cluster+node.
 // Uses FNV-1a hash for fast, well-distributed mapping.