12 Commits

Author SHA1 Message Date
641dc0e3b8 Run gofumpt 2026-03-30 16:49:27 +02:00
b734c1a92a And another update
Entire-Checkpoint: 9bb66d18af6d
2026-03-30 16:48:12 +02:00
c5fe3c5cd9 Update golangci settings
Entire-Checkpoint: b9544ef2c54f
2026-03-30 16:46:30 +02:00
e2910b18b3 Fix golangci config
Entire-Checkpoint: 1a908bd95cfa
2026-03-30 16:29:15 +02:00
ed236ec539 Add Make targets for formatting and linting
Add configuration and document usage in README

Entire-Checkpoint: 53425877e242
2026-03-30 16:23:12 +02:00
82c514b11a Ease samesite cookie settings
Entire-Checkpoint: 2fe286e23a4a
2026-03-30 16:10:15 +02:00
66707bbf15 Update metricstore documentation
Entire-Checkpoint: 99f20c1edd90
2026-03-29 21:38:04 +02:00
fc47b12fed fix: Pause WAL writes during binary checkpoint to prevent message drops
WAL writes during checkpoint are redundant since the binary snapshot
captures all in-memory data. Pausing eliminates channel saturation
(1.4M+ dropped messages) caused by disk I/O contention between
checkpoint writes and WAL staging. Also removes direct WAL file
deletion in checkpoint workers that raced with the staging goroutine.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Entire-Checkpoint: 34d698f40bac
2026-03-29 11:13:39 +02:00
937984d11f fix: WAL rotation skipped for all nodes due to non-blocking send on small channel
RotateWALFiles used a non-blocking send (select/default) on rotation
channels buffered at 64. With thousands of nodes and few shards, the
channel fills instantly and nearly all hosts are skipped, leaving WAL
files unrotated indefinitely.

Replace with a blocking send using a shared 2-minute deadline so the
checkpoint goroutine waits for the staging goroutine to drain the
channel instead of immediately giving up.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Entire-Checkpoint: a1ec897216fa
2026-03-28 06:55:45 +01:00
3d99aec185 fix: Log viewer auto-refresh
Entire-Checkpoint: 0fc6e5388e08
2026-03-28 06:45:03 +01:00
280289185a Add checkpointReader to ReleaseNotes
Entire-Checkpoint: ea34ae75e21a
2026-03-28 06:28:07 +01:00
cc3d03bb5b fix: Unbound growth of wal files in case of checkpointing error
Entire-Checkpoint: 95a89a7127c5
2026-03-28 06:26:21 +01:00
17 changed files with 250 additions and 49 deletions

38
.golangci.yml Normal file
View File

@@ -0,0 +1,38 @@
version: "2"
run:
timeout: 5m
formatters:
enable:
- gofumpt # gofumpt formatter
settings:
gofumpt:
module-path: github.com/ClusterCockpit/cc-backend
linters:
enable:
- staticcheck # staticcheck = true
- unparam # unusedparams = true
- nilnil # nilness = true (catches nil returns of nil interface values)
- govet # base vet; nilness + unusedwrite via govet analyzers
settings:
staticcheck:
checks: ["ST1003"]
govet:
enable:
- nilness
- unusedwrite
exclusions:
paths:
- .git
- .vscode
- .idea
- node_modules
- internal/graph/generated # gqlgen generated code
- internal/api/docs\.go # swaggo generated code
- _test\.go # test files excluded from all linters

View File

@@ -36,7 +36,7 @@ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
$(wildcard $(FRONTEND)/src/header/*.svelte) \ $(wildcard $(FRONTEND)/src/header/*.svelte) \
$(wildcard $(FRONTEND)/src/job/*.svelte) $(wildcard $(FRONTEND)/src/job/*.svelte)
.PHONY: clean distclean test tags frontend swagger graphql $(TARGET) .PHONY: clean distclean fmt lint test tags frontend swagger graphql $(TARGET)
.NOTPARALLEL: .NOTPARALLEL:
@@ -75,6 +75,14 @@ test:
@go vet ./... @go vet ./...
@go test ./... @go test ./...
fmt:
$(info ===> FORMAT)
@gofumpt -l -w .
lint:
$(info ===> LINT)
@golangci-lint run ./...
tags: tags:
$(info ===> TAGS) $(info ===> TAGS)
@ctags -R @ctags -R

View File

@@ -100,6 +100,26 @@ the following targets:
frontend source files will result in a complete rebuild. frontend source files will result in a complete rebuild.
- `make clean`: Clean go build cache and remove binary. - `make clean`: Clean go build cache and remove binary.
- `make test`: Run the tests that are also run in the GitHub workflow setup. - `make test`: Run the tests that are also run in the GitHub workflow setup.
- `make fmt`: Format all Go source files using
[gofumpt](https://github.com/mvdan/gofumpt), a stricter superset of `gofmt`.
Requires `gofumpt` to be installed (see below).
- `make lint`: Run [golangci-lint](https://golangci-lint.run/) with the project
configuration in `.golangci.yml`. Requires `golangci-lint` to be installed
(see below).
### Installing development tools
`gofumpt` and `golangci-lint` are not part of the Go module and must be
installed separately:
```sh
# Formatter (gofumpt)
go install mvdan.cc/gofumpt@latest
# Linter (golangci-lint) — use the official install script to get a
# pre-built binary; installing via `go install` is not supported upstream.
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b $(go env GOPATH)/bin
```
A common workflow for setting up cc-backend from scratch is: A common workflow for setting up cc-backend from scratch is:

View File

@@ -19,6 +19,16 @@ This is also the default.
### Bug fixes ### Bug fixes
- **WAL not rotated on partial checkpoint failure**: When binary checkpointing
failed for some hosts, WAL files for successfully checkpointed hosts were not
rotated and the checkpoint timestamp was not advanced. Partial successes now
correctly advance the checkpoint and rotate WAL files for completed hosts.
- **Unbounded WAL file growth**: If binary checkpointing consistently failed for
a host, its `current.wal` file grew without limit until disk exhaustion. A new
`max-wal-size` configuration option (in the `checkpoints` block) allows setting
a per-host WAL size cap in bytes. When exceeded, the WAL is force-rotated.
Defaults to 0 (unlimited) for backward compatibility.
- **Doubleranged filter fixes**: Range filters now correctly handle zero as a - **Doubleranged filter fixes**: Range filters now correctly handle zero as a
boundary value. Improved validation and UI text for "more than equal" and boundary value. Improved validation and UI text for "more than equal" and
"less than equal" range selections. "less than equal" range selections.
@@ -50,6 +60,14 @@ This is also the default.
- **Explicit node state queries in node view**: Node health and scheduler state - **Explicit node state queries in node view**: Node health and scheduler state
are now fetched independently from metric data for fresher status information. are now fetched independently from metric data for fresher status information.
### New tools
- **binaryCheckpointReader**: New utility tool (`tools/binaryCheckpointReader`)
that reads `.wal` or `.bin` checkpoint files produced by the metricstore
WAL/snapshot system and dumps their contents to a human-readable `.txt` file.
Useful for debugging and inspecting checkpoint data. Usage:
`go run ./tools/binaryCheckpointReader <file.wal|file.bin>`
### Logging improvements ### Logging improvements
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors - **Reduced tagger log noise**: Missing metrics and expression evaluation errors

49
gopls.json Normal file
View File

@@ -0,0 +1,49 @@
{
"gofumpt": true,
"codelenses": {
"gc_details": false,
"generate": true,
"regenerate_cgo": true,
"run_govulncheck": true,
"test": true,
"tidy": true,
"upgrade_dependency": true,
"vendor": true
},
"hints": {
"assignVariableTypes": true,
"compositeLiteralFields": true,
"compositeLiteralTypes": true,
"constantValues": true,
"functionTypeParameters": true,
"parameterNames": true,
"rangeVariableTypes": true
},
"analyses": {
"nilness": true,
"unusedparams": true,
"unusedwrite": true,
"useany": true
},
"usePlaceholders": true,
"completeUnimported": true,
"staticcheck": true,
"directoryFilters": [
"-.git",
"-.vscode",
"-.idea",
"-.vscode-test",
"-node_modules",
"-web/frontend",
"-web/templates",
"-var",
"-api",
"-configs",
"-init",
"-internal/repository/migrations",
"-internal/repository/testdata",
"-internal/importer/testdata",
"-pkg/archive/testdata"
],
"semanticTokens": true
}

View File

@@ -164,12 +164,17 @@ func (auth *Authentication) AuthViaSession(
return nil, errors.New("invalid session data") return nil, errors.New("invalid session data")
} }
authSourceInt, ok := session.Values["authSource"].(int)
if !ok {
authSourceInt = int(schema.AuthViaLocalPassword)
}
return &schema.User{ return &schema.User{
Username: username, Username: username,
Projects: projects, Projects: projects,
Roles: roles, Roles: roles,
AuthType: schema.AuthSession, AuthType: schema.AuthSession,
AuthSource: -1, AuthSource: schema.AuthSource(authSourceInt),
}, nil }, nil
} }
@@ -319,10 +324,11 @@ func (auth *Authentication) SaveSession(rw http.ResponseWriter, r *http.Request,
} }
session.Options.Secure = false session.Options.Secure = false
} }
session.Options.SameSite = http.SameSiteStrictMode session.Options.SameSite = http.SameSiteLaxMode
session.Values["username"] = user.Username session.Values["username"] = user.Username
session.Values["projects"] = user.Projects session.Values["projects"] = user.Projects
session.Values["roles"] = user.Roles session.Values["roles"] = user.Roles
session.Values["authSource"] = int(user.AuthSource)
if err := auth.sessionStore.Save(r, rw, session); err != nil { if err := auth.sessionStore.Save(r, rw, session); err != nil {
cclog.Warnf("session save failed: %s", err.Error()) cclog.Warnf("session save failed: %s", err.Error())
http.Error(rw, err.Error(), http.StatusInternalServerError) http.Error(rw, err.Error(), http.StatusInternalServerError)

View File

@@ -1072,10 +1072,12 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
// SubCluster returns generated.SubClusterResolver implementation. // SubCluster returns generated.SubClusterResolver implementation.
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} } func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
type clusterResolver struct{ *Resolver } type (
type jobResolver struct{ *Resolver } clusterResolver struct{ *Resolver }
type metricValueResolver struct{ *Resolver } jobResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver } metricValueResolver struct{ *Resolver }
type nodeResolver struct{ *Resolver } mutationResolver struct{ *Resolver }
type queryResolver struct{ *Resolver } nodeResolver struct{ *Resolver }
type subClusterResolver struct{ *Resolver } queryResolver struct{ *Resolver }
subClusterResolver struct{ *Resolver }
)

View File

@@ -236,4 +236,3 @@ func (ccms *CCMetricStore) buildNodeQueries(
return queries, assignedScope, nil return queries, assignedScope, nil
} }

View File

@@ -3,6 +3,12 @@
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// This file implements the cleanup (archiving or deletion) of old checkpoint files.
//
// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode
// it converts checkpoint files older than the retention window into per-cluster
// Parquet files and then deletes the originals. In "delete" mode it simply removes
// old checkpoint files.
package metricstore package metricstore
import ( import (
@@ -19,8 +25,12 @@ import (
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
) )
// Worker for either Archiving or Deleting files // CleanUp starts a background worker that periodically removes or archives
// checkpoint files older than the configured retention window.
//
// In "archive" mode, old checkpoint files are converted to Parquet and stored
// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed.
// The cleanup interval equals Keys.RetentionInMemory.
func CleanUp(wg *sync.WaitGroup, ctx context.Context) { func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Cleanup.Mode == "archive" { if Keys.Cleanup.Mode == "archive" {
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet") cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")

View File

@@ -86,11 +86,12 @@ var (
// Checkpointing starts a background worker that periodically saves metric data to disk. // Checkpointing starts a background worker that periodically saves metric data to disk.
// //
// Checkpoints are written every 12 hours (hardcoded). // The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours).
// //
// Format behaviour: // Format behaviour:
// - "json": Periodic checkpointing every checkpointInterval // - "json": Writes a JSON snapshot file per host every interval
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval // - "wal": Writes a binary snapshot file per host every interval, then rotates
// the current.wal files for all successfully checkpointed hosts
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
lastCheckpointMu.Lock() lastCheckpointMu.Lock()
lastCheckpoint = time.Now() lastCheckpoint = time.Now()
@@ -99,7 +100,6 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore() ms := GetMemoryStore()
wg.Go(func() { wg.Go(func() {
d := 12 * time.Hour // default checkpoint interval d := 12 * time.Hour // default checkpoint interval
if Keys.CheckpointInterval != "" { if Keys.CheckpointInterval != "" {
parsed, err := time.ParseDuration(Keys.CheckpointInterval) parsed, err := time.ParseDuration(Keys.CheckpointInterval)
@@ -126,10 +126,15 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339)) cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
if Keys.Checkpoints.FileFormat == "wal" { if Keys.Checkpoints.FileFormat == "wal" {
// Pause WAL writes: the binary snapshot captures all in-memory
// data, so WAL records written during checkpoint are redundant
// and would be deleted during rotation anyway.
walCheckpointActive.Store(true)
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil { if err != nil {
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error()) cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
} else { }
if n > 0 {
cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n) cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n)
lastCheckpointMu.Lock() lastCheckpointMu.Lock()
lastCheckpoint = now lastCheckpoint = now
@@ -137,6 +142,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
// Rotate WAL files for successfully checkpointed hosts. // Rotate WAL files for successfully checkpointed hosts.
RotateWALFiles(hostDirs) RotateWALFiles(hostDirs)
} }
walCheckpointActive.Store(false)
walDropped.Store(0)
} else { } else {
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil { if err != nil {

View File

@@ -61,9 +61,12 @@ const (
// Fields: // Fields:
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal" // - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
// - RootDir: Filesystem path for checkpoint files (created if missing) // - RootDir: Filesystem path for checkpoint files (created if missing)
// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default).
// When exceeded the WAL is force-rotated to prevent unbounded disk growth.
type Checkpoints struct { type Checkpoints struct {
FileFormat string `json:"file-format"` FileFormat string `json:"file-format"`
RootDir string `json:"directory"` RootDir string `json:"directory"`
MaxWALSize int64 `json:"max-wal-size,omitempty"`
} }
// Debug provides development and profiling options. // Debug provides development and profiling options.

View File

@@ -24,6 +24,11 @@ const configSchema = `{
"directory": { "directory": {
"description": "Path in which the checkpointed files should be placed.", "description": "Path in which the checkpointed files should be placed.",
"type": "string" "type": "string"
},
"max-wal-size": {
"description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).",
"type": "integer",
"minimum": 0
} }
} }
}, },

View File

@@ -8,7 +8,7 @@
// //
// The package organizes metrics in a tree structure (cluster → host → component) and // The package organizes metrics in a tree structure (cluster → host → component) and
// provides concurrent read/write access to metric data with configurable aggregation strategies. // provides concurrent read/write access to metric data with configurable aggregation strategies.
// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data, // Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data,
// and enforcing retention policies. // and enforcing retention policies.
// //
// Key features: // Key features:
@@ -16,7 +16,7 @@
// - Hierarchical data organization (selectors) // - Hierarchical data organization (selectors)
// - Concurrent checkpoint/archive workers // - Concurrent checkpoint/archive workers
// - Support for sum and average aggregation // - Support for sum and average aggregation
// - NATS integration for metric ingestion // - NATS integration for metric ingestion via InfluxDB line protocol
package metricstore package metricstore
import ( import (
@@ -113,7 +113,8 @@ type MemoryStore struct {
// 6. Optionally subscribes to NATS for real-time metric ingestion // 6. Optionally subscribes to NATS for real-time metric ingestion
// //
// Parameters: // Parameters:
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig) // - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults
// - metrics: Map of metric names to their configurations (frequency and aggregation strategy)
// - wg: WaitGroup that will be incremented for each background goroutine started // - wg: WaitGroup that will be incremented for each background goroutine started
// //
// The function will call cclog.Fatal on critical errors during initialization. // The function will call cclog.Fatal on critical errors during initialization.

View File

@@ -99,6 +99,11 @@ var walStagingWg sync.WaitGroup
// SendWALMessage from sending on a closed channel (which panics in Go). // SendWALMessage from sending on a closed channel (which panics in Go).
var walShuttingDown atomic.Bool var walShuttingDown atomic.Bool
// walCheckpointActive is set during binary checkpoint writes.
// While active, SendWALMessage skips sending (returns true) because the
// snapshot captures all in-memory data, making WAL writes redundant.
var walCheckpointActive atomic.Bool
// WALMessage represents a single metric write to be appended to the WAL. // WALMessage represents a single metric write to be appended to the WAL.
// Cluster and Node are NOT stored in the WAL record (inferred from file path). // Cluster and Node are NOT stored in the WAL record (inferred from file path).
type WALMessage struct { type WALMessage struct {
@@ -122,6 +127,7 @@ type walFileState struct {
f *os.File f *os.File
w *bufio.Writer w *bufio.Writer
dirty bool dirty bool
size int64 // approximate bytes written (tracked from open + writes)
} }
// walFlushInterval controls how often dirty WAL files are flushed to disk. // walFlushInterval controls how often dirty WAL files are flushed to disk.
@@ -145,6 +151,9 @@ func SendWALMessage(msg *WALMessage) bool {
if walShardChs == nil || walShuttingDown.Load() { if walShardChs == nil || walShuttingDown.Load() {
return false return false
} }
if walCheckpointActive.Load() {
return true // Data safe in memory; snapshot will capture it
}
shard := walShardIndex(msg.Cluster, msg.Node) shard := walShardIndex(msg.Cluster, msg.Node)
select { select {
case walShardChs[shard] <- msg: case walShardChs[shard] <- msg:
@@ -214,7 +223,11 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
// Write file header magic if file is new (empty). // Write file header magic if file is new (empty).
info, err := f.Stat() info, err := f.Stat()
if err == nil && info.Size() == 0 { var fileSize int64
if err == nil {
fileSize = info.Size()
}
if err == nil && fileSize == 0 {
var hdr [4]byte var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic) binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := w.Write(hdr[:]); err != nil { if _, err := w.Write(hdr[:]); err != nil {
@@ -222,9 +235,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
f.Close() f.Close()
return nil return nil
} }
fileSize = 4
} }
ws = &walFileState{f: f, w: w} ws = &walFileState{f: f, w: w, size: fileSize}
hostFiles[hostDir] = ws hostFiles[hostDir] = ws
return ws return ws
} }
@@ -235,9 +249,30 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
if ws == nil { if ws == nil {
return return
} }
if err := writeWALRecordDirect(ws.w, msg); err != nil {
// Enforce max WAL size: force-rotate before writing if limit is exceeded.
// The in-memory store still holds the data; only crash-recovery coverage is lost.
if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize {
cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)",
hostDir, ws.size, maxSize)
ws.w.Flush()
ws.f.Close()
walPath := path.Join(hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
}
delete(hostFiles, hostDir)
ws = getOrOpenWAL(hostDir)
if ws == nil {
return
}
}
n, err := writeWALRecordDirect(ws.w, msg)
if err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
} }
ws.size += int64(n)
ws.dirty = true ws.dirty = true
} }
@@ -331,6 +366,7 @@ func RotateWALFiles(hostDirs []string) {
if walShardRotateChs == nil || walShuttingDown.Load() { if walShardRotateChs == nil || walShuttingDown.Load() {
return return
} }
deadline := time.After(2 * time.Minute)
dones := make([]chan struct{}, 0, len(hostDirs)) dones := make([]chan struct{}, 0, len(hostDirs))
for _, dir := range hostDirs { for _, dir := range hostDirs {
done := make(chan struct{}) done := make(chan struct{})
@@ -338,16 +374,18 @@ func RotateWALFiles(hostDirs []string) {
select { select {
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}: case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
dones = append(dones, done) dones = append(dones, done)
default: case <-deadline:
// Channel full or goroutine not consuming — skip this host. cclog.Warnf("[METRICSTORE]> WAL rotation send timed out, %d of %d hosts remaining",
cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir) len(hostDirs)-len(dones), len(hostDirs))
goto waitDones
} }
} }
waitDones:
for _, done := range dones { for _, done := range dones {
select { select {
case <-done: case <-done:
case <-time.After(30 * time.Second): case <-time.After(30 * time.Second):
cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing") cclog.Warn("[METRICSTORE]> WAL rotation completion timed out, continuing")
return return
} }
} }
@@ -362,8 +400,9 @@ func walShardIndexFromDir(hostDir string) int {
return walShardIndex(cluster, node) return walShardIndex(cluster, node)
} }
// RotateWALFiles sends rotation requests for the given host directories // RotateWALFilesAfterShutdown directly removes current.wal files for the given
// and blocks until all rotations complete. // host directories. Used after shutdown, when WALStaging goroutines have already
// exited and the channel-based RotateWALFiles is no longer safe to call.
func RotateWALFilesAfterShutdown(hostDirs []string) { func RotateWALFilesAfterShutdown(hostDirs []string) {
for _, dir := range hostDirs { for _, dir := range hostDirs {
walPath := path.Join(dir, "current.wal") walPath := path.Join(dir, "current.wal")
@@ -376,7 +415,8 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first, // writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
// then writes it to the bufio.Writer in a single call. This prevents partial // then writes it to the bufio.Writer in a single call. This prevents partial
// records in the write buffer if a write error occurs mid-record (e.g. disk full). // records in the write buffer if a write error occurs mid-record (e.g. disk full).
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error { // Returns the number of bytes written and any error.
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
// Compute payload and total record size. // Compute payload and total record size.
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4 payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector { for _, s := range msg.Selector {
@@ -430,8 +470,8 @@ func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
binary.LittleEndian.PutUint32(buf[p:p+4], crc) binary.LittleEndian.PutUint32(buf[p:p+4], crc)
// Single atomic write to the buffered writer. // Single atomic write to the buffered writer.
_, err := w.Write(buf) n, err := w.Write(buf)
return err return n, err
} }
// readWALRecord reads one WAL record from the reader. // readWALRecord reads one WAL record from the reader.
@@ -696,11 +736,6 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
atomic.AddInt32(&errs, 1) atomic.AddInt32(&errs, 1)
} else { } else {
atomic.AddInt32(&n, 1) atomic.AddInt32(&n, 1)
// Delete WAL immediately after successful snapshot.
walPath := path.Join(wi.hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
}
successMu.Lock() successMu.Lock()
successDirs = append(successDirs, wi.hostDir) successDirs = append(successDirs, wi.hostDir)
successMu.Unlock() successMu.Unlock()

View File

@@ -57,7 +57,7 @@
let entries = $state([]); let entries = $state([]);
let loading = $state(false); let loading = $state(false);
let error = $state(null); let error = $state(null);
let timer = $state(null); let timer = null;
function levelColor(priority) { function levelColor(priority) {
if (priority <= 2) return "danger"; if (priority <= 2) return "danger";