mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-27 18:17:29 +01:00
Compare commits
6 Commits
fix/checkp
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5398246a61 | ||
| ac0a4cc39a | |||
|
|
b43c52f5b5 | ||
|
|
b7e133fbaf | ||
|
|
c3b6d93941 | ||
|
|
c13fd68aa9 |
@@ -417,7 +417,7 @@ func (s *Server) Shutdown(ctx context.Context) {
|
|||||||
cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart))
|
cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart))
|
||||||
|
|
||||||
httpStart := time.Now()
|
httpStart := time.Now()
|
||||||
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
shutdownCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err := s.server.Shutdown(shutdownCtx); err != nil {
|
if err := s.server.Shutdown(shutdownCtx); err != nil {
|
||||||
cclog.Errorf("Server shutdown error: %v", err)
|
cclog.Errorf("Server shutdown error: %v", err)
|
||||||
@@ -440,7 +440,7 @@ func (s *Server) Shutdown(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
wg.Go(func() {
|
wg.Go(func() {
|
||||||
if err := archiver.Shutdown(10 * time.Second); err != nil {
|
if err := archiver.Shutdown(60 * time.Second); err != nil {
|
||||||
cclog.Warnf("Archiver shutdown: %v", err)
|
cclog.Warnf("Archiver shutdown: %v", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -53,16 +53,6 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Account for checkpoint span: files named {from}.bin contain data up to
|
|
||||||
// from+checkpointInterval. Subtract the checkpoint interval so we don't
|
|
||||||
// delete files whose data still falls within the retention window.
|
|
||||||
checkpointSpan := 12 * time.Hour
|
|
||||||
if Keys.CheckpointInterval != "" {
|
|
||||||
if parsed, err := time.ParseDuration(Keys.CheckpointInterval); err == nil {
|
|
||||||
checkpointSpan = parsed
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ticker := time.NewTicker(d)
|
ticker := time.NewTicker(d)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
@@ -71,7 +61,7 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
t := time.Now().Add(-d).Add(-checkpointSpan)
|
t := time.Now().Add(-d)
|
||||||
cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339))
|
cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339))
|
||||||
|
|
||||||
n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete)
|
n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete)
|
||||||
|
|||||||
@@ -86,16 +86,14 @@ var (
|
|||||||
|
|
||||||
// Checkpointing starts a background worker that periodically saves metric data to disk.
|
// Checkpointing starts a background worker that periodically saves metric data to disk.
|
||||||
//
|
//
|
||||||
// restoreFrom is the earliest timestamp of data loaded from checkpoint files at startup.
|
// Checkpoints are written every 12 hours (hardcoded).
|
||||||
// The first periodic checkpoint after restart will cover [restoreFrom, now], ensuring that
|
|
||||||
// loaded data is re-persisted before old checkpoint files are cleaned up.
|
|
||||||
//
|
//
|
||||||
// Format behaviour:
|
// Format behaviour:
|
||||||
// - "json": Periodic checkpointing every checkpointInterval
|
// - "json": Periodic checkpointing every checkpointInterval
|
||||||
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
|
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
|
||||||
func Checkpointing(wg *sync.WaitGroup, ctx context.Context, restoreFrom time.Time) {
|
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
lastCheckpointMu.Lock()
|
lastCheckpointMu.Lock()
|
||||||
lastCheckpoint = restoreFrom
|
lastCheckpoint = time.Now()
|
||||||
lastCheckpointMu.Unlock()
|
lastCheckpointMu.Unlock()
|
||||||
|
|
||||||
ms := GetMemoryStore()
|
ms := GetMemoryStore()
|
||||||
@@ -339,35 +337,25 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
|
|||||||
return ErrNoNewArchiveData
|
return ErrNoNewArchiveData
|
||||||
}
|
}
|
||||||
|
|
||||||
finalPath := path.Join(dir, fmt.Sprintf("%d.json", from))
|
filepath := path.Join(dir, fmt.Sprintf("%d.json", from))
|
||||||
tmpPath := finalPath + ".tmp"
|
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
||||||
|
|
||||||
f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
|
||||||
if err != nil && os.IsNotExist(err) {
|
if err != nil && os.IsNotExist(err) {
|
||||||
err = os.MkdirAll(dir, CheckpointDirPerms)
|
err = os.MkdirAll(dir, CheckpointDirPerms)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
f, err = os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
bw := bufio.NewWriter(f)
|
bw := bufio.NewWriter(f)
|
||||||
if err = json.NewEncoder(bw).Encode(cf); err != nil {
|
if err = json.NewEncoder(bw).Encode(cf); err != nil {
|
||||||
f.Close()
|
|
||||||
os.Remove(tmpPath)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = bw.Flush(); err != nil {
|
return bw.Flush()
|
||||||
f.Close()
|
|
||||||
os.Remove(tmpPath)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
f.Close()
|
|
||||||
|
|
||||||
return os.Rename(tmpPath, finalPath)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs.
|
// enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs.
|
||||||
@@ -482,7 +470,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
|
|||||||
data: metric.Data[0:n:n],
|
data: metric.Data[0:n:n],
|
||||||
prev: nil,
|
prev: nil,
|
||||||
next: nil,
|
next: nil,
|
||||||
archived: false,
|
archived: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
minfo, ok := m.Metrics[name]
|
minfo, ok := m.Metrics[name]
|
||||||
|
|||||||
@@ -170,7 +170,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
|
|||||||
ctx, shutdown := context.WithCancel(context.Background())
|
ctx, shutdown := context.WithCancel(context.Background())
|
||||||
|
|
||||||
Retention(wg, ctx)
|
Retention(wg, ctx)
|
||||||
Checkpointing(wg, ctx, restoreFrom)
|
Checkpointing(wg, ctx)
|
||||||
CleanUp(wg, ctx)
|
CleanUp(wg, ctx)
|
||||||
WALStaging(wg, ctx)
|
WALStaging(wg, ctx)
|
||||||
MemoryUsageTracker(wg, ctx)
|
MemoryUsageTracker(wg, ctx)
|
||||||
|
|||||||
@@ -127,7 +127,7 @@ type walFileState struct {
|
|||||||
// walFlushInterval controls how often dirty WAL files are flushed to disk.
|
// walFlushInterval controls how often dirty WAL files are flushed to disk.
|
||||||
// Decoupling flushes from message processing lets the consumer run at memory
|
// Decoupling flushes from message processing lets the consumer run at memory
|
||||||
// speed, amortizing syscall overhead across many writes.
|
// speed, amortizing syscall overhead across many writes.
|
||||||
const walFlushInterval = 5 * time.Second
|
const walFlushInterval = 1 * time.Second
|
||||||
|
|
||||||
// walShardIndex computes which shard a message belongs to based on cluster+node.
|
// walShardIndex computes which shard a message belongs to based on cluster+node.
|
||||||
// Uses FNV-1a hash for fast, well-distributed mapping.
|
// Uses FNV-1a hash for fast, well-distributed mapping.
|
||||||
|
|||||||
Reference in New Issue
Block a user