Update to shutdown worker for WAL checkpointing mode

This commit is contained in:
Aditya Ujeniya
2026-03-02 15:27:06 +01:00
parent 1ec41d8389
commit a243e17499
3 changed files with 13 additions and 2 deletions

View File

@@ -21,6 +21,7 @@
], ],
"metric-store": { "metric-store": {
"checkpoints": { "checkpoints": {
"file-format": "wal",
"interval": "12h" "interval": "12h"
}, },
"retention-in-memory": "48h", "retention-in-memory": "48h",

View File

@@ -294,7 +294,7 @@ func Shutdown() {
var hostDirs []string var hostDirs []string
files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
if err == nil { if err == nil {
RotateWALFiles(hostDirs) RotateWALFilesAfterShutdown(hostDirs)
} }
} else { } else {
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())

View File

@@ -116,7 +116,6 @@ type walFileState struct {
// Also handles WAL rotation requests from the checkpoint goroutine. // Also handles WAL rotation requests from the checkpoint goroutine.
func WALStaging(wg *sync.WaitGroup, ctx context.Context) { func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
wg.Go(func() { wg.Go(func() {
if Keys.Checkpoints.FileFormat == "json" { if Keys.Checkpoints.FileFormat == "json" {
return return
} }
@@ -235,6 +234,17 @@ func RotateWALFiles(hostDirs []string) {
} }
} }
// RotateWALFiles sends rotation requests for the given host directories
// and blocks until all rotations complete.
func RotateWALFilesAfterShutdown(hostDirs []string) {
for _, dir := range hostDirs {
walPath := path.Join(dir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
}
}
}
// buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC). // buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC).
func buildWALPayload(msg *WALMessage) []byte { func buildWALPayload(msg *WALMessage) []byte {
size := 8 + 2 + len(msg.MetricName) + 1 + 4 size := 8 + 2 + len(msg.MetricName) + 1 + 4