mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-27 10:07:34 +01:00
fix: checkpoint initialization gap on restarts
Entire-Checkpoint: 3f4d366b037c
This commit is contained in:
@@ -53,6 +53,16 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
|
||||
return
|
||||
}
|
||||
|
||||
// Account for checkpoint span: files named {from}.bin contain data up to
|
||||
// from+checkpointInterval. Subtract the checkpoint interval so we don't
|
||||
// delete files whose data still falls within the retention window.
|
||||
checkpointSpan := 12 * time.Hour
|
||||
if Keys.CheckpointInterval != "" {
|
||||
if parsed, err := time.ParseDuration(Keys.CheckpointInterval); err == nil {
|
||||
checkpointSpan = parsed
|
||||
}
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(d)
|
||||
defer ticker.Stop()
|
||||
|
||||
@@ -61,7 +71,7 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
t := time.Now().Add(-d)
|
||||
t := time.Now().Add(-d).Add(-checkpointSpan)
|
||||
cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339))
|
||||
|
||||
n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete)
|
||||
|
||||
@@ -86,14 +86,16 @@ var (
|
||||
|
||||
// Checkpointing starts a background worker that periodically saves metric data to disk.
|
||||
//
|
||||
// Checkpoints are written every 12 hours (hardcoded).
|
||||
// restoreFrom is the earliest timestamp of data loaded from checkpoint files at startup.
|
||||
// The first periodic checkpoint after restart will cover [restoreFrom, now], ensuring that
|
||||
// loaded data is re-persisted before old checkpoint files are cleaned up.
|
||||
//
|
||||
// Format behaviour:
|
||||
// - "json": Periodic checkpointing every checkpointInterval
|
||||
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
|
||||
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||
func Checkpointing(wg *sync.WaitGroup, ctx context.Context, restoreFrom time.Time) {
|
||||
lastCheckpointMu.Lock()
|
||||
lastCheckpoint = time.Now()
|
||||
lastCheckpoint = restoreFrom
|
||||
lastCheckpointMu.Unlock()
|
||||
|
||||
ms := GetMemoryStore()
|
||||
@@ -337,25 +339,35 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
|
||||
return ErrNoNewArchiveData
|
||||
}
|
||||
|
||||
filepath := path.Join(dir, fmt.Sprintf("%d.json", from))
|
||||
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
||||
finalPath := path.Join(dir, fmt.Sprintf("%d.json", from))
|
||||
tmpPath := finalPath + ".tmp"
|
||||
|
||||
f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
||||
if err != nil && os.IsNotExist(err) {
|
||||
err = os.MkdirAll(dir, CheckpointDirPerms)
|
||||
if err == nil {
|
||||
f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
||||
f, err = os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
bw := bufio.NewWriter(f)
|
||||
if err = json.NewEncoder(bw).Encode(cf); err != nil {
|
||||
f.Close()
|
||||
os.Remove(tmpPath)
|
||||
return err
|
||||
}
|
||||
|
||||
return bw.Flush()
|
||||
if err = bw.Flush(); err != nil {
|
||||
f.Close()
|
||||
os.Remove(tmpPath)
|
||||
return err
|
||||
}
|
||||
f.Close()
|
||||
|
||||
return os.Rename(tmpPath, finalPath)
|
||||
}
|
||||
|
||||
// enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs.
|
||||
@@ -470,7 +482,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
|
||||
data: metric.Data[0:n:n],
|
||||
prev: nil,
|
||||
next: nil,
|
||||
archived: true,
|
||||
archived: false,
|
||||
}
|
||||
|
||||
minfo, ok := m.Metrics[name]
|
||||
|
||||
@@ -170,7 +170,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
|
||||
ctx, shutdown := context.WithCancel(context.Background())
|
||||
|
||||
Retention(wg, ctx)
|
||||
Checkpointing(wg, ctx)
|
||||
Checkpointing(wg, ctx, restoreFrom)
|
||||
CleanUp(wg, ctx)
|
||||
WALStaging(wg, ctx)
|
||||
MemoryUsageTracker(wg, ctx)
|
||||
|
||||
Reference in New Issue
Block a user