Merge pull request #523 from ClusterCockpit/hotfix

Hotfix
This commit is contained in:
Jan Eitzinger
2026-03-18 07:04:12 +01:00
committed by GitHub
11 changed files with 477 additions and 149 deletions

View File

@@ -1,6 +1,6 @@
TARGET = ./cc-backend TARGET = ./cc-backend
FRONTEND = ./web/frontend FRONTEND = ./web/frontend
VERSION = 1.5.1 VERSION = 1.5.2
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development') GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S") CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S")
LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}' LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}'

View File

@@ -1,16 +1,62 @@
# `cc-backend` version 1.5.1 # `cc-backend` version 1.5.2
Supports job archive version 3 and database version 11. Supports job archive version 3 and database version 11.
This is a bugfix release of `cc-backend`, the API backend and frontend This is a bugfix release of `cc-backend`, the API backend and frontend
implementation of ClusterCockpit. implementation of ClusterCockpit.
For release specific notes visit the [ClusterCockpit Documentation](https://clustercockpit.org/docs/release/). For release specific notes visit the [ClusterCockpit Documentation](https://clustercockpit.org/docs/release/).
If you are upgrading from v1.5.1 no database migration is required.
If you are upgrading from v1.5.0 you need to do another DB migration. This If you are upgrading from v1.5.0 you need to do another DB migration. This
should not take long. For optimal database performance after the migration it is should not take long. For optimal database performance after the migration it is
recommended to apply the new `optimize-db` flag, which runs the sqlite `ANALYZE` recommended to apply the new `optimize-db` flag, which runs the sqlite `ANALYZE`
and `VACUUM` commands. Depending on your database size (more than 40GB) the and `VACUUM` commands. Depending on your database size (more than 40GB) the
`VACUUM` may take up to 2h. `VACUUM` may take up to 2h.
## Changes in 1.5.2
### Bug fixes
- **Memory spike in parquet writer**: Fixed memory spikes when using the
metricstore move (archive) policy with the parquet writer. The writer now
processes data in a streaming fashion to avoid accumulating large allocations.
### Database performance
- **Reduced insert pressure**: Bulk insert operations (node state updates, user
and job cache syncs) now use explicit transactions and deferred inserts,
significantly reducing write contention on the SQLite database.
- **SyncJobs wrapped in transaction**: `SyncJobs` now runs inside a transaction
for better consistency and reduced lock contention.
- **Configurable busy timeout**: New `busy-timeout` configuration option for the
SQLite connection. This allows tuning how long the driver waits for a locked
database before returning an error, which improves resilience under concurrent
write load.
- **Increased default SQLite timeout**: The default SQLite connection timeout
has been raised to reduce spurious timeout errors under load.
### NATS API
- **Nodestate health checks in NATS API**: The NATS node state handler now
performs the same metric health checks as the REST API handler, including
per-subcluster health checks and `MonitoringStateFailed` fallback for nodes
without health data.
### Logging improvements
- **Better error context**: Several repository functions now include the calling
function name in error messages for easier diagnosis.
- **Reduced log noise**: `ErrNoRows` (no results found) is no longer logged as
an error in `scanRow`; common "no rows" paths are now silent.
- **Debug-level missing metrics**: Warning about missing metrics in the metric
store has been downgraded to debug level to reduce log noise in normal
operation.
- **Checkpoint archiving log**: Added an informational log message when the
metricstore checkpoint archiving process runs.
### Dependencies
- **cc-lib upgraded**: Updated to latest cc-lib version.
## Known issues ## Known issues
- The new dynamic memory management is not bulletproof yet across restarts. - The new dynamic memory management is not bulletproof yet across restarts.

2
go.mod
View File

@@ -9,7 +9,7 @@ tool (
require ( require (
github.com/99designs/gqlgen v0.17.88 github.com/99designs/gqlgen v0.17.88
github.com/ClusterCockpit/cc-lib/v2 v2.9.0 github.com/ClusterCockpit/cc-lib/v2 v2.9.1
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0
github.com/Masterminds/squirrel v1.5.4 github.com/Masterminds/squirrel v1.5.4
github.com/aws/aws-sdk-go-v2 v1.41.3 github.com/aws/aws-sdk-go-v2 v1.41.3

2
go.sum
View File

@@ -8,6 +8,8 @@ github.com/ClusterCockpit/cc-lib/v2 v2.8.2 h1:rCLZk8wz8yq8xBnBEdVKigvA2ngR8dPmHb
github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o= github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
github.com/ClusterCockpit/cc-lib/v2 v2.9.0 h1:mzUYakcjwb+UP5II4jOvr36rSYct90gXBbtUg+nvm9c= github.com/ClusterCockpit/cc-lib/v2 v2.9.0 h1:mzUYakcjwb+UP5II4jOvr36rSYct90gXBbtUg+nvm9c=
github.com/ClusterCockpit/cc-lib/v2 v2.9.0/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o= github.com/ClusterCockpit/cc-lib/v2 v2.9.0/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
github.com/ClusterCockpit/cc-lib/v2 v2.9.1 h1:eplKhXQyGAElBGCEGdmxwj7fLv26Op16uK0KxUePDak=
github.com/ClusterCockpit/cc-lib/v2 v2.9.1/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q= github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q=
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY= github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=

View File

@@ -421,7 +421,7 @@ func (auth *Authentication) Auth(
return return
} }
cclog.Info("auth -> authentication failed") cclog.Infof("auth -> authentication failed: no valid session or JWT for %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)
onfailure(rw, r, errors.New("unauthorized (please login first)")) onfailure(rw, r, errors.New("unauthorized (please login first)"))
}) })
} }

View File

@@ -168,7 +168,12 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) {
// archiveCheckpoints archives checkpoint files to Parquet format. // archiveCheckpoints archives checkpoint files to Parquet format.
// Produces one Parquet file per cluster: <cleanupDir>/<cluster>/<timestamp>.parquet // Produces one Parquet file per cluster: <cleanupDir>/<cluster>/<timestamp>.parquet
// Each host's rows are written as a separate row group to avoid accumulating
// all data in memory at once.
func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) { func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) {
cclog.Info("[METRICSTORE]> start archiving checkpoints to parquet")
startTime := time.Now()
clusterEntries, err := os.ReadDir(checkpointsDir) clusterEntries, err := os.ReadDir(checkpointsDir)
if err != nil { if err != nil {
return 0, err return 0, err
@@ -187,7 +192,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
return totalFiles, err return totalFiles, err
} }
// Collect rows from all hosts in this cluster using worker pool // Stream per-host rows to parquet writer via worker pool
type hostResult struct { type hostResult struct {
rows []ParquetMetricRow rows []ParquetMetricRow
files []string // checkpoint filenames to delete after successful write files []string // checkpoint filenames to delete after successful write
@@ -235,32 +240,57 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
close(results) close(results)
}() }()
// Collect all rows and file info // Open streaming writer and write each host's rows as a row group
var allRows []ParquetMetricRow parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from))
var allResults []hostResult writer, err := newParquetArchiveWriter(parquetFile)
if err != nil {
// Drain results channel to unblock workers
for range results {
}
return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err)
}
type deleteItem struct {
dir string
files []string
}
var toDelete []deleteItem
writeErr := error(nil)
for r := range results { for r := range results {
allRows = append(allRows, r.rows...) if writeErr == nil {
allResults = append(allResults, r) sortParquetRows(r.rows)
if err := writer.WriteHostRows(r.rows); err != nil {
writeErr = err
}
}
// Always track files for deletion (even if write failed, we still drain)
toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files})
}
if err := writer.Close(); err != nil && writeErr == nil {
writeErr = err
} }
if errs > 0 { if errs > 0 {
return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster)
} }
if len(allRows) == 0 { if writer.count == 0 {
// No data written — remove empty file
os.Remove(parquetFile)
continue continue
} }
// Write one Parquet file per cluster if writeErr != nil {
parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) os.Remove(parquetFile)
if err := writeParquetArchive(parquetFile, allRows); err != nil { return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr)
return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, err)
} }
// Delete archived checkpoint files // Delete archived checkpoint files
for _, result := range allResults { for _, item := range toDelete {
for _, file := range result.files { for _, file := range item.files {
filename := filepath.Join(result.dir, file) filename := filepath.Join(item.dir, file)
if err := os.Remove(filename); err != nil { if err := os.Remove(filename); err != nil {
cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err) cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err)
} else { } else {
@@ -270,8 +300,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
} }
cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s", cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s",
len(allRows), totalFiles, cluster, parquetFile) writer.count, totalFiles, cluster, parquetFile)
} }
cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles)
return totalFiles, nil return totalFiles, nil
} }

View File

@@ -23,6 +23,7 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
@@ -53,7 +54,7 @@ func ReceiveNats(ms *MemoryStore,
} }
var wg sync.WaitGroup var wg sync.WaitGroup
msgs := make(chan []byte, workers*2) msgs := make(chan []byte, max(workers*256, 8192))
for _, sc := range *Keys.Subscriptions { for _, sc := range *Keys.Subscriptions {
clusterTag := sc.ClusterTag clusterTag := sc.ClusterTag
@@ -123,6 +124,10 @@ func reorder(buf, prefix []byte) []byte {
} }
} }
// walDropped counts WAL messages dropped due to a full channel.
// Logged periodically; not critical since binary snapshots capture the data.
var walDropped atomic.Int64
// decodeState holds the per-call scratch buffers used by DecodeLine. // decodeState holds the per-call scratch buffers used by DecodeLine.
// Instances are recycled via decodeStatePool to avoid repeated allocations // Instances are recycled via decodeStatePool to avoid repeated allocations
// during high-throughput ingestion. // during high-throughput ingestion.
@@ -348,7 +353,7 @@ func DecodeLine(dec *lineprotocol.Decoder,
time := t.Unix() time := t.Unix()
if Keys.Checkpoints.FileFormat == "wal" { if Keys.Checkpoints.FileFormat == "wal" {
WALMessages <- &WALMessage{ msg := &WALMessage{
MetricName: string(st.metricBuf), MetricName: string(st.metricBuf),
Cluster: cluster, Cluster: cluster,
Node: host, Node: host,
@@ -356,6 +361,13 @@ func DecodeLine(dec *lineprotocol.Decoder,
Value: metric.Value, Value: metric.Value,
Timestamp: time, Timestamp: time,
} }
if !SendWALMessage(msg) {
// WAL shard channel full — metric is written to memory store but not WAL.
// Next binary snapshot will capture it.
if dropped := walDropped.Add(1); dropped%10000 == 1 {
cclog.Warnf("[METRICSTORE]> WAL shard channel full, dropped %d messages (data safe in memory, next snapshot will capture)", dropped)
}
}
} }
if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil { if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil {

View File

@@ -184,10 +184,11 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
shutdownFuncMu.Unlock() shutdownFuncMu.Unlock()
if Keys.Subscriptions != nil { if Keys.Subscriptions != nil {
err = ReceiveNats(ms, 1, ctx) wg.Go(func() {
if err != nil { if err := ReceiveNats(ms, Keys.NumWorkers, ctx); err != nil {
cclog.Fatal(err) cclog.Fatal(err)
} }
})
} }
} }
@@ -277,7 +278,9 @@ func Shutdown() {
} }
if Keys.Checkpoints.FileFormat == "wal" { if Keys.Checkpoints.FileFormat == "wal" {
close(WALMessages) for _, ch := range walShardChs {
close(ch)
}
} }
cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)

View File

@@ -12,6 +12,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sort"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
pq "github.com/parquet-go/parquet-go" pq "github.com/parquet-go/parquet-go"
@@ -91,43 +92,81 @@ func parseScopeFromName(name string) (string, string) {
return name, "" return name, ""
} }
// writeParquetArchive writes rows to a Parquet file with Zstd compression. // parquetArchiveWriter supports incremental writes to a Parquet file.
func writeParquetArchive(filename string, rows []ParquetMetricRow) error { // Each call to WriteHostRows writes one row group (typically one host's data),
// avoiding accumulation of all rows in memory.
type parquetArchiveWriter struct {
writer *pq.GenericWriter[ParquetMetricRow]
bw *bufio.Writer
f *os.File
count int
}
// newParquetArchiveWriter creates a streaming Parquet writer with Zstd compression.
func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) {
if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil { if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil {
return fmt.Errorf("creating archive directory: %w", err) return nil, fmt.Errorf("creating archive directory: %w", err)
} }
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
if err != nil { if err != nil {
return fmt.Errorf("creating parquet file: %w", err) return nil, fmt.Errorf("creating parquet file: %w", err)
} }
defer f.Close()
bw := bufio.NewWriterSize(f, 1<<20) // 1MB write buffer bw := bufio.NewWriterSize(f, 1<<20) // 1MB write buffer
writer := pq.NewGenericWriter[ParquetMetricRow](bw, writer := pq.NewGenericWriter[ParquetMetricRow](bw,
pq.Compression(&pq.Zstd), pq.Compression(&pq.Zstd),
pq.SortingWriterConfig(pq.SortingColumns(
pq.Ascending("cluster"),
pq.Ascending("hostname"),
pq.Ascending("metric"),
pq.Ascending("timestamp"),
)),
) )
if _, err := writer.Write(rows); err != nil { return &parquetArchiveWriter{writer: writer, bw: bw, f: f}, nil
}
// WriteHostRows sorts rows by (metric, timestamp) in-place, writes them,
// and flushes to create a separate row group.
func (w *parquetArchiveWriter) WriteHostRows(rows []ParquetMetricRow) error {
sort.Slice(rows, func(i, j int) bool {
if rows[i].Metric != rows[j].Metric {
return rows[i].Metric < rows[j].Metric
}
return rows[i].Timestamp < rows[j].Timestamp
})
if _, err := w.writer.Write(rows); err != nil {
return fmt.Errorf("writing parquet rows: %w", err) return fmt.Errorf("writing parquet rows: %w", err)
} }
if err := writer.Close(); err != nil { if err := w.writer.Flush(); err != nil {
return fmt.Errorf("flushing parquet row group: %w", err)
}
w.count += len(rows)
return nil
}
// Close finalises the Parquet file (footer, buffered I/O, file handle).
func (w *parquetArchiveWriter) Close() error {
if err := w.writer.Close(); err != nil {
w.f.Close()
return fmt.Errorf("closing parquet writer: %w", err) return fmt.Errorf("closing parquet writer: %w", err)
} }
if err := bw.Flush(); err != nil { if err := w.bw.Flush(); err != nil {
w.f.Close()
return fmt.Errorf("flushing parquet file: %w", err) return fmt.Errorf("flushing parquet file: %w", err)
} }
return nil return w.f.Close()
}
// sortParquetRows sorts rows by (metric, timestamp) in-place.
func sortParquetRows(rows []ParquetMetricRow) {
sort.Slice(rows, func(i, j int) bool {
if rows[i].Metric != rows[j].Metric {
return rows[i].Metric < rows[j].Metric
}
return rows[i].Timestamp < rows[j].Timestamp
})
} }
// loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns // loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns
@@ -179,6 +218,19 @@ func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) {
} }
} }
// estimateRowCount estimates the number of Parquet rows a CheckpointFile will produce.
// Used for pre-allocating the rows slice to avoid repeated append doubling.
func estimateRowCount(cf *CheckpointFile) int {
n := 0
for _, cm := range cf.Metrics {
n += len(cm.Data)
}
for _, child := range cf.Children {
n += estimateRowCount(child)
}
return n
}
// archiveCheckpointsToParquet reads checkpoint files for a host directory, // archiveCheckpointsToParquet reads checkpoint files for a host directory,
// converts them to Parquet rows. Returns the rows and filenames that were processed. // converts them to Parquet rows. Returns the rows and filenames that were processed.
func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) { func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) {
@@ -196,7 +248,13 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu
return nil, nil, nil return nil, nil, nil
} }
var rows []ParquetMetricRow // First pass: load checkpoints and estimate total rows for pre-allocation.
type loaded struct {
cf *CheckpointFile
filename string
}
var checkpoints []loaded
totalEstimate := 0
for _, checkpoint := range files { for _, checkpoint := range files {
filename := filepath.Join(dir, checkpoint) filename := filepath.Join(dir, checkpoint)
@@ -205,9 +263,21 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu
cclog.Warnf("[METRICSTORE]> skipping unreadable checkpoint %s: %v", filename, err) cclog.Warnf("[METRICSTORE]> skipping unreadable checkpoint %s: %v", filename, err)
continue continue
} }
totalEstimate += estimateRowCount(cf)
rows = flattenCheckpointFile(cf, cluster, host, "node", "", rows) checkpoints = append(checkpoints, loaded{cf: cf, filename: checkpoint})
} }
return rows, files, nil if len(checkpoints) == 0 {
return nil, nil, nil
}
rows := make([]ParquetMetricRow, 0, totalEstimate)
processedFiles := make([]string, 0, len(checkpoints))
for _, cp := range checkpoints {
rows = flattenCheckpointFile(cp.cf, cluster, host, "node", "", rows)
processedFiles = append(processedFiles, cp.filename)
}
return rows, processedFiles, nil
} }

View File

@@ -162,7 +162,15 @@ func TestParquetArchiveRoundtrip(t *testing.T) {
} }
parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet") parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet")
if err := writeParquetArchive(parquetFile, rows); err != nil { writer, err := newParquetArchiveWriter(parquetFile)
if err != nil {
t.Fatal(err)
}
sortParquetRows(rows)
if err := writer.WriteHostRows(rows); err != nil {
t.Fatal(err)
}
if err := writer.Close(); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -61,6 +61,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"hash/crc32" "hash/crc32"
"hash/fnv"
"io" "io"
"math" "math"
"os" "os"
@@ -80,13 +81,15 @@ const (
snapFileMagic = uint32(0xCC5B0001) // Binary snapshot magic snapFileMagic = uint32(0xCC5B0001) // Binary snapshot magic
) )
// WALMessages is the channel for sending metric writes to the WAL staging goroutine. // walShardChs holds per-shard channels for WAL messages.
// Buffered to allow burst writes without blocking the metric ingestion path. // Initialized by WALStaging; nil when WAL is not active.
var WALMessages = make(chan *WALMessage, 4096) var walShardChs []chan *WALMessage
// walRotateCh is used by the checkpoint goroutine to request WAL file rotation // walShardRotateChs holds per-shard channels for WAL rotation requests.
// (close, delete, reopen) after a binary snapshot has been written. var walShardRotateChs []chan walRotateReq
var walRotateCh = make(chan walRotateReq, 256)
// walNumShards stores the number of shards (set during WALStaging init).
var walNumShards int
// WALMessage represents a single metric write to be appended to the WAL. // WALMessage represents a single metric write to be appended to the WAL.
// Cluster and Node are NOT stored in the WAL record (inferred from file path). // Cluster and Node are NOT stored in the WAL record (inferred from file path).
@@ -112,140 +115,218 @@ type walFileState struct {
w *bufio.Writer w *bufio.Writer
} }
// WALStaging starts a background goroutine that receives WALMessage items // walShardIndex computes which shard a message belongs to based on cluster+node.
// and appends binary WAL records to per-host current.wal files. // Uses FNV-1a hash for fast, well-distributed mapping.
// Also handles WAL rotation requests from the checkpoint goroutine. func walShardIndex(cluster, node string) int {
h := fnv.New32a()
h.Write([]byte(cluster))
h.Write([]byte{0})
h.Write([]byte(node))
return int(h.Sum32()) % walNumShards
}
// SendWALMessage routes a WAL message to the appropriate shard channel.
// Returns false if the channel is full (message dropped).
func SendWALMessage(msg *WALMessage) bool {
if walShardChs == nil {
return false
}
shard := walShardIndex(msg.Cluster, msg.Node)
select {
case walShardChs[shard] <- msg:
return true
default:
return false
}
}
// WALStaging starts N sharded background goroutines that receive WALMessage items
// and append binary WAL records to per-host current.wal files.
// Each shard owns a subset of hosts (determined by hash of cluster+node),
// parallelizing serialization and I/O while keeping per-host writes serial.
func WALStaging(wg *sync.WaitGroup, ctx context.Context) { func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
wg.Go(func() { if Keys.Checkpoints.FileFormat == "json" {
if Keys.Checkpoints.FileFormat == "json" { return
return }
}
hostFiles := make(map[string]*walFileState) walNumShards = max(Keys.NumWorkers, 1)
chBufSize := max(65536/walNumShards, 1024)
defer func() { walShardChs = make([]chan *WALMessage, walNumShards)
for _, ws := range hostFiles { walShardRotateChs = make([]chan walRotateReq, walNumShards)
if ws.f != nil {
ws.w.Flush() for i := range walNumShards {
ws.f.Close() walShardChs[i] = make(chan *WALMessage, chBufSize)
walShardRotateChs[i] = make(chan walRotateReq, 64)
}
for i := range walNumShards {
msgCh := walShardChs[i]
rotateCh := walShardRotateChs[i]
wg.Go(func() {
hostFiles := make(map[string]*walFileState)
defer func() {
for _, ws := range hostFiles {
if ws.f != nil {
ws.w.Flush()
ws.f.Close()
}
} }
} }()
}()
getOrOpenWAL := func(hostDir string) *walFileState { getOrOpenWAL := func(hostDir string) *walFileState {
ws, ok := hostFiles[hostDir] ws, ok := hostFiles[hostDir]
if ok { if ok {
return ws
}
if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: mkdir %s: %v", hostDir, err)
return nil
}
walPath := path.Join(hostDir, "current.wal")
f, err := os.OpenFile(walPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, CheckpointFilePerms)
if err != nil {
cclog.Errorf("[METRICSTORE]> WAL: open %s: %v", walPath, err)
return nil
}
w := bufio.NewWriter(f)
// Write file header magic if file is new (empty).
info, err := f.Stat()
if err == nil && info.Size() == 0 {
var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := w.Write(hdr[:]); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err)
f.Close()
return nil
}
}
ws = &walFileState{f: f, w: w}
hostFiles[hostDir] = ws
return ws return ws
} }
if err := os.MkdirAll(hostDir, CheckpointDirPerms); err != nil { processMsg := func(msg *WALMessage) {
cclog.Errorf("[METRICSTORE]> WAL: mkdir %s: %v", hostDir, err) hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node)
return nil ws := getOrOpenWAL(hostDir)
} if ws == nil {
return
walPath := path.Join(hostDir, "current.wal") }
f, err := os.OpenFile(walPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, CheckpointFilePerms) if err := writeWALRecordDirect(ws.w, msg); err != nil {
if err != nil { cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
cclog.Errorf("[METRICSTORE]> WAL: open %s: %v", walPath, err)
return nil
}
w := bufio.NewWriter(f)
// Write file header magic if file is new (empty).
info, err := f.Stat()
if err == nil && info.Size() == 0 {
var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := w.Write(hdr[:]); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write header %s: %v", walPath, err)
f.Close()
return nil
} }
} }
ws = &walFileState{f: f, w: w} processRotate := func(req walRotateReq) {
hostFiles[hostDir] = ws ws, ok := hostFiles[req.hostDir]
return ws if ok && ws.f != nil {
} ws.w.Flush()
ws.f.Close()
processMsg := func(msg *WALMessage) { walPath := path.Join(req.hostDir, "current.wal")
hostDir := path.Join(Keys.Checkpoints.RootDir, msg.Cluster, msg.Node) if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
ws := getOrOpenWAL(hostDir) cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
if ws == nil { }
return delete(hostFiles, req.hostDir)
} }
if err := writeWALRecord(ws.w, msg); err != nil { close(req.done)
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) }
}
} flushAll := func() {
for _, ws := range hostFiles {
processRotate := func(req walRotateReq) { if ws.f != nil {
ws, ok := hostFiles[req.hostDir] ws.w.Flush()
if ok && ws.f != nil { }
ws.w.Flush() }
ws.f.Close() }
walPath := path.Join(req.hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { drain := func() {
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err) for {
select {
case msg, ok := <-msgCh:
if !ok {
return
}
processMsg(msg)
case req := <-rotateCh:
processRotate(req)
default:
flushAll()
return
}
} }
delete(hostFiles, req.hostDir)
} }
close(req.done)
}
drain := func() {
for { for {
select { select {
case msg, ok := <-WALMessages: case <-ctx.Done():
drain()
return
case msg, ok := <-msgCh:
if !ok { if !ok {
return return
} }
processMsg(msg) processMsg(msg)
case req := <-walRotateCh:
processRotate(req) // Drain up to 256 more messages without blocking to batch writes.
default: for range 256 {
// Flush all buffered writers after draining remaining messages. select {
for _, ws := range hostFiles { case msg, ok := <-msgCh:
if ws.f != nil { if !ok {
ws.w.Flush() return
}
processMsg(msg)
case req := <-rotateCh:
processRotate(req)
default:
goto flushed
} }
} }
return flushed:
flushAll()
case req := <-rotateCh:
processRotate(req)
} }
} }
} })
}
for {
select {
case <-ctx.Done():
drain()
return
case msg, ok := <-WALMessages:
if !ok {
return
}
processMsg(msg)
case req := <-walRotateCh:
processRotate(req)
}
}
})
} }
// RotateWALFiles sends rotation requests for the given host directories // RotateWALFiles sends rotation requests for the given host directories
// and blocks until all rotations complete. // and blocks until all rotations complete. Each request is routed to the
// shard that owns the host directory.
func RotateWALFiles(hostDirs []string) { func RotateWALFiles(hostDirs []string) {
if walShardRotateChs == nil {
return
}
dones := make([]chan struct{}, len(hostDirs)) dones := make([]chan struct{}, len(hostDirs))
for i, dir := range hostDirs { for i, dir := range hostDirs {
dones[i] = make(chan struct{}) dones[i] = make(chan struct{})
walRotateCh <- walRotateReq{hostDir: dir, done: dones[i]} // Extract cluster/node from hostDir to find the right shard.
// hostDir = rootDir/cluster/node
shard := walShardIndexFromDir(dir)
walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]}
} }
for _, done := range dones { for _, done := range dones {
<-done <-done
} }
} }
// walShardIndexFromDir extracts cluster and node from a host directory path
// and returns the shard index. The path format is: rootDir/cluster/node
func walShardIndexFromDir(hostDir string) int {
// hostDir = .../cluster/node — extract last two path components
node := path.Base(hostDir)
cluster := path.Base(path.Dir(hostDir))
return walShardIndex(cluster, node)
}
// RotateWALFiles sends rotation requests for the given host directories // RotateWALFiles sends rotation requests for the given host directories
// and blocks until all rotations complete. // and blocks until all rotations complete.
func RotateWALFilesAfterShutdown(hostDirs []string) { func RotateWALFilesAfterShutdown(hostDirs []string) {
@@ -257,6 +338,81 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
} }
} }
// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer,
// avoiding heap allocations by using a stack-allocated scratch buffer for
// the fixed-size header/trailer and computing CRC inline.
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
// Compute payload size.
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector {
payloadSize += 1 + len(s)
}
// Write magic + payload length (8 bytes header).
var hdr [8]byte
binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic)
binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize))
if _, err := w.Write(hdr[:]); err != nil {
return err
}
// We need to compute CRC over the payload as we write it.
crc := crc32.NewIEEE()
// Timestamp (8 bytes).
var scratch [8]byte
binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp))
crc.Write(scratch[:8])
if _, err := w.Write(scratch[:8]); err != nil {
return err
}
// Metric name length (2 bytes) + metric name.
binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName)))
crc.Write(scratch[:2])
if _, err := w.Write(scratch[:2]); err != nil {
return err
}
nameBytes := []byte(msg.MetricName)
crc.Write(nameBytes)
if _, err := w.Write(nameBytes); err != nil {
return err
}
// Selector count (1 byte).
scratch[0] = byte(len(msg.Selector))
crc.Write(scratch[:1])
if _, err := w.Write(scratch[:1]); err != nil {
return err
}
// Selectors (1-byte length + bytes each).
for _, sel := range msg.Selector {
scratch[0] = byte(len(sel))
crc.Write(scratch[:1])
if _, err := w.Write(scratch[:1]); err != nil {
return err
}
selBytes := []byte(sel)
crc.Write(selBytes)
if _, err := w.Write(selBytes); err != nil {
return err
}
}
// Value (4 bytes, float32 bits).
binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value)))
crc.Write(scratch[:4])
if _, err := w.Write(scratch[:4]); err != nil {
return err
}
// CRC32 (4 bytes).
binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32())
_, err := w.Write(scratch[:4])
return err
}
// buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC). // buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC).
func buildWALPayload(msg *WALMessage) []byte { func buildWALPayload(msg *WALMessage) []byte {
size := 8 + 2 + len(msg.MetricName) + 1 + 4 size := 8 + 2 + len(msg.MetricName) + 1 + 4