diff --git a/Makefile b/Makefile index 8f805c87..61e74c71 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ TARGET = ./cc-backend FRONTEND = ./web/frontend -VERSION = 1.5.1 +VERSION = 1.5.2 GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development') CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S") LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}' diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 84e3a417..1ac2f229 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -1,16 +1,62 @@ -# `cc-backend` version 1.5.1 +# `cc-backend` version 1.5.2 Supports job archive version 3 and database version 11. This is a bugfix release of `cc-backend`, the API backend and frontend implementation of ClusterCockpit. For release specific notes visit the [ClusterCockpit Documentation](https://clusterockpit.org/docs/release/). +If you are upgrading from v1.5.1 no database migration is required. If you are upgrading from v1.5.0 you need to do another DB migration. This should not take long. For optimal database performance after the migration it is recommended to apply the new `optimize-db` flag, which runs the sqlite `ANALYZE` and `VACUUM` commands. Depending on your database size (more then 40GB) the `VACUUM` may take up to 2h. +## Changes in 1.5.2 + +### Bug fixes + +- **Memory spike in parquet writer**: Fixed memory spikes when using the + metricstore move (archive) policy with the parquet writer. The writer now + processes data in a streaming fashion to avoid accumulating large allocations. + +### Database performance + +- **Reduced insert pressure**: Bulk insert operations (node state updates, user + and job cache syncs) now use explicit transactions and deferred inserts, + significantly reducing write contention on the SQLite database. +- **SyncJobs wrapped in transaction**: `SyncJobs` now runs inside a transaction + for better consistency and reduced lock contention. +- **Configurable busy timeout**: New `busy-timeout` configuration option for the + SQLite connection. This allows tuning how long the driver waits for a locked + database before returning an error, which improves resilience under concurrent + write load. +- **Increased default SQLite timeout**: The default SQLite connection timeout + has been raised to reduce spurious timeout errors under load. + +### NATS API + +- **Nodestate health checks in NATS API**: The NATS node state handler now + performs the same metric health checks as the REST API handler, including + per-subcluster health checks and `MonitoringStateFailed` fallback for nodes + without health data. + +### Logging improvements + +- **Better error context**: Several repository functions now include the calling + function name in error messages for easier diagnosis. +- **Reduced log noise**: `ErrNoRows` (no results found) is no longer logged as + an error in `scanRow`; common "no rows" paths are now silent. +- **Debug-level missing metrics**: Warning about missing metrics in the metric + store has been downgraded to debug level to reduce log noise in normal + operation. +- **Checkpoint archiving log**: Added an informational log message when the + metricstore checkpoint archiving process runs. + +### Dependencies + +- **cc-lib upgraded**: Updated to latest cc-lib version. + ## Known issues - The new dynamic memory management is not bullet proof yet across restarts. diff --git a/go.mod b/go.mod index 802bfb19..1e3b7bf1 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ tool ( require ( github.com/99designs/gqlgen v0.17.88 - github.com/ClusterCockpit/cc-lib/v2 v2.9.0 + github.com/ClusterCockpit/cc-lib/v2 v2.9.1 github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 github.com/Masterminds/squirrel v1.5.4 github.com/aws/aws-sdk-go-v2 v1.41.3 diff --git a/go.sum b/go.sum index ec440467..4006036e 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/ClusterCockpit/cc-lib/v2 v2.8.2 h1:rCLZk8wz8yq8xBnBEdVKigvA2ngR8dPmHb github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o= github.com/ClusterCockpit/cc-lib/v2 v2.9.0 h1:mzUYakcjwb+UP5II4jOvr36rSYct90gXBbtUg+nvm9c= github.com/ClusterCockpit/cc-lib/v2 v2.9.0/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o= +github.com/ClusterCockpit/cc-lib/v2 v2.9.1 h1:eplKhXQyGAElBGCEGdmxwj7fLv26Op16uK0KxUePDak= +github.com/ClusterCockpit/cc-lib/v2 v2.9.1/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o= github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q= github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 69f4f078..327e48a3 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -421,7 +421,7 @@ func (auth *Authentication) Auth( return } - cclog.Info("auth -> authentication failed") + cclog.Infof("auth -> authentication failed: no valid session or JWT for %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr) onfailure(rw, r, errors.New("unauthorized (please login first)")) }) } diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 916736d0..deebc869 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -168,7 +168,12 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) { // archiveCheckpoints archives checkpoint files to Parquet format. // Produces one Parquet file per cluster: //.parquet +// Each host's rows are written as a separate row group to avoid accumulating +// all data in memory at once. func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) { + cclog.Info("[METRICSTORE]> start archiving checkpoints to parquet") + startTime := time.Now() + clusterEntries, err := os.ReadDir(checkpointsDir) if err != nil { return 0, err @@ -187,7 +192,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err return totalFiles, err } - // Collect rows from all hosts in this cluster using worker pool + // Stream per-host rows to parquet writer via worker pool type hostResult struct { rows []ParquetMetricRow files []string // checkpoint filenames to delete after successful write @@ -235,32 +240,57 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err close(results) }() - // Collect all rows and file info - var allRows []ParquetMetricRow - var allResults []hostResult + // Open streaming writer and write each host's rows as a row group + parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) + writer, err := newParquetArchiveWriter(parquetFile) + if err != nil { + // Drain results channel to unblock workers + for range results { + } + return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err) + } + + type deleteItem struct { + dir string + files []string + } + var toDelete []deleteItem + writeErr := error(nil) + for r := range results { - allRows = append(allRows, r.rows...) - allResults = append(allResults, r) + if writeErr == nil { + sortParquetRows(r.rows) + if err := writer.WriteHostRows(r.rows); err != nil { + writeErr = err + } + } + // Always track files for deletion (even if write failed, we still drain) + toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files}) + } + + if err := writer.Close(); err != nil && writeErr == nil { + writeErr = err } if errs > 0 { return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) } - if len(allRows) == 0 { + if writer.count == 0 { + // No data written — remove empty file + os.Remove(parquetFile) continue } - // Write one Parquet file per cluster - parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) - if err := writeParquetArchive(parquetFile, allRows); err != nil { - return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, err) + if writeErr != nil { + os.Remove(parquetFile) + return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr) } // Delete archived checkpoint files - for _, result := range allResults { - for _, file := range result.files { - filename := filepath.Join(result.dir, file) + for _, item := range toDelete { + for _, file := range item.files { + filename := filepath.Join(item.dir, file) if err := os.Remove(filename); err != nil { cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err) } else { @@ -270,8 +300,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err } cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s", - len(allRows), totalFiles, cluster, parquetFile) + writer.count, totalFiles, cluster, parquetFile) } + cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles) return totalFiles, nil } diff --git a/pkg/metricstore/lineprotocol.go b/pkg/metricstore/lineprotocol.go index 21f036ed..caec82e9 100644 --- a/pkg/metricstore/lineprotocol.go +++ b/pkg/metricstore/lineprotocol.go @@ -23,6 +23,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" @@ -53,7 +54,7 @@ func ReceiveNats(ms *MemoryStore, } var wg sync.WaitGroup - msgs := make(chan []byte, workers*2) + msgs := make(chan []byte, max(workers*256, 8192)) for _, sc := range *Keys.Subscriptions { clusterTag := sc.ClusterTag @@ -123,6 +124,10 @@ func reorder(buf, prefix []byte) []byte { } } +// walDropped counts WAL messages dropped due to a full channel. +// Logged periodically; not critical since binary snapshots capture the data. +var walDropped atomic.Int64 + // decodeState holds the per-call scratch buffers used by DecodeLine. // Instances are recycled via decodeStatePool to avoid repeated allocations // during high-throughput ingestion. @@ -348,7 +353,7 @@ func DecodeLine(dec *lineprotocol.Decoder, time := t.Unix() if Keys.Checkpoints.FileFormat == "wal" { - WALMessages <- &WALMessage{ + msg := &WALMessage{ MetricName: string(st.metricBuf), Cluster: cluster, Node: host, @@ -356,6 +361,13 @@ func DecodeLine(dec *lineprotocol.Decoder, Value: metric.Value, Timestamp: time, } + if !SendWALMessage(msg) { + // WAL shard channel full — metric is written to memory store but not WAL. + // Next binary snapshot will capture it. + if dropped := walDropped.Add(1); dropped%10000 == 1 { + cclog.Warnf("[METRICSTORE]> WAL shard channel full, dropped %d messages (data safe in memory, next snapshot will capture)", dropped) + } + } } if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil { diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 84dad72b..1c475269 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -184,10 +184,11 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W shutdownFuncMu.Unlock() if Keys.Subscriptions != nil { - err = ReceiveNats(ms, 1, ctx) - if err != nil { - cclog.Fatal(err) - } + wg.Go(func() { + if err := ReceiveNats(ms, Keys.NumWorkers, ctx); err != nil { + cclog.Fatal(err) + } + }) } } @@ -277,7 +278,9 @@ func Shutdown() { } if Keys.Checkpoints.FileFormat == "wal" { - close(WALMessages) + for _, ch := range walShardChs { + close(ch) + } } cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) diff --git a/pkg/metricstore/parquetArchive.go b/pkg/metricstore/parquetArchive.go index 420ee4e5..18ee2c64 100644 --- a/pkg/metricstore/parquetArchive.go +++ b/pkg/metricstore/parquetArchive.go @@ -12,6 +12,7 @@ import ( "fmt" "os" "path/filepath" + "sort" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" pq "github.com/parquet-go/parquet-go" @@ -91,43 +92,81 @@ func parseScopeFromName(name string) (string, string) { return name, "" } -// writeParquetArchive writes rows to a Parquet file with Zstd compression. -func writeParquetArchive(filename string, rows []ParquetMetricRow) error { +// parquetArchiveWriter supports incremental writes to a Parquet file. +// Each call to WriteHostRows writes one row group (typically one host's data), +// avoiding accumulation of all rows in memory. +type parquetArchiveWriter struct { + writer *pq.GenericWriter[ParquetMetricRow] + bw *bufio.Writer + f *os.File + count int +} + +// newParquetArchiveWriter creates a streaming Parquet writer with Zstd compression. +func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) { if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil { - return fmt.Errorf("creating archive directory: %w", err) + return nil, fmt.Errorf("creating archive directory: %w", err) } f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) if err != nil { - return fmt.Errorf("creating parquet file: %w", err) + return nil, fmt.Errorf("creating parquet file: %w", err) } - defer f.Close() bw := bufio.NewWriterSize(f, 1<<20) // 1MB write buffer writer := pq.NewGenericWriter[ParquetMetricRow](bw, pq.Compression(&pq.Zstd), - pq.SortingWriterConfig(pq.SortingColumns( - pq.Ascending("cluster"), - pq.Ascending("hostname"), - pq.Ascending("metric"), - pq.Ascending("timestamp"), - )), ) - if _, err := writer.Write(rows); err != nil { + return &parquetArchiveWriter{writer: writer, bw: bw, f: f}, nil +} + +// WriteHostRows sorts rows by (metric, timestamp) in-place, writes them, +// and flushes to create a separate row group. +func (w *parquetArchiveWriter) WriteHostRows(rows []ParquetMetricRow) error { + sort.Slice(rows, func(i, j int) bool { + if rows[i].Metric != rows[j].Metric { + return rows[i].Metric < rows[j].Metric + } + return rows[i].Timestamp < rows[j].Timestamp + }) + + if _, err := w.writer.Write(rows); err != nil { return fmt.Errorf("writing parquet rows: %w", err) } - if err := writer.Close(); err != nil { + if err := w.writer.Flush(); err != nil { + return fmt.Errorf("flushing parquet row group: %w", err) + } + + w.count += len(rows) + return nil +} + +// Close finalises the Parquet file (footer, buffered I/O, file handle). +func (w *parquetArchiveWriter) Close() error { + if err := w.writer.Close(); err != nil { + w.f.Close() return fmt.Errorf("closing parquet writer: %w", err) } - if err := bw.Flush(); err != nil { + if err := w.bw.Flush(); err != nil { + w.f.Close() return fmt.Errorf("flushing parquet file: %w", err) } - return nil + return w.f.Close() +} + +// sortParquetRows sorts rows by (metric, timestamp) in-place. +func sortParquetRows(rows []ParquetMetricRow) { + sort.Slice(rows, func(i, j int) bool { + if rows[i].Metric != rows[j].Metric { + return rows[i].Metric < rows[j].Metric + } + return rows[i].Timestamp < rows[j].Timestamp + }) } // loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns @@ -179,6 +218,19 @@ func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) { } } +// estimateRowCount estimates the number of Parquet rows a CheckpointFile will produce. +// Used for pre-allocating the rows slice to avoid repeated append doubling. +func estimateRowCount(cf *CheckpointFile) int { + n := 0 + for _, cm := range cf.Metrics { + n += len(cm.Data) + } + for _, child := range cf.Children { + n += estimateRowCount(child) + } + return n +} + // archiveCheckpointsToParquet reads checkpoint files for a host directory, // converts them to Parquet rows. Returns the rows and filenames that were processed. func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) { @@ -196,7 +248,13 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu return nil, nil, nil } - var rows []ParquetMetricRow + // First pass: load checkpoints and estimate total rows for pre-allocation. + type loaded struct { + cf *CheckpointFile + filename string + } + var checkpoints []loaded + totalEstimate := 0 for _, checkpoint := range files { filename := filepath.Join(dir, checkpoint) @@ -205,9 +263,21 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu cclog.Warnf("[METRICSTORE]> skipping unreadable checkpoint %s: %v", filename, err) continue } - - rows = flattenCheckpointFile(cf, cluster, host, "node", "", rows) + totalEstimate += estimateRowCount(cf) + checkpoints = append(checkpoints, loaded{cf: cf, filename: checkpoint}) } - return rows, files, nil + if len(checkpoints) == 0 { + return nil, nil, nil + } + + rows := make([]ParquetMetricRow, 0, totalEstimate) + processedFiles := make([]string, 0, len(checkpoints)) + + for _, cp := range checkpoints { + rows = flattenCheckpointFile(cp.cf, cluster, host, "node", "", rows) + processedFiles = append(processedFiles, cp.filename) + } + + return rows, processedFiles, nil } diff --git a/pkg/metricstore/parquetArchive_test.go b/pkg/metricstore/parquetArchive_test.go index d3d70c02..e10b0d03 100644 --- a/pkg/metricstore/parquetArchive_test.go +++ b/pkg/metricstore/parquetArchive_test.go @@ -162,7 +162,15 @@ func TestParquetArchiveRoundtrip(t *testing.T) { } parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet") - if err := writeParquetArchive(parquetFile, rows); err != nil { + writer, err := newParquetArchiveWriter(parquetFile) + if err != nil { + t.Fatal(err) + } + sortParquetRows(rows) + if err := writer.WriteHostRows(rows); err != nil { + t.Fatal(err) + } + if err := writer.Close(); err != nil { t.Fatal(err) } diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index e8a6586f..38585cf5 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -61,6 +61,7 @@ import ( "encoding/binary" "fmt" "hash/crc32" + "hash/fnv" "io" "math" "os" @@ -80,13 +81,15 @@ const ( snapFileMagic = uint32(0xCC5B0001) // Binary snapshot magic ) -// WALMessages is the channel for sending metric writes to the WAL staging goroutine. -// Buffered to allow burst writes without blocking the metric ingestion path. -var WALMessages = make(chan *WALMessage, 4096) +// walShardChs holds per-shard channels for WAL messages. +// Initialized by WALStaging; nil when WAL is not active. +var walShardChs []chan *WALMessage -// walRotateCh is used by the checkpoint goroutine to request WAL file rotation -// (close, delete, reopen) after a binary snapshot has been written. -var walRotateCh = make(chan walRotateReq, 256) +// walShardRotateChs holds per-shard channels for WAL rotation requests. +var walShardRotateChs []chan walRotateReq + +// walNumShards stores the number of shards (set during WALStaging init). +var walNumShards int // WALMessage represents a single metric write to be appended to the WAL. // Cluster and Node are NOT stored in the WAL record (inferred from file path). @@ -112,140 +115,218 @@ type walFileState struct { w *bufio.Writer } -// WALStaging starts a background goroutine that receives WALMessage items -// and appends binary WAL records to per-host current.wal files. -// Also handles WAL rotation requests from the checkpoint goroutine. +// walShardIndex computes which shard a message belongs to based on cluster+node. +// Uses FNV-1a hash for fast, well-distributed mapping. +func walShardIndex(cluster, node string) int { + h := fnv.New32a() + h.Write([]byte(cluster)) + h.Write([]byte{0}) + h.Write([]byte(node)) + return int(h.Sum32()) % walNumShards +} + +// SendWALMessage routes a WAL message to the appropriate shard channel. +// Returns false if the channel is full (message dropped). +func SendWALMessage(msg *WALMessage) bool { + if walShardChs == nil { + return false + } + shard := walShardIndex(msg.Cluster, msg.Node) + select { + case walShardChs[shard] <- msg: + return true + default: + return false + } +} + +// WALStaging starts N sharded background goroutines that receive WALMessage items +// and append binary WAL records to per-host current.wal files. +// Each shard owns a subset of hosts (determined by hash of cluster+node), +// parallelizing serialization and I/O while keeping per-host writes serial. func WALStaging(wg *sync.WaitGroup, ctx context.Context) { - wg.Go(func() { - if Keys.Checkpoints.FileFormat == "json" { - return - } + if Keys.Checkpoints.FileFormat == "json" { + return + } - hostFiles := make(map[string]*walFileState) + walNumShards = max(Keys.NumWorkers, 1) + chBufSize := max(65536/walNumShards, 1024) - defer func() { - for _, ws := range hostFiles { - if ws.f != nil { - ws.w.Flush() - ws.f.Close() + walShardChs = make([]chan *WALMessage, walNumShards) + walShardRotateChs = make([]chan walRotateReq, walNumShards) + + for i := range walNumShards { + walShardChs[i] = make(chan *WALMessage, chBufSize) + walShardRotateChs[i] = make(chan walRotateReq, 64) + } + + for i := range walNumShards { + msgCh := walShardChs[i] + rotateCh := walShardRotateChs[i] + + wg.Go(func() { + hostFiles := make(map[string]*walFileState) + + defer func() { + for _, ws := range hostFiles { + if ws.f != nil { + ws.w.Flush() + ws.f.Close() + } } - } - }() + }() - getOrOpenWAL := func(hostDir string) *walFileState { - ws, ok := hostFiles[hostDir] - if ok { + getOrOpenWAL := func(hostDir string) *walFileState { + ws, ok := hostFiles[hostDir] + if ok { + return ws + } + + if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil { + cclog.Errorf("[METRICSTORE]> WAL: mkdir %s: %v", hostDir, err) + return nil + } + + walPath := path.Join(hostDir, "current.wal") + f, err := os.OpenFile(walPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, CheckpointFilePerms) + if err != nil { + cclog.Errorf("[METRICSTORE]> WAL: open %s: %v", walPath, err) + return nil + } + + w := bufio.NewWriter(f) + + // Write file header magic if file is new (empty). + info, err := f.Stat() + if err == nil && info.Size() == 0 { + var hdr [4]byte + binary.LittleEndian.PutUint32(hdr[:], walFileMagic) + if _, err := w.Write(hdr[:]); err != nil { + cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err) + f.Close() + return nil + } + } + + ws = &walFileState{f: f, w: w} + hostFiles[hostDir] = ws return ws } - if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil { - cclog.Errorf("[METRICSTORE]> WAL: mkdir %s: %v", hostDir, err) - return nil - } - - walPath := path.Join(hostDir, "current.wal") - f, err := os.OpenFile(walPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, CheckpointFilePerms) - if err != nil { - cclog.Errorf("[METRICSTORE]> WAL: open %s: %v", walPath, err) - return nil - } - - w := bufio.NewWriter(f) - - // Write file header magic if file is new (empty). - info, err := f.Stat() - if err == nil && info.Size() == 0 { - var hdr [4]byte - binary.LittleEndian.PutUint32(hdr[:], walFileMagic) - if _, err := w.Write(hdr[:]); err != nil { - cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err) - f.Close() - return nil + processMsg := func(msg *WALMessage) { + hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node) + ws := getOrOpenWAL(hostDir) + if ws == nil { + return + } + if err := writeWALRecordDirect(ws.w, msg); err != nil { + cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) } } - ws = &walFileState{f: f, w: w} - hostFiles[hostDir] = ws - return ws - } - - processMsg := func(msg *WALMessage) { - hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node) - ws := getOrOpenWAL(hostDir) - if ws == nil { - return - } - if err := writeWALRecord(ws.w, msg); err != nil { - cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) - } - } - - processRotate := func(req walRotateReq) { - ws, ok := hostFiles[req.hostDir] - if ok && ws.f != nil { - ws.w.Flush() - ws.f.Close() - walPath := path.Join(req.hostDir, "current.wal") - if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { - cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err) + processRotate := func(req walRotateReq) { + ws, ok := hostFiles[req.hostDir] + if ok && ws.f != nil { + ws.w.Flush() + ws.f.Close() + walPath := path.Join(req.hostDir, "current.wal") + if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { + cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err) + } + delete(hostFiles, req.hostDir) + } + close(req.done) + } + + flushAll := func() { + for _, ws := range hostFiles { + if ws.f != nil { + ws.w.Flush() + } + } + } + + drain := func() { + for { + select { + case msg, ok := <-msgCh: + if !ok { + return + } + processMsg(msg) + case req := <-rotateCh: + processRotate(req) + default: + flushAll() + return + } } - delete(hostFiles, req.hostDir) } - close(req.done) - } - drain := func() { for { select { - case msg, ok := <-WALMessages: + case <-ctx.Done(): + drain() + return + case msg, ok := <-msgCh: if !ok { return } processMsg(msg) - case req := <-walRotateCh: - processRotate(req) - default: - // Flush all buffered writers after draining remaining messages. - for _, ws := range hostFiles { - if ws.f != nil { - ws.w.Flush() + + // Drain up to 256 more messages without blocking to batch writes. + for range 256 { + select { + case msg, ok := <-msgCh: + if !ok { + return + } + processMsg(msg) + case req := <-rotateCh: + processRotate(req) + default: + goto flushed } } - return + flushed: + flushAll() + case req := <-rotateCh: + processRotate(req) } } - } - - for { - select { - case <-ctx.Done(): - drain() - return - case msg, ok := <-WALMessages: - if !ok { - return - } - processMsg(msg) - case req := <-walRotateCh: - processRotate(req) - } - } - }) + }) + } } // RotateWALFiles sends rotation requests for the given host directories -// and blocks until all rotations complete. +// and blocks until all rotations complete. Each request is routed to the +// shard that owns the host directory. func RotateWALFiles(hostDirs []string) { + if walShardRotateChs == nil { + return + } dones := make([]chan struct{}, len(hostDirs)) for i, dir := range hostDirs { dones[i] = make(chan struct{}) - walRotateCh <- walRotateReq{hostDir: dir, done: dones[i]} + // Extract cluster/node from hostDir to find the right shard. + // hostDir = rootDir/cluster/node + shard := walShardIndexFromDir(dir) + walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]} } for _, done := range dones { <-done } } +// walShardIndexFromDir extracts cluster and node from a host directory path +// and returns the shard index. The path format is: rootDir/cluster/node +func walShardIndexFromDir(hostDir string) int { + // hostDir = .../cluster/node — extract last two path components + node := path.Base(hostDir) + cluster := path.Base(path.Dir(hostDir)) + return walShardIndex(cluster, node) +} + // RotateWALFiles sends rotation requests for the given host directories // and blocks until all rotations complete. func RotateWALFilesAfterShutdown(hostDirs []string) { @@ -257,6 +338,81 @@ func RotateWALFilesAfterShutdown(hostDirs []string) { } } +// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer, +// avoiding heap allocations by using a stack-allocated scratch buffer for +// the fixed-size header/trailer and computing CRC inline. +func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error { + // Compute payload size. + payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4 + for _, s := range msg.Selector { + payloadSize += 1 + len(s) + } + + // Write magic + payload length (8 bytes header). + var hdr [8]byte + binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic) + binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize)) + if _, err := w.Write(hdr[:]); err != nil { + return err + } + + // We need to compute CRC over the payload as we write it. + crc := crc32.NewIEEE() + + // Timestamp (8 bytes). + var scratch [8]byte + binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp)) + crc.Write(scratch[:8]) + if _, err := w.Write(scratch[:8]); err != nil { + return err + } + + // Metric name length (2 bytes) + metric name. + binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName))) + crc.Write(scratch[:2]) + if _, err := w.Write(scratch[:2]); err != nil { + return err + } + nameBytes := []byte(msg.MetricName) + crc.Write(nameBytes) + if _, err := w.Write(nameBytes); err != nil { + return err + } + + // Selector count (1 byte). + scratch[0] = byte(len(msg.Selector)) + crc.Write(scratch[:1]) + if _, err := w.Write(scratch[:1]); err != nil { + return err + } + + // Selectors (1-byte length + bytes each). + for _, sel := range msg.Selector { + scratch[0] = byte(len(sel)) + crc.Write(scratch[:1]) + if _, err := w.Write(scratch[:1]); err != nil { + return err + } + selBytes := []byte(sel) + crc.Write(selBytes) + if _, err := w.Write(selBytes); err != nil { + return err + } + } + + // Value (4 bytes, float32 bits). + binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value))) + crc.Write(scratch[:4]) + if _, err := w.Write(scratch[:4]); err != nil { + return err + } + + // CRC32 (4 bytes). + binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32()) + _, err := w.Write(scratch[:4]) + return err +} + // buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC). func buildWALPayload(msg *WALMessage) []byte { size := 8 + 2 + len(msg.MetricName) + 1 + 4