Add buffered I/O to WAL writes and fix MemoryCap comment

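WAL records are now written through a per-host bufio.Writer instead of issuing
one write syscall per record, reducing I/O overhead. Buffers are flushed on WAL
rotation, after draining the remaining messages, and on shutdown; between
flushes, records stay in memory until the buffer fills or is flushed.
Also fixes the misleading MemoryCap comment ("Max bytes" → "Max memory in GB")
and adds the optional CheckpointInterval ("checkpoint-interval") field to
MetricStoreConfig.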

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Entire-Checkpoint: b38dc35e5334
This commit is contained in:
2026-03-13 09:05:24 +01:00
parent a4f9ba6975
commit b214e1755a
2 changed files with 33 additions and 20 deletions

View File

@@ -113,7 +113,7 @@ type Subscriptions []struct {
// Fields: // Fields:
// - NumWorkers: Parallel workers for checkpoint/archive (0 = auto: min(NumCPU/2+1, 10)) // - NumWorkers: Parallel workers for checkpoint/archive (0 = auto: min(NumCPU/2+1, 10))
// - RetentionInMemory: Duration string (e.g., "48h") for in-memory data retention // - RetentionInMemory: Duration string (e.g., "48h") for in-memory data retention
// - MemoryCap: Max bytes for buffer data (0 = unlimited); triggers forceFree when exceeded // - MemoryCap: Max memory in GB for buffer data (0 = unlimited); triggers forceFree when exceeded
// - Checkpoints: Periodic persistence configuration // - Checkpoints: Periodic persistence configuration
// - Debug: Development/profiling options (nil = disabled) // - Debug: Development/profiling options (nil = disabled)
// - Archive: Long-term storage configuration (nil = disabled) // - Archive: Long-term storage configuration (nil = disabled)
@@ -121,13 +121,14 @@ type Subscriptions []struct {
type MetricStoreConfig struct { type MetricStoreConfig struct {
// Number of concurrent workers for checkpoint and archive operations. // Number of concurrent workers for checkpoint and archive operations.
// If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10) // If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10)
NumWorkers int `json:"num-workers"` NumWorkers int `json:"num-workers"`
RetentionInMemory string `json:"retention-in-memory"` RetentionInMemory string `json:"retention-in-memory"`
MemoryCap int `json:"memory-cap"` CheckpointInterval string `json:"checkpoint-interval,omitempty"`
Checkpoints Checkpoints `json:"checkpoints"` MemoryCap int `json:"memory-cap"`
Debug *Debug `json:"debug"` Checkpoints Checkpoints `json:"checkpoints"`
Cleanup *Cleanup `json:"cleanup"` Debug *Debug `json:"debug"`
Subscriptions *Subscriptions `json:"nats-subscriptions"` Cleanup *Cleanup `json:"cleanup"`
Subscriptions *Subscriptions `json:"nats-subscriptions"`
} }
// Keys is the global metricstore configuration instance. // Keys is the global metricstore configuration instance.

View File

@@ -106,9 +106,10 @@ type walRotateReq struct {
done chan struct{} done chan struct{}
} }
// walFileState holds an open WAL file handle for one host directory. // walFileState holds an open WAL file handle and buffered writer for one host directory.
type walFileState struct { type walFileState struct {
f *os.File f *os.File
w *bufio.Writer
} }
// WALStaging starts a background goroutine that receives WALMessage items // WALStaging starts a background goroutine that receives WALMessage items
@@ -125,15 +126,16 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
defer func() { defer func() {
for _, ws := range hostFiles { for _, ws := range hostFiles {
if ws.f != nil { if ws.f != nil {
ws.w.Flush()
ws.f.Close() ws.f.Close()
} }
} }
}() }()
getOrOpenWAL := func(hostDir string) *os.File { getOrOpenWAL := func(hostDir string) *walFileState {
ws, ok := hostFiles[hostDir] ws, ok := hostFiles[hostDir]
if ok { if ok {
return ws.f return ws
} }
if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil { if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil {
@@ -148,29 +150,32 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
return nil return nil
} }
w := bufio.NewWriter(f)
// Write file header magic if file is new (empty). // Write file header magic if file is new (empty).
info, err := f.Stat() info, err := f.Stat()
if err == nil && info.Size() == 0 { if err == nil && info.Size() == 0 {
var hdr [4]byte var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic) binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := f.Write(hdr[:]); err != nil { if _, err := w.Write(hdr[:]); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err) cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err)
f.Close() f.Close()
return nil return nil
} }
} }
hostFiles[hostDir] = &walFileState{f: f} ws = &walFileState{f: f, w: w}
return f hostFiles[hostDir] = ws
return ws
} }
processMsg := func(msg *WALMessage) { processMsg := func(msg *WALMessage) {
hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node) hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node)
f := getOrOpenWAL(hostDir) ws := getOrOpenWAL(hostDir)
if f == nil { if ws == nil {
return return
} }
if err := writeWALRecord(f, msg); err != nil { if err := writeWALRecord(ws.w, msg); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
} }
} }
@@ -178,6 +183,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
processRotate := func(req walRotateReq) { processRotate := func(req walRotateReq) {
ws, ok := hostFiles[req.hostDir] ws, ok := hostFiles[req.hostDir]
if ok && ws.f != nil { if ok && ws.f != nil {
ws.w.Flush()
ws.f.Close() ws.f.Close()
walPath := path.Join(req.hostDir, "current.wal") walPath := path.Join(req.hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
@@ -199,6 +205,12 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
case req := <-walRotateCh: case req := <-walRotateCh:
processRotate(req) processRotate(req)
default: default:
// Flush all buffered writers after draining remaining messages.
for _, ws := range hostFiles {
if ws.f != nil {
ws.w.Flush()
}
}
return return
} }
} }
@@ -282,9 +294,9 @@ func buildWALPayload(msg *WALMessage) []byte {
return buf return buf
} }
// writeWALRecord appends a binary WAL record to the file. // writeWALRecord appends a binary WAL record to the writer.
// Format: [4B magic][4B payload_len][payload][4B CRC32] // Format: [4B magic][4B payload_len][payload][4B CRC32]
func writeWALRecord(f *os.File, msg *WALMessage) error { func writeWALRecord(w io.Writer, msg *WALMessage) error {
payload := buildWALPayload(msg) payload := buildWALPayload(msg)
crc := crc32.ChecksumIEEE(payload) crc := crc32.ChecksumIEEE(payload)
@@ -304,7 +316,7 @@ func writeWALRecord(f *os.File, msg *WALMessage) error {
binary.LittleEndian.PutUint32(crcBytes[:], crc) binary.LittleEndian.PutUint32(crcBytes[:], crc)
record = append(record, crcBytes[:]...) record = append(record, crcBytes[:]...)
_, err := f.Write(record) _, err := w.Write(record)
return err return err
} }