12 Commits

641dc0e3b8  Run gofumpt
    2026-03-30 16:49:27 +02:00

b734c1a92a  And another update
    Entire-Checkpoint: 9bb66d18af6d
    2026-03-30 16:48:12 +02:00

c5fe3c5cd9  Update golangci settings
    Entire-Checkpoint: b9544ef2c54f
    2026-03-30 16:46:30 +02:00

e2910b18b3  Fix golangci config
    Entire-Checkpoint: 1a908bd95cfa
    2026-03-30 16:29:15 +02:00

ed236ec539  Add Make targets for formatting and linting
    Add configuration and document usage in README

    Entire-Checkpoint: 53425877e242
    2026-03-30 16:23:12 +02:00

82c514b11a  Ease samesite cookie settings
    Entire-Checkpoint: 2fe286e23a4a
    2026-03-30 16:10:15 +02:00

66707bbf15  Update metricstore documentation
    Entire-Checkpoint: 99f20c1edd90
    2026-03-29 21:38:04 +02:00
fc47b12fed  fix: Pause WAL writes during binary checkpoint to prevent message drops
    WAL writes during checkpoint are redundant since the binary snapshot
    captures all in-memory data. Pausing eliminates channel saturation
    (1.4M+ dropped messages) caused by disk I/O contention between
    checkpoint writes and WAL staging. Also removes direct WAL file
    deletion in checkpoint workers that raced with the staging goroutine.

    Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
    Entire-Checkpoint: 34d698f40bac
    2026-03-29 11:13:39 +02:00
937984d11f  fix: WAL rotation skipped for all nodes due to non-blocking send on small channel
    RotateWALFiles used a non-blocking send (select/default) on rotation
    channels buffered at 64. With thousands of nodes and few shards, the
    channel fills instantly and nearly all hosts are skipped, leaving WAL
    files unrotated indefinitely.

    Replace with a blocking send using a shared 2-minute deadline so the
    checkpoint goroutine waits for the staging goroutine to drain the
    channel instead of immediately giving up.

    Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
    Entire-Checkpoint: a1ec897216fa
    2026-03-28 06:55:45 +01:00
3d99aec185  fix: Log viewer auto-refresh
    Entire-Checkpoint: 0fc6e5388e08
    2026-03-28 06:45:03 +01:00

280289185a  Add checkpointReader to ReleaseNotes
    Entire-Checkpoint: ea34ae75e21a
    2026-03-28 06:28:07 +01:00

cc3d03bb5b  fix: Unbound growth of wal files in case of checkpointing error
    Entire-Checkpoint: 95a89a7127c5
    2026-03-28 06:26:21 +01:00
17 changed files with 250 additions and 49 deletions

.golangci.yml (new file)

@@ -0,0 +1,38 @@
version: "2"
run:
timeout: 5m
formatters:
enable:
- gofumpt # gofumpt formatter
settings:
gofumpt:
module-path: github.com/ClusterCockpit/cc-backend
linters:
enable:
- staticcheck # staticcheck = true
- unparam # unusedparams = true
- nilnil # nilness = true (catches nil returns of nil interface values)
- govet # base vet; nilness + unusedwrite via govet analyzers
settings:
staticcheck:
checks: ["ST1003"]
govet:
enable:
- nilness
- unusedwrite
exclusions:
paths:
- .git
- .vscode
- .idea
- node_modules
- internal/graph/generated # gqlgen generated code
- internal/api/docs\.go # swaggo generated code
- _test\.go # test files excluded from all linters

Makefile

@@ -36,7 +36,7 @@ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
$(wildcard $(FRONTEND)/src/header/*.svelte) \
$(wildcard $(FRONTEND)/src/job/*.svelte)
-.PHONY: clean distclean test tags frontend swagger graphql $(TARGET)
+.PHONY: clean distclean fmt lint test tags frontend swagger graphql $(TARGET)
.NOTPARALLEL:
@@ -75,6 +75,14 @@ test:
@go vet ./...
@go test ./...
fmt:
$(info ===> FORMAT)
@gofumpt -l -w .
lint:
$(info ===> LINT)
@golangci-lint run ./...
tags:
$(info ===> TAGS)
@ctags -R

README.md

@@ -100,6 +100,26 @@ the following targets:
frontend source files will result in a complete rebuild.
- `make clean`: Clean go build cache and remove binary.
- `make test`: Run the tests that are also run in the GitHub workflow setup.
- `make fmt`: Format all Go source files using
[gofumpt](https://github.com/mvdan/gofumpt), a stricter superset of `gofmt`.
Requires `gofumpt` to be installed (see below).
- `make lint`: Run [golangci-lint](https://golangci-lint.run/) with the project
configuration in `.golangci.yml`. Requires `golangci-lint` to be installed
(see below).
### Installing development tools
`gofumpt` and `golangci-lint` are not part of the Go module and must be
installed separately:
```sh
# Formatter (gofumpt)
go install mvdan.cc/gofumpt@latest
# Linter (golangci-lint) — use the official install script to get a
# pre-built binary; installing via `go install` is not supported upstream.
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b $(go env GOPATH)/bin
```
A common workflow for setting up cc-backend from scratch is:

ReleaseNotes.md

@@ -19,6 +19,16 @@ This is also the default.
### Bug fixes
- **WAL not rotated on partial checkpoint failure**: When binary checkpointing
failed for some hosts, WAL files for successfully checkpointed hosts were not
rotated and the checkpoint timestamp was not advanced. Partial successes now
correctly advance the checkpoint and rotate WAL files for completed hosts.
- **Unbounded WAL file growth**: If binary checkpointing consistently failed for
a host, its `current.wal` file grew without limit until disk exhaustion. A new
`max-wal-size` configuration option (in the `checkpoints` block) allows setting
a per-host WAL size cap in bytes. When exceeded, the WAL is force-rotated.
Defaults to 0 (unlimited) for backward compatibility.
- **Double-ranged filter fixes**: Range filters now correctly handle zero as a
boundary value. Improved validation and UI text for "more than equal" and
"less than equal" range selections.
@@ -50,6 +60,14 @@ This is also the default.
- **Explicit node state queries in node view**: Node health and scheduler state
are now fetched independently from metric data for fresher status information.
### New tools
- **binaryCheckpointReader**: New utility tool (`tools/binaryCheckpointReader`)
that reads `.wal` or `.bin` checkpoint files produced by the metricstore
WAL/snapshot system and dumps their contents to a human-readable `.txt` file.
Useful for debugging and inspecting checkpoint data. Usage:
`go run ./tools/binaryCheckpointReader <file.wal|file.bin>`
### Logging improvements
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors

gopls.json (new file)

@@ -0,0 +1,49 @@
{
"gofumpt": true,
"codelenses": {
"gc_details": false,
"generate": true,
"regenerate_cgo": true,
"run_govulncheck": true,
"test": true,
"tidy": true,
"upgrade_dependency": true,
"vendor": true
},
"hints": {
"assignVariableTypes": true,
"compositeLiteralFields": true,
"compositeLiteralTypes": true,
"constantValues": true,
"functionTypeParameters": true,
"parameterNames": true,
"rangeVariableTypes": true
},
"analyses": {
"nilness": true,
"unusedparams": true,
"unusedwrite": true,
"useany": true
},
"usePlaceholders": true,
"completeUnimported": true,
"staticcheck": true,
"directoryFilters": [
"-.git",
"-.vscode",
"-.idea",
"-.vscode-test",
"-node_modules",
"-web/frontend",
"-web/templates",
"-var",
"-api",
"-configs",
"-init",
"-internal/repository/migrations",
"-internal/repository/testdata",
"-internal/importer/testdata",
"-pkg/archive/testdata"
],
"semanticTokens": true
}

---

@@ -164,12 +164,17 @@ func (auth *Authentication) AuthViaSession(
return nil, errors.New("invalid session data")
}
authSourceInt, ok := session.Values["authSource"].(int)
if !ok {
authSourceInt = int(schema.AuthViaLocalPassword)
}
return &schema.User{
Username: username,
Projects: projects,
Roles: roles,
AuthType: schema.AuthSession,
-AuthSource: -1,
+AuthSource: schema.AuthSource(authSourceInt),
}, nil
}
@@ -319,10 +324,11 @@ func (auth *Authentication) SaveSession(rw http.ResponseWriter, r *http.Request,
}
session.Options.Secure = false
}
session.Options.SameSite = http.SameSiteStrictMode
session.Options.SameSite = http.SameSiteLaxMode
session.Values["username"] = user.Username
session.Values["projects"] = user.Projects
session.Values["roles"] = user.Roles
session.Values["authSource"] = int(user.AuthSource)
if err := auth.sessionStore.Save(r, rw, session); err != nil {
cclog.Warnf("session save failed: %s", err.Error())
http.Error(rw, err.Error(), http.StatusInternalServerError)

---

@@ -1072,10 +1072,12 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
// SubCluster returns generated.SubClusterResolver implementation.
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
-type clusterResolver struct{ *Resolver }
-type jobResolver struct{ *Resolver }
-type metricValueResolver struct{ *Resolver }
-type mutationResolver struct{ *Resolver }
-type nodeResolver struct{ *Resolver }
-type queryResolver struct{ *Resolver }
-type subClusterResolver struct{ *Resolver }
+type (
+	clusterResolver     struct{ *Resolver }
+	jobResolver         struct{ *Resolver }
+	metricValueResolver struct{ *Resolver }
+	mutationResolver    struct{ *Resolver }
+	nodeResolver        struct{ *Resolver }
+	queryResolver       struct{ *Resolver }
+	subClusterResolver  struct{ *Resolver }
+)

---

@@ -236,4 +236,3 @@ func (ccms *CCMetricStore) buildNodeQueries(
return queries, assignedScope, nil
}

---

@@ -63,10 +63,10 @@ func DefaultConfig() *RepositoryConfig {
MaxIdleConnections: 4,
ConnectionMaxLifetime: time.Hour,
ConnectionMaxIdleTime: 10 * time.Minute,
-MinRunningJobDuration: 600, // 10 minutes
-DbCacheSizeMB: 2048, // 2GB per connection
-DbSoftHeapLimitMB: 16384, // 16GB process-wide
-BusyTimeoutMs: 60000, // 60 seconds
+MinRunningJobDuration: 600,   // 10 minutes
+DbCacheSizeMB:         2048,  // 2GB per connection
+DbSoftHeapLimitMB:     16384, // 16GB process-wide
+BusyTimeoutMs:         60000, // 60 seconds
}
}

---

@@ -133,10 +133,10 @@ func (pt *prefixedTarget) WriteFile(name string, data []byte) error {
// ClusterAwareParquetWriter organizes Parquet output by cluster.
// Each cluster gets its own subdirectory with a cluster.json config file.
type ClusterAwareParquetWriter struct {
-target ParquetTarget
-maxSizeMB int
-writers map[string]*ParquetWriter
-clusterCfgs map[string]*schema.Cluster
+target      ParquetTarget
+maxSizeMB   int
+writers     map[string]*ParquetWriter
+clusterCfgs map[string]*schema.Cluster
}
// NewClusterAwareParquetWriter creates a writer that routes jobs to per-cluster ParquetWriters.

---

@@ -3,6 +3,12 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// This file implements the cleanup (archiving or deletion) of old checkpoint files.
//
// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode
// it converts checkpoint files older than the retention window into per-cluster
// Parquet files and then deletes the originals. In "delete" mode it simply removes
// old checkpoint files.
package metricstore
import (
@@ -19,8 +25,12 @@ import (
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
)
-// Worker for either Archiving or Deleting files
+// CleanUp starts a background worker that periodically removes or archives
+// checkpoint files older than the configured retention window.
+//
+// In "archive" mode, old checkpoint files are converted to Parquet and stored
+// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed.
+// The cleanup interval equals Keys.RetentionInMemory.
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Cleanup.Mode == "archive" {
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")

---

@@ -86,11 +86,12 @@ var (
// Checkpointing starts a background worker that periodically saves metric data to disk.
//
-// Checkpoints are written every 12 hours (hardcoded).
+// The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours).
//
// Format behaviour:
-// - "json": Periodic checkpointing every checkpointInterval
-// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
+// - "json": Writes a JSON snapshot file per host every interval
+// - "wal": Writes a binary snapshot file per host every interval, then rotates
+//   the current.wal files for all successfully checkpointed hosts
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
lastCheckpointMu.Lock()
lastCheckpoint = time.Now()
@@ -99,7 +100,6 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore()
wg.Go(func() {
d := 12 * time.Hour // default checkpoint interval
if Keys.CheckpointInterval != "" {
parsed, err := time.ParseDuration(Keys.CheckpointInterval)
@@ -126,10 +126,15 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
if Keys.Checkpoints.FileFormat == "wal" {
// Pause WAL writes: the binary snapshot captures all in-memory
// data, so WAL records written during checkpoint are redundant
// and would be deleted during rotation anyway.
walCheckpointActive.Store(true)
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil {
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
-} else {
+}
+if n > 0 {
cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n)
lastCheckpointMu.Lock()
lastCheckpoint = now
@@ -137,6 +142,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
// Rotate WAL files for successfully checkpointed hosts.
RotateWALFiles(hostDirs)
}
walCheckpointActive.Store(false)
walDropped.Store(0)
} else {
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil {

---

@@ -59,11 +59,14 @@ const (
// Checkpoints configures periodic persistence of in-memory metric data.
//
// Fields:
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
// - RootDir: Filesystem path for checkpoint files (created if missing)
+// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default).
+//   When exceeded the WAL is force-rotated to prevent unbounded disk growth.
type Checkpoints struct {
FileFormat string `json:"file-format"`
RootDir string `json:"directory"`
MaxWALSize int64 `json:"max-wal-size,omitempty"`
}
// Debug provides development and profiling options.

---

@@ -24,6 +24,11 @@ const configSchema = `{
"directory": {
"description": "Path in which the checkpointed files should be placed.",
"type": "string"
},
"max-wal-size": {
"description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).",
"type": "integer",
"minimum": 0
}
}
},

---

@@ -8,7 +8,7 @@
//
// The package organizes metrics in a tree structure (cluster → host → component) and
// provides concurrent read/write access to metric data with configurable aggregation strategies.
-// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data,
+// Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data,
// and enforcing retention policies.
//
// Key features:
@@ -16,7 +16,7 @@
// - Hierarchical data organization (selectors)
// - Concurrent checkpoint/archive workers
// - Support for sum and average aggregation
-// - NATS integration for metric ingestion
+// - NATS integration for metric ingestion via InfluxDB line protocol
package metricstore
import (
@@ -113,7 +113,8 @@ type MemoryStore struct {
// 6. Optionally subscribes to NATS for real-time metric ingestion
//
// Parameters:
-// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
+// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults
// - metrics: Map of metric names to their configurations (frequency and aggregation strategy)
// - wg: WaitGroup that will be incremented for each background goroutine started
//
// The function will call cclog.Fatal on critical errors during initialization.

---

@@ -99,6 +99,11 @@ var walStagingWg sync.WaitGroup
// SendWALMessage from sending on a closed channel (which panics in Go).
var walShuttingDown atomic.Bool
// walCheckpointActive is set during binary checkpoint writes.
// While active, SendWALMessage skips sending (returns true) because the
// snapshot captures all in-memory data, making WAL writes redundant.
var walCheckpointActive atomic.Bool
// WALMessage represents a single metric write to be appended to the WAL.
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
type WALMessage struct {
@@ -122,6 +127,7 @@ type walFileState struct {
f *os.File
w *bufio.Writer
dirty bool
size int64 // approximate bytes written (tracked from open + writes)
}
// walFlushInterval controls how often dirty WAL files are flushed to disk.
@@ -145,6 +151,9 @@ func SendWALMessage(msg *WALMessage) bool {
if walShardChs == nil || walShuttingDown.Load() {
return false
}
if walCheckpointActive.Load() {
return true // Data safe in memory; snapshot will capture it
}
shard := walShardIndex(msg.Cluster, msg.Node)
select {
case walShardChs[shard] <- msg:
@@ -214,7 +223,11 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
// Write file header magic if file is new (empty).
info, err := f.Stat()
-if err == nil && info.Size() == 0 {
+var fileSize int64
+if err == nil {
+	fileSize = info.Size()
+}
+if err == nil && fileSize == 0 {
var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := w.Write(hdr[:]); err != nil {
@@ -222,9 +235,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
f.Close()
return nil
}
+	fileSize = 4
}
-ws = &walFileState{f: f, w: w}
+ws = &walFileState{f: f, w: w, size: fileSize}
hostFiles[hostDir] = ws
return ws
}
@@ -235,9 +249,30 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
if ws == nil {
return
}
-if err := writeWALRecordDirect(ws.w, msg); err != nil {
+// Enforce max WAL size: force-rotate before writing if limit is exceeded.
+// The in-memory store still holds the data; only crash-recovery coverage is lost.
+if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize {
+	cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)",
+		hostDir, ws.size, maxSize)
+	ws.w.Flush()
+	ws.f.Close()
+	walPath := path.Join(hostDir, "current.wal")
+	if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
+		cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
+	}
+	delete(hostFiles, hostDir)
+	ws = getOrOpenWAL(hostDir)
+	if ws == nil {
+		return
+	}
+}
+n, err := writeWALRecordDirect(ws.w, msg)
+if err != nil {
	cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
}
+ws.size += int64(n)
ws.dirty = true
}
@@ -331,6 +366,7 @@ func RotateWALFiles(hostDirs []string) {
if walShardRotateChs == nil || walShuttingDown.Load() {
return
}
+deadline := time.After(2 * time.Minute)
dones := make([]chan struct{}, 0, len(hostDirs))
for _, dir := range hostDirs {
done := make(chan struct{})
@@ -338,16 +374,18 @@ func RotateWALFiles(hostDirs []string) {
select {
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
dones = append(dones, done)
-default:
-	// Channel full or goroutine not consuming — skip this host.
-	cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
+case <-deadline:
+	cclog.Warnf("[METRICSTORE]> WAL rotation send timed out, %d of %d hosts remaining",
+		len(hostDirs)-len(dones), len(hostDirs))
+	goto waitDones
}
}
+waitDones:
for _, done := range dones {
select {
case <-done:
case <-time.After(30 * time.Second):
-cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
+cclog.Warn("[METRICSTORE]> WAL rotation completion timed out, continuing")
return
}
}
@@ -362,8 +400,9 @@ func walShardIndexFromDir(hostDir string) int {
return walShardIndex(cluster, node)
}
-// RotateWALFiles sends rotation requests for the given host directories
-// and blocks until all rotations complete.
+// RotateWALFilesAfterShutdown directly removes current.wal files for the given
+// host directories. Used after shutdown, when WALStaging goroutines have already
+// exited and the channel-based RotateWALFiles is no longer safe to call.
func RotateWALFilesAfterShutdown(hostDirs []string) {
for _, dir := range hostDirs {
walPath := path.Join(dir, "current.wal")
@@ -376,7 +415,8 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
// then writes it to the bufio.Writer in a single call. This prevents partial
// records in the write buffer if a write error occurs mid-record (e.g. disk full).
+// Returns the number of bytes written and any error.
-func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
+func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
// Compute payload and total record size.
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector {
@@ -430,8 +470,8 @@ func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
// Single atomic write to the buffered writer.
-_, err := w.Write(buf)
-return err
+n, err := w.Write(buf)
+return n, err
}
// readWALRecord reads one WAL record from the reader.
@@ -696,11 +736,6 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
atomic.AddInt32(&errs, 1)
} else {
atomic.AddInt32(&n, 1)
-// Delete WAL immediately after successful snapshot.
-walPath := path.Join(wi.hostDir, "current.wal")
-if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
-	cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
-}
successMu.Lock()
successDirs = append(successDirs, wi.hostDir)
successMu.Unlock()

---

@@ -57,7 +57,7 @@
let entries = $state([]);
let loading = $state(false);
let error = $state(null);
-let timer = $state(null);
+let timer = null;
function levelColor(priority) {
if (priority <= 2) return "danger";