diff --git a/configs/config-demo.json b/configs/config-demo.json index 509c8f18..50dff298 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -21,6 +21,7 @@ ], "metric-store": { "checkpoints": { + "file-format": "wal", "interval": "12h" }, "retention-in-memory": "48h", diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index b5b1a528..6d49624a 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -294,7 +294,7 @@ func Shutdown() { var hostDirs []string files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) if err == nil { - RotateWALFiles(hostDirs) + RotateWALFilesAfterShutdown(hostDirs) } } else { files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 685a8388..07414d98 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -116,7 +116,6 @@ type walFileState struct { // Also handles WAL rotation requests from the checkpoint goroutine. func WALStaging(wg *sync.WaitGroup, ctx context.Context) { wg.Go(func() { - if Keys.Checkpoints.FileFormat == "json" { return } @@ -235,6 +234,17 @@ func RotateWALFiles(hostDirs []string) { } } +// RotateWALFiles sends rotation requests for the given host directories +// and blocks until all rotations complete. +func RotateWALFilesAfterShutdown(hostDirs []string) { + for _, dir := range hostDirs { + walPath := path.Join(dir, "current.wal") + if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { + cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err) + } + } +} + // buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC). func buildWALPayload(msg *WALMessage) []byte { size := 8 + 2 + len(msg.MetricName) + 1 + 4