From 02f82c2c0bb0bf3cffa7704f267c24878f517128 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 05:08:37 +0100 Subject: [PATCH 1/8] fix: Prevent memory spikes in parquet writer for metricstore move policy Entire-Checkpoint: 4a675b8352a2 --- pkg/metricstore/archive.go | 57 +++++++++---- pkg/metricstore/parquetArchive.go | 108 ++++++++++++++++++++----- pkg/metricstore/parquetArchive_test.go | 10 ++- 3 files changed, 140 insertions(+), 35 deletions(-) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 916736d0..5c1a8f0f 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -168,6 +168,8 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) { // archiveCheckpoints archives checkpoint files to Parquet format. // Produces one Parquet file per cluster: //.parquet +// Each host's rows are written as a separate row group to avoid accumulating +// all data in memory at once. func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) { clusterEntries, err := os.ReadDir(checkpointsDir) if err != nil { @@ -187,7 +189,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err return totalFiles, err } - // Collect rows from all hosts in this cluster using worker pool + // Stream per-host rows to parquet writer via worker pool type hostResult struct { rows []ParquetMetricRow files []string // checkpoint filenames to delete after successful write @@ -235,32 +237,57 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err close(results) }() - // Collect all rows and file info - var allRows []ParquetMetricRow - var allResults []hostResult + // Open streaming writer and write each host's rows as a row group + parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) + writer, err := newParquetArchiveWriter(parquetFile) + if err != nil { + // Drain results channel to unblock workers + for range 
results { + } + return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err) + } + + type deleteItem struct { + dir string + files []string + } + var toDelete []deleteItem + writeErr := error(nil) + for r := range results { - allRows = append(allRows, r.rows...) - allResults = append(allResults, r) + if writeErr == nil { + sortParquetRows(r.rows) + if err := writer.WriteHostRows(r.rows); err != nil { + writeErr = err + } + } + // Always track files for deletion (even if write failed, we still drain) + toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files}) + } + + if err := writer.Close(); err != nil && writeErr == nil { + writeErr = err } if errs > 0 { return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) } - if len(allRows) == 0 { + if writer.count == 0 { + // No data written — remove empty file + os.Remove(parquetFile) continue } - // Write one Parquet file per cluster - parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) - if err := writeParquetArchive(parquetFile, allRows); err != nil { - return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, err) + if writeErr != nil { + os.Remove(parquetFile) + return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr) } // Delete archived checkpoint files - for _, result := range allResults { - for _, file := range result.files { - filename := filepath.Join(result.dir, file) + for _, item := range toDelete { + for _, file := range item.files { + filename := filepath.Join(item.dir, file) if err := os.Remove(filename); err != nil { cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err) } else { @@ -270,7 +297,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err } cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s", - len(allRows), totalFiles, cluster, 
parquetFile) + writer.count, totalFiles, cluster, parquetFile) } return totalFiles, nil diff --git a/pkg/metricstore/parquetArchive.go b/pkg/metricstore/parquetArchive.go index 420ee4e5..18ee2c64 100644 --- a/pkg/metricstore/parquetArchive.go +++ b/pkg/metricstore/parquetArchive.go @@ -12,6 +12,7 @@ import ( "fmt" "os" "path/filepath" + "sort" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" pq "github.com/parquet-go/parquet-go" @@ -91,43 +92,81 @@ func parseScopeFromName(name string) (string, string) { return name, "" } -// writeParquetArchive writes rows to a Parquet file with Zstd compression. -func writeParquetArchive(filename string, rows []ParquetMetricRow) error { +// parquetArchiveWriter supports incremental writes to a Parquet file. +// Each call to WriteHostRows writes one row group (typically one host's data), +// avoiding accumulation of all rows in memory. +type parquetArchiveWriter struct { + writer *pq.GenericWriter[ParquetMetricRow] + bw *bufio.Writer + f *os.File + count int +} + +// newParquetArchiveWriter creates a streaming Parquet writer with Zstd compression. 
+func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) { if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil { - return fmt.Errorf("creating archive directory: %w", err) + return nil, fmt.Errorf("creating archive directory: %w", err) } f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) if err != nil { - return fmt.Errorf("creating parquet file: %w", err) + return nil, fmt.Errorf("creating parquet file: %w", err) } - defer f.Close() bw := bufio.NewWriterSize(f, 1<<20) // 1MB write buffer writer := pq.NewGenericWriter[ParquetMetricRow](bw, pq.Compression(&pq.Zstd), - pq.SortingWriterConfig(pq.SortingColumns( - pq.Ascending("cluster"), - pq.Ascending("hostname"), - pq.Ascending("metric"), - pq.Ascending("timestamp"), - )), ) - if _, err := writer.Write(rows); err != nil { + return &parquetArchiveWriter{writer: writer, bw: bw, f: f}, nil +} + +// WriteHostRows sorts rows by (metric, timestamp) in-place, writes them, +// and flushes to create a separate row group. +func (w *parquetArchiveWriter) WriteHostRows(rows []ParquetMetricRow) error { + sort.Slice(rows, func(i, j int) bool { + if rows[i].Metric != rows[j].Metric { + return rows[i].Metric < rows[j].Metric + } + return rows[i].Timestamp < rows[j].Timestamp + }) + + if _, err := w.writer.Write(rows); err != nil { return fmt.Errorf("writing parquet rows: %w", err) } - if err := writer.Close(); err != nil { + if err := w.writer.Flush(); err != nil { + return fmt.Errorf("flushing parquet row group: %w", err) + } + + w.count += len(rows) + return nil +} + +// Close finalises the Parquet file (footer, buffered I/O, file handle). 
+func (w *parquetArchiveWriter) Close() error { + if err := w.writer.Close(); err != nil { + w.f.Close() return fmt.Errorf("closing parquet writer: %w", err) } - if err := bw.Flush(); err != nil { + if err := w.bw.Flush(); err != nil { + w.f.Close() return fmt.Errorf("flushing parquet file: %w", err) } - return nil + return w.f.Close() +} + +// sortParquetRows sorts rows by (metric, timestamp) in-place. +func sortParquetRows(rows []ParquetMetricRow) { + sort.Slice(rows, func(i, j int) bool { + if rows[i].Metric != rows[j].Metric { + return rows[i].Metric < rows[j].Metric + } + return rows[i].Timestamp < rows[j].Timestamp + }) } // loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns @@ -179,6 +218,19 @@ func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) { } } +// estimateRowCount estimates the number of Parquet rows a CheckpointFile will produce. +// Used for pre-allocating the rows slice to avoid repeated append doubling. +func estimateRowCount(cf *CheckpointFile) int { + n := 0 + for _, cm := range cf.Metrics { + n += len(cm.Data) + } + for _, child := range cf.Children { + n += estimateRowCount(child) + } + return n +} + // archiveCheckpointsToParquet reads checkpoint files for a host directory, // converts them to Parquet rows. Returns the rows and filenames that were processed. func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) { @@ -196,7 +248,13 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu return nil, nil, nil } - var rows []ParquetMetricRow + // First pass: load checkpoints and estimate total rows for pre-allocation. 
+ type loaded struct { + cf *CheckpointFile + filename string + } + var checkpoints []loaded + totalEstimate := 0 for _, checkpoint := range files { filename := filepath.Join(dir, checkpoint) @@ -205,9 +263,21 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu cclog.Warnf("[METRICSTORE]> skipping unreadable checkpoint %s: %v", filename, err) continue } - - rows = flattenCheckpointFile(cf, cluster, host, "node", "", rows) + totalEstimate += estimateRowCount(cf) + checkpoints = append(checkpoints, loaded{cf: cf, filename: checkpoint}) } - return rows, files, nil + if len(checkpoints) == 0 { + return nil, nil, nil + } + + rows := make([]ParquetMetricRow, 0, totalEstimate) + processedFiles := make([]string, 0, len(checkpoints)) + + for _, cp := range checkpoints { + rows = flattenCheckpointFile(cp.cf, cluster, host, "node", "", rows) + processedFiles = append(processedFiles, cp.filename) + } + + return rows, processedFiles, nil } diff --git a/pkg/metricstore/parquetArchive_test.go b/pkg/metricstore/parquetArchive_test.go index d3d70c02..e10b0d03 100644 --- a/pkg/metricstore/parquetArchive_test.go +++ b/pkg/metricstore/parquetArchive_test.go @@ -162,7 +162,15 @@ func TestParquetArchiveRoundtrip(t *testing.T) { } parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet") - if err := writeParquetArchive(parquetFile, rows); err != nil { + writer, err := newParquetArchiveWriter(parquetFile) + if err != nil { + t.Fatal(err) + } + sortParquetRows(rows) + if err := writer.WriteHostRows(rows); err != nil { + t.Fatal(err) + } + if err := writer.Close(); err != nil { t.Fatal(err) } From d46e6371fc74285d69021af357c1c53963140815 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 05:22:39 +0100 Subject: [PATCH 2/8] Add log about checkpoint archiving Entire-Checkpoint: bf29af79b268 --- pkg/metricstore/archive.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go 
index 5c1a8f0f..deebc869 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -171,6 +171,9 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) { // Each host's rows are written as a separate row group to avoid accumulating // all data in memory at once. func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) { + cclog.Info("[METRICSTORE]> start archiving checkpoints to parquet") + startTime := time.Now() + clusterEntries, err := os.ReadDir(checkpointsDir) if err != nil { return 0, err @@ -300,5 +303,6 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err writer.count, totalFiles, cluster, parquetFile) } + cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles) return totalFiles, nil } From 045f81f985d8ba4adfbe1ebb62e6bf7255e40bd2 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 05:31:49 +0100 Subject: [PATCH 3/8] Prepare release v1.5.2 Entire-Checkpoint: 9286f4c43ab5 --- Makefile | 2 +- ReleaseNotes.md | 48 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 8f805c87..61e74c71 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ TARGET = ./cc-backend FRONTEND = ./web/frontend -VERSION = 1.5.1 +VERSION = 1.5.2 GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development') CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S") LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}' diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 84e3a417..1ac2f229 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -1,16 +1,62 @@ -# `cc-backend` version 1.5.1 +# `cc-backend` version 1.5.2 Supports job archive version 3 and database version 11. This is a bugfix release of `cc-backend`, the API backend and frontend implementation of ClusterCockpit. 
For release specific notes visit the [ClusterCockpit Documentation](https://clusterockpit.org/docs/release/). +If you are upgrading from v1.5.1 no database migration is required. If you are upgrading from v1.5.0 you need to do another DB migration. This should not take long. For optimal database performance after the migration it is recommended to apply the new `optimize-db` flag, which runs the sqlite `ANALYZE` and `VACUUM` commands. Depending on your database size (more than 40GB) the `VACUUM` may take up to 2h. +## Changes in 1.5.2 + +### Bug fixes + +- **Memory spike in parquet writer**: Fixed memory spikes when using the + metricstore move (archive) policy with the parquet writer. The writer now + processes data in a streaming fashion to avoid accumulating large allocations. + +### Database performance + +- **Reduced insert pressure**: Bulk insert operations (node state updates, user + and job cache syncs) now use explicit transactions and deferred inserts, + significantly reducing write contention on the SQLite database. +- **SyncJobs wrapped in transaction**: `SyncJobs` now runs inside a transaction + for better consistency and reduced lock contention. +- **Configurable busy timeout**: New `busy-timeout` configuration option for the + SQLite connection. This allows tuning how long the driver waits for a locked + database before returning an error, which improves resilience under concurrent + write load. +- **Increased default SQLite timeout**: The default SQLite connection timeout + has been raised to reduce spurious timeout errors under load. + +### NATS API + +- **Nodestate health checks in NATS API**: The NATS node state handler now + performs the same metric health checks as the REST API handler, including + per-subcluster health checks and `MonitoringStateFailed` fallback for nodes + without health data. 
+ +### Logging improvements + +- **Better error context**: Several repository functions now include the calling + function name in error messages for easier diagnosis. +- **Reduced log noise**: `ErrNoRows` (no results found) is no longer logged as + an error in `scanRow`; common "no rows" paths are now silent. +- **Debug-level missing metrics**: Warning about missing metrics in the metric + store has been downgraded to debug level to reduce log noise in normal + operation. +- **Checkpoint archiving log**: Added an informational log message when the + metricstore checkpoint archiving process runs. + +### Dependencies + +- **cc-lib upgraded**: Updated to latest cc-lib version. + ## Known issues - The new dynamic memory management is not bullet proof yet across restarts. From 33bc19c732ea493d519a8ada7d61875d2884f4e4 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 05:52:58 +0100 Subject: [PATCH 4/8] Upgrade cc-lib --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 802bfb19..1e3b7bf1 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ tool ( require ( github.com/99designs/gqlgen v0.17.88 - github.com/ClusterCockpit/cc-lib/v2 v2.9.0 + github.com/ClusterCockpit/cc-lib/v2 v2.9.1 github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 github.com/Masterminds/squirrel v1.5.4 github.com/aws/aws-sdk-go-v2 v1.41.3 diff --git a/go.sum b/go.sum index ec440467..4006036e 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/ClusterCockpit/cc-lib/v2 v2.8.2 h1:rCLZk8wz8yq8xBnBEdVKigvA2ngR8dPmHb github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o= github.com/ClusterCockpit/cc-lib/v2 v2.9.0 h1:mzUYakcjwb+UP5II4jOvr36rSYct90gXBbtUg+nvm9c= github.com/ClusterCockpit/cc-lib/v2 v2.9.0/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o= +github.com/ClusterCockpit/cc-lib/v2 v2.9.1 h1:eplKhXQyGAElBGCEGdmxwj7fLv26Op16uK0KxUePDak= +github.com/ClusterCockpit/cc-lib/v2 
v2.9.1/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o= github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q= github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= From 50aed595cf2c04031fee0bccace2108071b79ef4 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 06:14:15 +0100 Subject: [PATCH 5/8] fix: metricstore NATS contention Entire-Checkpoint: 7e68050cab59 --- pkg/metricstore/lineprotocol.go | 18 ++++++++++++++++-- pkg/metricstore/metricstore.go | 2 +- pkg/metricstore/walCheckpoint.go | 24 +++++++++++++++++++++++- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/pkg/metricstore/lineprotocol.go b/pkg/metricstore/lineprotocol.go index 21f036ed..d7b96b91 100644 --- a/pkg/metricstore/lineprotocol.go +++ b/pkg/metricstore/lineprotocol.go @@ -23,6 +23,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" @@ -53,7 +54,7 @@ func ReceiveNats(ms *MemoryStore, } var wg sync.WaitGroup - msgs := make(chan []byte, workers*2) + msgs := make(chan []byte, max(workers*256, 8192)) for _, sc := range *Keys.Subscriptions { clusterTag := sc.ClusterTag @@ -123,6 +124,10 @@ func reorder(buf, prefix []byte) []byte { } } +// walDropped counts WAL messages dropped due to a full channel. +// Logged periodically; not critical since binary snapshots capture the data. +var walDropped atomic.Int64 + // decodeState holds the per-call scratch buffers used by DecodeLine. // Instances are recycled via decodeStatePool to avoid repeated allocations // during high-throughput ingestion. 
@@ -348,7 +353,7 @@ func DecodeLine(dec *lineprotocol.Decoder, time := t.Unix() if Keys.Checkpoints.FileFormat == "wal" { - WALMessages <- &WALMessage{ + msg := &WALMessage{ MetricName: string(st.metricBuf), Cluster: cluster, Node: host, @@ -356,6 +361,15 @@ func DecodeLine(dec *lineprotocol.Decoder, Value: metric.Value, Timestamp: time, } + select { + case WALMessages <- msg: + default: + // WAL channel full — metric is written to memory store but not WAL. + // Next binary snapshot will capture it. + if dropped := walDropped.Add(1); dropped%10000 == 1 { + cclog.Warnf("[METRICSTORE]> WAL channel full, dropped %d messages (data safe in memory, next snapshot will capture)", dropped) + } + } } if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil { diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 84dad72b..cd74f481 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -184,7 +184,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W shutdownFuncMu.Unlock() if Keys.Subscriptions != nil { - err = ReceiveNats(ms, 1, ctx) + err = ReceiveNats(ms, Keys.NumWorkers, ctx) if err != nil { cclog.Fatal(err) } diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index e8a6586f..2131331e 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -82,7 +82,7 @@ const ( // WALMessages is the channel for sending metric writes to the WAL staging goroutine. // Buffered to allow burst writes without blocking the metric ingestion path. -var WALMessages = make(chan *WALMessage, 4096) +var WALMessages = make(chan *WALMessage, 65536) // walRotateCh is used by the checkpoint goroutine to request WAL file rotation // (close, delete, reopen) after a binary snapshot has been written. 
@@ -226,6 +226,28 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { return } processMsg(msg) + + // Drain up to 256 more messages without blocking to batch writes. + for range 256 { + select { + case msg, ok := <-WALMessages: + if !ok { + return + } + processMsg(msg) + case req := <-walRotateCh: + processRotate(req) + default: + goto flushed + } + } + flushed: + // Flush all buffered writers after processing the batch. + for _, ws := range hostFiles { + if ws.f != nil { + ws.w.Flush() + } + } case req := <-walRotateCh: processRotate(req) } From bf1a8a174eca262d3f3d1179df3bff86f276d9a3 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 06:32:14 +0100 Subject: [PATCH 6/8] fix: Shard WAL consumer for higher throughput Entire-Checkpoint: e583b7b11439 --- pkg/metricstore/lineprotocol.go | 8 +- pkg/metricstore/metricstore.go | 4 +- pkg/metricstore/walCheckpoint.go | 392 +++++++++++++++++++++---------- 3 files changed, 269 insertions(+), 135 deletions(-) diff --git a/pkg/metricstore/lineprotocol.go b/pkg/metricstore/lineprotocol.go index d7b96b91..caec82e9 100644 --- a/pkg/metricstore/lineprotocol.go +++ b/pkg/metricstore/lineprotocol.go @@ -361,13 +361,11 @@ func DecodeLine(dec *lineprotocol.Decoder, Value: metric.Value, Timestamp: time, } - select { - case WALMessages <- msg: - default: - // WAL channel full — metric is written to memory store but not WAL. + if !SendWALMessage(msg) { + // WAL shard channel full — metric is written to memory store but not WAL. // Next binary snapshot will capture it. 
if dropped := walDropped.Add(1); dropped%10000 == 1 { - cclog.Warnf("[METRICSTORE]> WAL channel full, dropped %d messages (data safe in memory, next snapshot will capture)", dropped) + cclog.Warnf("[METRICSTORE]> WAL shard channel full, dropped %d messages (data safe in memory, next snapshot will capture)", dropped) } } } diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index cd74f481..83c78ab2 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -277,7 +277,9 @@ func Shutdown() { } if Keys.Checkpoints.FileFormat == "wal" { - close(WALMessages) + for _, ch := range walShardChs { + close(ch) + } } cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 2131331e..38585cf5 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -61,6 +61,7 @@ import ( "encoding/binary" "fmt" "hash/crc32" + "hash/fnv" "io" "math" "os" @@ -80,13 +81,15 @@ const ( snapFileMagic = uint32(0xCC5B0001) // Binary snapshot magic ) -// WALMessages is the channel for sending metric writes to the WAL staging goroutine. -// Buffered to allow burst writes without blocking the metric ingestion path. -var WALMessages = make(chan *WALMessage, 65536) +// walShardChs holds per-shard channels for WAL messages. +// Initialized by WALStaging; nil when WAL is not active. +var walShardChs []chan *WALMessage -// walRotateCh is used by the checkpoint goroutine to request WAL file rotation -// (close, delete, reopen) after a binary snapshot has been written. -var walRotateCh = make(chan walRotateReq, 256) +// walShardRotateChs holds per-shard channels for WAL rotation requests. +var walShardRotateChs []chan walRotateReq + +// walNumShards stores the number of shards (set during WALStaging init). +var walNumShards int // WALMessage represents a single metric write to be appended to the WAL. 
// Cluster and Node are NOT stored in the WAL record (inferred from file path). @@ -112,162 +115,218 @@ type walFileState struct { w *bufio.Writer } -// WALStaging starts a background goroutine that receives WALMessage items -// and appends binary WAL records to per-host current.wal files. -// Also handles WAL rotation requests from the checkpoint goroutine. +// walShardIndex computes which shard a message belongs to based on cluster+node. +// Uses FNV-1a hash for fast, well-distributed mapping. +func walShardIndex(cluster, node string) int { + h := fnv.New32a() + h.Write([]byte(cluster)) + h.Write([]byte{0}) + h.Write([]byte(node)) + return int(h.Sum32()) % walNumShards +} + +// SendWALMessage routes a WAL message to the appropriate shard channel. +// Returns false if the channel is full (message dropped). +func SendWALMessage(msg *WALMessage) bool { + if walShardChs == nil { + return false + } + shard := walShardIndex(msg.Cluster, msg.Node) + select { + case walShardChs[shard] <- msg: + return true + default: + return false + } +} + +// WALStaging starts N sharded background goroutines that receive WALMessage items +// and append binary WAL records to per-host current.wal files. +// Each shard owns a subset of hosts (determined by hash of cluster+node), +// parallelizing serialization and I/O while keeping per-host writes serial. 
func WALStaging(wg *sync.WaitGroup, ctx context.Context) { - wg.Go(func() { - if Keys.Checkpoints.FileFormat == "json" { - return - } + if Keys.Checkpoints.FileFormat == "json" { + return + } - hostFiles := make(map[string]*walFileState) + walNumShards = max(Keys.NumWorkers, 1) + chBufSize := max(65536/walNumShards, 1024) - defer func() { - for _, ws := range hostFiles { - if ws.f != nil { - ws.w.Flush() - ws.f.Close() + walShardChs = make([]chan *WALMessage, walNumShards) + walShardRotateChs = make([]chan walRotateReq, walNumShards) + + for i := range walNumShards { + walShardChs[i] = make(chan *WALMessage, chBufSize) + walShardRotateChs[i] = make(chan walRotateReq, 64) + } + + for i := range walNumShards { + msgCh := walShardChs[i] + rotateCh := walShardRotateChs[i] + + wg.Go(func() { + hostFiles := make(map[string]*walFileState) + + defer func() { + for _, ws := range hostFiles { + if ws.f != nil { + ws.w.Flush() + ws.f.Close() + } } - } - }() + }() - getOrOpenWAL := func(hostDir string) *walFileState { - ws, ok := hostFiles[hostDir] - if ok { + getOrOpenWAL := func(hostDir string) *walFileState { + ws, ok := hostFiles[hostDir] + if ok { + return ws + } + + if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil { + cclog.Errorf("[METRICSTORE]> WAL: mkdir %s: %v", hostDir, err) + return nil + } + + walPath := path.Join(hostDir, "current.wal") + f, err := os.OpenFile(walPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, CheckpointFilePerms) + if err != nil { + cclog.Errorf("[METRICSTORE]> WAL: open %s: %v", walPath, err) + return nil + } + + w := bufio.NewWriter(f) + + // Write file header magic if file is new (empty). 
+ info, err := f.Stat() + if err == nil && info.Size() == 0 { + var hdr [4]byte + binary.LittleEndian.PutUint32(hdr[:], walFileMagic) + if _, err := w.Write(hdr[:]); err != nil { + cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err) + f.Close() + return nil + } + } + + ws = &walFileState{f: f, w: w} + hostFiles[hostDir] = ws return ws } - if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil { - cclog.Errorf("[METRICSTORE]> WAL: mkdir %s: %v", hostDir, err) - return nil - } - - walPath := path.Join(hostDir, "current.wal") - f, err := os.OpenFile(walPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, CheckpointFilePerms) - if err != nil { - cclog.Errorf("[METRICSTORE]> WAL: open %s: %v", walPath, err) - return nil - } - - w := bufio.NewWriter(f) - - // Write file header magic if file is new (empty). - info, err := f.Stat() - if err == nil && info.Size() == 0 { - var hdr [4]byte - binary.LittleEndian.PutUint32(hdr[:], walFileMagic) - if _, err := w.Write(hdr[:]); err != nil { - cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err) - f.Close() - return nil - } - } - - ws = &walFileState{f: f, w: w} - hostFiles[hostDir] = ws - return ws - } - - processMsg := func(msg *WALMessage) { - hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node) - ws := getOrOpenWAL(hostDir) - if ws == nil { - return - } - if err := writeWALRecord(ws.w, msg); err != nil { - cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) - } - } - - processRotate := func(req walRotateReq) { - ws, ok := hostFiles[req.hostDir] - if ok && ws.f != nil { - ws.w.Flush() - ws.f.Close() - walPath := path.Join(req.hostDir, "current.wal") - if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { - cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err) - } - delete(hostFiles, req.hostDir) - } - close(req.done) - } - - drain := func() { - for { - select { - case msg, ok := <-WALMessages: - if !ok { - return - } - processMsg(msg) - case req := 
<-walRotateCh: - processRotate(req) - default: - // Flush all buffered writers after draining remaining messages. - for _, ws := range hostFiles { - if ws.f != nil { - ws.w.Flush() - } - } + processMsg := func(msg *WALMessage) { + hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node) + ws := getOrOpenWAL(hostDir) + if ws == nil { return } + if err := writeWALRecordDirect(ws.w, msg); err != nil { + cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) + } } - } - for { - select { - case <-ctx.Done(): - drain() - return - case msg, ok := <-WALMessages: - if !ok { - return - } - processMsg(msg) - - // Drain up to 256 more messages without blocking to batch writes. - for range 256 { - select { - case msg, ok := <-WALMessages: - if !ok { - return - } - processMsg(msg) - case req := <-walRotateCh: - processRotate(req) - default: - goto flushed + processRotate := func(req walRotateReq) { + ws, ok := hostFiles[req.hostDir] + if ok && ws.f != nil { + ws.w.Flush() + ws.f.Close() + walPath := path.Join(req.hostDir, "current.wal") + if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { + cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err) } + delete(hostFiles, req.hostDir) } - flushed: - // Flush all buffered writers after processing the batch. + close(req.done) + } + + flushAll := func() { for _, ws := range hostFiles { if ws.f != nil { ws.w.Flush() } } - case req := <-walRotateCh: - processRotate(req) } - } - }) + + drain := func() { + for { + select { + case msg, ok := <-msgCh: + if !ok { + return + } + processMsg(msg) + case req := <-rotateCh: + processRotate(req) + default: + flushAll() + return + } + } + } + + for { + select { + case <-ctx.Done(): + drain() + return + case msg, ok := <-msgCh: + if !ok { + return + } + processMsg(msg) + + // Drain up to 256 more messages without blocking to batch writes. 
+ for range 256 { + select { + case msg, ok := <-msgCh: + if !ok { + return + } + processMsg(msg) + case req := <-rotateCh: + processRotate(req) + default: + goto flushed + } + } + flushed: + flushAll() + case req := <-rotateCh: + processRotate(req) + } + } + }) + } } // RotateWALFiles sends rotation requests for the given host directories -// and blocks until all rotations complete. +// and blocks until all rotations complete. Each request is routed to the +// shard that owns the host directory. func RotateWALFiles(hostDirs []string) { + if walShardRotateChs == nil { + return + } dones := make([]chan struct{}, len(hostDirs)) for i, dir := range hostDirs { dones[i] = make(chan struct{}) - walRotateCh <- walRotateReq{hostDir: dir, done: dones[i]} + // Extract cluster/node from hostDir to find the right shard. + // hostDir = rootDir/cluster/node + shard := walShardIndexFromDir(dir) + walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]} } for _, done := range dones { <-done } } +// walShardIndexFromDir extracts cluster and node from a host directory path +// and returns the shard index. The path format is: rootDir/cluster/node +func walShardIndexFromDir(hostDir string) int { + // hostDir = .../cluster/node — extract last two path components + node := path.Base(hostDir) + cluster := path.Base(path.Dir(hostDir)) + return walShardIndex(cluster, node) +} + // RotateWALFiles sends rotation requests for the given host directories // and blocks until all rotations complete. func RotateWALFilesAfterShutdown(hostDirs []string) { @@ -279,6 +338,81 @@ func RotateWALFilesAfterShutdown(hostDirs []string) { } } +// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer, +// avoiding heap allocations by using a stack-allocated scratch buffer for +// the fixed-size header/trailer and computing CRC inline. +func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error { + // Compute payload size. 
+ payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4 + for _, s := range msg.Selector { + payloadSize += 1 + len(s) + } + + // Write magic + payload length (8 bytes header). + var hdr [8]byte + binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic) + binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize)) + if _, err := w.Write(hdr[:]); err != nil { + return err + } + + // We need to compute CRC over the payload as we write it. + crc := crc32.NewIEEE() + + // Timestamp (8 bytes). + var scratch [8]byte + binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp)) + crc.Write(scratch[:8]) + if _, err := w.Write(scratch[:8]); err != nil { + return err + } + + // Metric name length (2 bytes) + metric name. + binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName))) + crc.Write(scratch[:2]) + if _, err := w.Write(scratch[:2]); err != nil { + return err + } + nameBytes := []byte(msg.MetricName) + crc.Write(nameBytes) + if _, err := w.Write(nameBytes); err != nil { + return err + } + + // Selector count (1 byte). + scratch[0] = byte(len(msg.Selector)) + crc.Write(scratch[:1]) + if _, err := w.Write(scratch[:1]); err != nil { + return err + } + + // Selectors (1-byte length + bytes each). + for _, sel := range msg.Selector { + scratch[0] = byte(len(sel)) + crc.Write(scratch[:1]) + if _, err := w.Write(scratch[:1]); err != nil { + return err + } + selBytes := []byte(sel) + crc.Write(selBytes) + if _, err := w.Write(selBytes); err != nil { + return err + } + } + + // Value (4 bytes, float32 bits). + binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value))) + crc.Write(scratch[:4]) + if _, err := w.Write(scratch[:4]); err != nil { + return err + } + + // CRC32 (4 bytes). + binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32()) + _, err := w.Write(scratch[:4]) + return err +} + // buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC). 
func buildWALPayload(msg *WALMessage) []byte { size := 8 + 2 + len(msg.MetricName) + 1 + 4 From 8b132ed7f82631de23a63d0cb4ea59f43127a953 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 06:47:45 +0100 Subject: [PATCH 7/8] fix: Blocking ReceiveNats call Entire-Checkpoint: 38a235c86ceb --- pkg/metricstore/metricstore.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 83c78ab2..1c475269 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -184,10 +184,11 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W shutdownFuncMu.Unlock() if Keys.Subscriptions != nil { - err = ReceiveNats(ms, Keys.NumWorkers, ctx) - if err != nil { - cclog.Fatal(err) - } + wg.Go(func() { + if err := ReceiveNats(ms, Keys.NumWorkers, ctx); err != nil { + cclog.Fatal(err) + } + }) } } From 6ebc9e88fa3fb418bea9acc0056a2caa10148eb5 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 06:56:01 +0100 Subject: [PATCH 8/8] Add more context information to auth failed log Entire-Checkpoint: 2187cd89cb78 --- internal/auth/auth.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 69f4f078..327e48a3 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -421,7 +421,7 @@ func (auth *Authentication) Auth( return } - cclog.Info("auth -> authentication failed") + cclog.Infof("auth -> authentication failed: no valid session or JWT for %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr) onfailure(rw, r, errors.New("unauthorized (please login first)")) }) }