20 Commits

Author SHA1 Message Date
Jan Eitzinger
3001086647 Merge pull request #544 from ClusterCockpit/hotfix
Update ReleaseNotes
2026-04-01 14:02:14 +02:00
573f7d144b Update ReleaseNotes
Entire-Checkpoint: cd686356ea80
2026-04-01 13:58:26 +02:00
Jan Eitzinger
38cbc33fb0 Merge pull request #543 from ClusterCockpit/hotfix
Hotfix
2026-04-01 13:53:50 +02:00
43807ae12a feat: Also submit projects array via oidc token
Entire-Checkpoint: 2064482d97e1
2026-04-01 13:46:21 +02:00
31a8a11f1b fix: Always request oidc roles from token
Entire-Checkpoint: bfdbffd7aae0
2026-04-01 12:36:37 +02:00
84fe61b3e0 fix: allow all role changes on SyncUser and UpdateUser callback
Entire-Checkpoint: 496bace0120e
2026-04-01 11:09:50 +02:00
1f04e0a1ce fix: oidc role extraction
Entire-Checkpoint: bbe9ad3cf817
2026-04-01 11:03:19 +02:00
Jan Eitzinger
a101f215dc Merge pull request #542 from ClusterCockpit/hotfix
Hotfix
2026-03-31 07:23:14 +02:00
641dc0e3b8 Run gofumpt
2026-03-30 16:49:27 +02:00
b734c1a92a And another update
Entire-Checkpoint: 9bb66d18af6d
2026-03-30 16:48:12 +02:00
c5fe3c5cd9 Update golangci settings
Entire-Checkpoint: b9544ef2c54f
2026-03-30 16:46:30 +02:00
e2910b18b3 Fix golangci config
Entire-Checkpoint: 1a908bd95cfa
2026-03-30 16:29:15 +02:00
ed236ec539 Add Make targets for formatting and linting
Add configuration and document usage in README

Entire-Checkpoint: 53425877e242
2026-03-30 16:23:12 +02:00
82c514b11a Ease samesite cookie settings
Entire-Checkpoint: 2fe286e23a4a
2026-03-30 16:10:15 +02:00
66707bbf15 Update metricstore documentation
Entire-Checkpoint: 99f20c1edd90
2026-03-29 21:38:04 +02:00
fc47b12fed fix: Pause WAL writes during binary checkpoint to prevent message drops
WAL writes during checkpoint are redundant since the binary snapshot
captures all in-memory data. Pausing eliminates channel saturation
(1.4M+ dropped messages) caused by disk I/O contention between
checkpoint writes and WAL staging. Also removes direct WAL file
deletion in checkpoint workers that raced with the staging goroutine.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Entire-Checkpoint: 34d698f40bac
2026-03-29 11:13:39 +02:00
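The pause boils down to an atomic flag checked on the WAL send path: while a binary checkpoint runs, sends report success without enqueuing anything, because the snapshot already covers the in-memory data. A minimal standalone sketch of the pattern (simplified names, not the project's exact code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// checkpointActive stands in for the walCheckpointActive flag: set before the
// binary snapshot starts, cleared when it finishes.
var checkpointActive atomic.Bool

// sendWAL mimics the guarded send path: during a checkpoint the message is
// acknowledged but skipped, since the snapshot captures the in-memory data.
func sendWAL(msg string) bool {
	if checkpointActive.Load() {
		return true // safe to skip: the data lives in memory until the snapshot lands
	}
	fmt.Println("appended to WAL:", msg) // placeholder for the real shard-channel send
	return true
}

func main() {
	sendWAL("before checkpoint") // written normally
	checkpointActive.Store(true)
	sendWAL("during checkpoint") // skipped, no disk contention with the snapshot
	checkpointActive.Store(false)
	sendWAL("after checkpoint") // written normally again
}
```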
937984d11f fix: WAL rotation skipped for all nodes due to non-blocking send on small channel
RotateWALFiles used a non-blocking send (select/default) on rotation
channels buffered at 64. With thousands of nodes and few shards, the
channel fills instantly and nearly all hosts are skipped, leaving WAL
files unrotated indefinitely.

Replace with a blocking send using a shared 2-minute deadline so the
checkpoint goroutine waits for the staging goroutine to drain the
channel instead of immediately giving up.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Entire-Checkpoint: a1ec897216fa
2026-03-28 06:55:45 +01:00
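The important detail is that a single `time.After` channel is shared across all sends, so the total wait is capped once rather than per host. A compressed sketch of the pattern with toy values (the real code sends `walRotateReq` structs on per-shard channels and uses a 2-minute deadline):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	rotateCh := make(chan string, 2)               // small buffer, like the rotation channels
	hosts := []string{"h0", "h1", "h2", "h3"}      // imagine thousands in production
	deadline := time.After(200 * time.Millisecond) // one shared deadline for the whole loop

	sent := 0
	for _, h := range hosts {
		select {
		case rotateCh <- h: // blocks until a consumer drains the channel
			sent++
		case <-deadline: // the shared deadline bounds the total wait, not each send
			fmt.Printf("rotation send timed out, %d of %d hosts remaining\n",
				len(hosts)-sent, len(hosts))
			return
		}
	}
	fmt.Println("all rotation requests queued")
}
```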
3d99aec185 fix: Log viewer auto-refresh
Entire-Checkpoint: 0fc6e5388e08
2026-03-28 06:45:03 +01:00
280289185a Add checkpointReader to ReleaseNotes
Entire-Checkpoint: ea34ae75e21a
2026-03-28 06:28:07 +01:00
cc3d03bb5b fix: Unbounded growth of WAL files in case of checkpointing error
Entire-Checkpoint: 95a89a7127c5
2026-03-28 06:26:21 +01:00
19 changed files with 349 additions and 66 deletions

.golangci.yml Normal file
View File

@@ -0,0 +1,38 @@
version: "2"
run:
timeout: 5m
formatters:
enable:
- gofumpt # gofumpt formatter
settings:
gofumpt:
module-path: github.com/ClusterCockpit/cc-backend
linters:
enable:
- staticcheck # staticcheck = true
- unparam # unusedparams = true
- nilnil # nilness = true (catches nil returns of nil interface values)
- govet # base vet; nilness + unusedwrite via govet analyzers
settings:
staticcheck:
checks: ["ST1003"]
govet:
enable:
- nilness
- unusedwrite
exclusions:
paths:
- .git
- .vscode
- .idea
- node_modules
- internal/graph/generated # gqlgen generated code
- internal/api/docs\.go # swaggo generated code
- _test\.go # test files excluded from all linters

View File

@@ -36,7 +36,7 @@ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
$(wildcard $(FRONTEND)/src/header/*.svelte) \
$(wildcard $(FRONTEND)/src/job/*.svelte)
.PHONY: clean distclean test tags frontend swagger graphql $(TARGET)
.PHONY: clean distclean fmt lint test tags frontend swagger graphql $(TARGET)
.NOTPARALLEL:
@@ -75,6 +75,14 @@ test:
@go vet ./...
@go test ./...
fmt:
$(info ===> FORMAT)
@gofumpt -l -w .
lint:
$(info ===> LINT)
@golangci-lint run ./...
tags:
$(info ===> TAGS)
@ctags -R

View File

@@ -100,6 +100,26 @@ the following targets:
frontend source files will result in a complete rebuild.
- `make clean`: Clean go build cache and remove binary.
- `make test`: Run the tests that are also run in the GitHub workflow setup.
- `make fmt`: Format all Go source files using
[gofumpt](https://github.com/mvdan/gofumpt), a stricter superset of `gofmt`.
Requires `gofumpt` to be installed (see below).
- `make lint`: Run [golangci-lint](https://golangci-lint.run/) with the project
configuration in `.golangci.yml`. Requires `golangci-lint` to be installed
(see below).
### Installing development tools
`gofumpt` and `golangci-lint` are not part of the Go module and must be
installed separately:
```sh
# Formatter (gofumpt)
go install mvdan.cc/gofumpt@latest
# Linter (golangci-lint) — use the official install script to get a
# pre-built binary; installing via `go install` is not supported upstream.
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b $(go env GOPATH)/bin
```
A common workflow for setting up cc-backend from scratch is:

View File

@@ -19,6 +19,36 @@ This is also the default.
### Bug fixes
- **OIDC role extraction**: Fixed role extraction from OIDC tokens where roles
were not correctly parsed from the token claims. Roles are now always
requested from the token regardless of other configuration.
- **OIDC user sync role changes**: `SyncUser` and `UpdateUser` callbacks now
allow all role changes, removing a restriction that prevented role updates
during OIDC-driven user synchronization.
- **OIDC projects array**: Projects array from the OIDC token is now submitted
and applied when syncing user attributes.
- **WAL message drops during checkpoint**: WAL writes are now paused during
binary checkpoint creation. Previously, disk I/O contention between
checkpoint writes and WAL staging caused over 1.4 million dropped messages
per checkpoint cycle.
- **WAL rotation skipped for all nodes**: `RotateWALFiles` used a non-blocking
send on a small channel. With thousands of nodes, the channel filled instantly
and nearly all hosts were skipped, leaving WAL files unrotated. Replaced with
a blocking send using a shared 2-minute deadline.
- **Log viewer auto-refresh**: Fixed the log viewer component not auto-refreshing
correctly.
- **SameSite cookie setting**: Relaxed the SameSite cookie attribute to improve
compatibility with OIDC redirect flows.
- **WAL not rotated on partial checkpoint failure**: When binary checkpointing
failed for some hosts, WAL files for successfully checkpointed hosts were not
rotated and the checkpoint timestamp was not advanced. Partial successes now
correctly advance the checkpoint and rotate WAL files for completed hosts.
- **Unbounded WAL file growth**: If binary checkpointing consistently failed for
a host, its `current.wal` file grew without limit until disk exhaustion. A new
`max-wal-size` configuration option (in the `checkpoints` block) allows setting
a per-host WAL size cap in bytes. When exceeded, the WAL is force-rotated.
Defaults to 0 (unlimited) for backward compatibility.
- **Doubleranged filter fixes**: Range filters now correctly handle zero as a
boundary value. Improved validation and UI text for "more than equal" and
"less than equal" range selections.
@@ -50,6 +80,20 @@ This is also the default.
- **Explicit node state queries in node view**: Node health and scheduler state
are now fetched independently from metric data for fresher status information.
### Development tooling
- **Make targets for formatting and linting**: New `make fmt` and `make lint`
targets using `gofumpt` and `golangci-lint`. Configuration added in
`.golangci.yml` and `gopls.json`.
### New tools
- **binaryCheckpointReader**: New utility tool (`tools/binaryCheckpointReader`)
that reads `.wal` or `.bin` checkpoint files produced by the metricstore
WAL/snapshot system and dumps their contents to a human-readable `.txt` file.
Useful for debugging and inspecting checkpoint data. Usage:
`go run ./tools/binaryCheckpointReader <file.wal|file.bin>`
### Logging improvements
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors

gopls.json Normal file
View File

@@ -0,0 +1,49 @@
{
"gofumpt": true,
"codelenses": {
"gc_details": false,
"generate": true,
"regenerate_cgo": true,
"run_govulncheck": true,
"test": true,
"tidy": true,
"upgrade_dependency": true,
"vendor": true
},
"hints": {
"assignVariableTypes": true,
"compositeLiteralFields": true,
"compositeLiteralTypes": true,
"constantValues": true,
"functionTypeParameters": true,
"parameterNames": true,
"rangeVariableTypes": true
},
"analyses": {
"nilness": true,
"unusedparams": true,
"unusedwrite": true,
"useany": true
},
"usePlaceholders": true,
"completeUnimported": true,
"staticcheck": true,
"directoryFilters": [
"-.git",
"-.vscode",
"-.idea",
"-.vscode-test",
"-node_modules",
"-web/frontend",
"-web/templates",
"-var",
"-api",
"-configs",
"-init",
"-internal/repository/migrations",
"-internal/repository/testdata",
"-internal/importer/testdata",
"-pkg/archive/testdata"
],
"semanticTokens": true
}

View File

@@ -164,12 +164,17 @@ func (auth *Authentication) AuthViaSession(
return nil, errors.New("invalid session data")
}
authSourceInt, ok := session.Values["authSource"].(int)
if !ok {
authSourceInt = int(schema.AuthViaLocalPassword)
}
return &schema.User{
Username: username,
Projects: projects,
Roles: roles,
AuthType: schema.AuthSession,
AuthSource: -1,
AuthSource: schema.AuthSource(authSourceInt),
}, nil
}
@@ -319,10 +324,11 @@ func (auth *Authentication) SaveSession(rw http.ResponseWriter, r *http.Request,
}
session.Options.Secure = false
}
session.Options.SameSite = http.SameSiteStrictMode
session.Options.SameSite = http.SameSiteLaxMode
session.Values["username"] = user.Username
session.Values["projects"] = user.Projects
session.Values["roles"] = user.Roles
session.Values["authSource"] = int(user.AuthSource)
if err := auth.sessionStore.Save(r, rw, session); err != nil {
cclog.Warnf("session save failed: %s", err.Error())
http.Error(rw, err.Error(), http.StatusInternalServerError)

View File

@@ -79,7 +79,7 @@ func NewOIDC(a *Authentication) *OIDC {
ClientID: clientID,
ClientSecret: clientSecret,
Endpoint: provider.Endpoint(),
Scopes: []string{oidc.ScopeOpenID, "profile"},
Scopes: []string{oidc.ScopeOpenID, "profile", "roles"},
}
oa := &OIDC{provider: provider, client: client, clientID: clientID, authentication: a}
@@ -162,36 +162,76 @@ func (oa *OIDC) OAuth2Callback(rw http.ResponseWriter, r *http.Request) {
return
}
projects := make([]string, 0)
// projects is populated below from ID token claims
var projects []string
// Extract custom claims from userinfo
var claims struct {
// Extract profile claims from userinfo (username, name)
var userInfoClaims struct {
Username string `json:"preferred_username"`
Name string `json:"name"`
}
if err := userInfo.Claims(&userInfoClaims); err != nil {
cclog.Errorf("failed to extract userinfo claims: %s", err.Error())
http.Error(rw, "Failed to extract user claims", http.StatusInternalServerError)
return
}
// Extract role claims from the ID token.
// Keycloak includes realm_access and resource_access in the ID token (JWT),
// but NOT in the UserInfo endpoint response by default.
var idTokenClaims struct {
Username string `json:"preferred_username"`
Name string `json:"name"`
// Keycloak realm-level roles
RealmAccess struct {
Roles []string `json:"roles"`
} `json:"realm_access"`
// Keycloak client-level roles
ResourceAccess struct {
Client struct {
Roles []string `json:"roles"`
} `json:"clustercockpit"`
// Keycloak client-level roles: map from client-id to role list
ResourceAccess map[string]struct {
Roles []string `json:"roles"`
} `json:"resource_access"`
// Custom multi-valued user attribute mapped via a Keycloak User Attribute mapper
Projects []string `json:"projects"`
}
if err := userInfo.Claims(&claims); err != nil {
cclog.Errorf("failed to extract claims: %s", err.Error())
http.Error(rw, "Failed to extract user claims", http.StatusInternalServerError)
if err := idToken.Claims(&idTokenClaims); err != nil {
cclog.Errorf("failed to extract ID token claims: %s", err.Error())
http.Error(rw, "Failed to extract ID token claims", http.StatusInternalServerError)
return
}
if claims.Username == "" {
cclog.Debugf("OIDC userinfo claims: username=%q name=%q", userInfoClaims.Username, userInfoClaims.Name)
cclog.Debugf("OIDC ID token realm_access roles: %v", idTokenClaims.RealmAccess.Roles)
cclog.Debugf("OIDC ID token resource_access: %v", idTokenClaims.ResourceAccess)
cclog.Debugf("OIDC ID token projects: %v", idTokenClaims.Projects)
projects = idTokenClaims.Projects
if projects == nil {
projects = []string{}
}
// Prefer username from userInfo; fall back to ID token claim
username := userInfoClaims.Username
if username == "" {
username = idTokenClaims.Username
}
name := userInfoClaims.Name
if name == "" {
name = idTokenClaims.Name
}
if username == "" {
http.Error(rw, "Username claim missing from OIDC provider", http.StatusBadRequest)
return
}
// Merge roles from both client-level and realm-level access
oidcRoles := append(claims.ResourceAccess.Client.Roles, claims.RealmAccess.Roles...)
// Collect roles from realm_access (realm roles) in the ID token
oidcRoles := append([]string{}, idTokenClaims.RealmAccess.Roles...)
// Also collect roles from resource_access (client roles) for all clients
for clientID, access := range idTokenClaims.ResourceAccess {
cclog.Debugf("OIDC ID token resource_access[%q] roles: %v", clientID, access.Roles)
oidcRoles = append(oidcRoles, access.Roles...)
}
roleSet := make(map[string]bool)
for _, r := range oidcRoles {
@@ -217,8 +257,8 @@ func (oa *OIDC) OAuth2Callback(rw http.ResponseWriter, r *http.Request) {
}
user := &schema.User{
Username: claims.Username,
Name: claims.Name,
Username: username,
Name: name,
Roles: roles,
Projects: projects,
AuthSource: schema.AuthViaOIDC,
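
For orientation, the claims that the new `idTokenClaims` struct decodes would appear in a Keycloak ID token payload roughly as follows (an illustrative example only: the client IDs, role names, and project names are made up, and the `projects` claim presumes a custom user-attribute mapper):

```json
{
  "preferred_username": "jdoe",
  "name": "Jane Doe",
  "realm_access": { "roles": ["user"] },
  "resource_access": {
    "clustercockpit": { "roles": ["admin"] },
    "account": { "roles": ["view-profile"] }
  },
  "projects": ["project-a", "project-b"]
}
```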

View File

@@ -1072,10 +1072,12 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
// SubCluster returns generated.SubClusterResolver implementation.
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
type clusterResolver struct{ *Resolver }
type jobResolver struct{ *Resolver }
type metricValueResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver }
type nodeResolver struct{ *Resolver }
type queryResolver struct{ *Resolver }
type subClusterResolver struct{ *Resolver }
type (
clusterResolver struct{ *Resolver }
jobResolver struct{ *Resolver }
metricValueResolver struct{ *Resolver }
mutationResolver struct{ *Resolver }
nodeResolver struct{ *Resolver }
queryResolver struct{ *Resolver }
subClusterResolver struct{ *Resolver }
)

View File

@@ -236,4 +236,3 @@ func (ccms *CCMetricStore) buildNodeQueries(
return queries, assignedScope, nil
}

View File

@@ -63,10 +63,10 @@ func DefaultConfig() *RepositoryConfig {
MaxIdleConnections: 4,
ConnectionMaxLifetime: time.Hour,
ConnectionMaxIdleTime: 10 * time.Minute,
MinRunningJobDuration: 600, // 10 minutes
DbCacheSizeMB: 2048, // 2GB per connection
DbSoftHeapLimitMB: 16384, // 16GB process-wide
BusyTimeoutMs: 60000, // 60 seconds
MinRunningJobDuration: 600, // 10 minutes
DbCacheSizeMB: 2048, // 2GB per connection
DbSoftHeapLimitMB: 16384, // 16GB process-wide
BusyTimeoutMs: 60000, // 60 seconds
}
}

View File

@@ -14,6 +14,7 @@ import (
"path/filepath"
"reflect"
"runtime"
"sort"
"strings"
"sync"
@@ -210,6 +211,12 @@ func (r *UserRepository) AddUserIfNotExists(user *schema.User) error {
return err
}
func sortedRoles(roles []string) []string {
cp := append([]string{}, roles...)
sort.Strings(cp)
return cp
}
func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error {
// user contains updated info -> Apply to dbUser
// --- Simple Name Update ---
@@ -279,6 +286,15 @@ func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) erro
}
}
// --- Fallback: sync any remaining role differences not covered above ---
// This handles admin role assignment/removal and any other combinations that
// the specific branches above do not cover (e.g. user→admin, admin→user).
if !reflect.DeepEqual(sortedRoles(dbUser.Roles), sortedRoles(user.Roles)) {
if err := updateRoles(user.Roles); err != nil {
return err
}
}
return nil
}

View File

@@ -133,10 +133,10 @@ func (pt *prefixedTarget) WriteFile(name string, data []byte) error {
// ClusterAwareParquetWriter organizes Parquet output by cluster.
// Each cluster gets its own subdirectory with a cluster.json config file.
type ClusterAwareParquetWriter struct {
target ParquetTarget
maxSizeMB int
writers map[string]*ParquetWriter
clusterCfgs map[string]*schema.Cluster
target ParquetTarget
maxSizeMB int
writers map[string]*ParquetWriter
clusterCfgs map[string]*schema.Cluster
}
// NewClusterAwareParquetWriter creates a writer that routes jobs to per-cluster ParquetWriters.

View File

@@ -3,6 +3,12 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// This file implements the cleanup (archiving or deletion) of old checkpoint files.
//
// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode
// it converts checkpoint files older than the retention window into per-cluster
// Parquet files and then deletes the originals. In "delete" mode it simply removes
// old checkpoint files.
package metricstore
import (
@@ -19,8 +25,12 @@ import (
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
)
// Worker for either Archiving or Deleting files
// CleanUp starts a background worker that periodically removes or archives
// checkpoint files older than the configured retention window.
//
// In "archive" mode, old checkpoint files are converted to Parquet and stored
// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed.
// The cleanup interval equals Keys.RetentionInMemory.
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Cleanup.Mode == "archive" {
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")

View File

@@ -86,11 +86,12 @@ var (
// Checkpointing starts a background worker that periodically saves metric data to disk.
//
// Checkpoints are written every 12 hours (hardcoded).
// The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours).
//
// Format behaviour:
// - "json": Periodic checkpointing every checkpointInterval
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
// - "json": Writes a JSON snapshot file per host every interval
// - "wal": Writes a binary snapshot file per host every interval, then rotates
// the current.wal files for all successfully checkpointed hosts
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
lastCheckpointMu.Lock()
lastCheckpoint = time.Now()
@@ -99,7 +100,6 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore()
wg.Go(func() {
d := 12 * time.Hour // default checkpoint interval
if Keys.CheckpointInterval != "" {
parsed, err := time.ParseDuration(Keys.CheckpointInterval)
@@ -126,10 +126,15 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
if Keys.Checkpoints.FileFormat == "wal" {
// Pause WAL writes: the binary snapshot captures all in-memory
// data, so WAL records written during checkpoint are redundant
// and would be deleted during rotation anyway.
walCheckpointActive.Store(true)
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil {
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
} else {
}
if n > 0 {
cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n)
lastCheckpointMu.Lock()
lastCheckpoint = now
@@ -137,6 +142,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
// Rotate WAL files for successfully checkpointed hosts.
RotateWALFiles(hostDirs)
}
walCheckpointActive.Store(false)
walDropped.Store(0)
} else {
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
if err != nil {

View File

@@ -59,11 +59,14 @@ const (
// Checkpoints configures periodic persistence of in-memory metric data.
//
// Fields:
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
// - RootDir: Filesystem path for checkpoint files (created if missing)
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
// - RootDir: Filesystem path for checkpoint files (created if missing)
// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default).
// When exceeded the WAL is force-rotated to prevent unbounded disk growth.
type Checkpoints struct {
FileFormat string `json:"file-format"`
RootDir string `json:"directory"`
MaxWALSize int64 `json:"max-wal-size,omitempty"`
}
// Debug provides development and profiling options.

View File

@@ -24,6 +24,11 @@ const configSchema = `{
"directory": {
"description": "Path in which the checkpointed files should be placed.",
"type": "string"
},
"max-wal-size": {
"description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).",
"type": "integer",
"minimum": 0
}
}
},
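
Under this schema, a `checkpoints` block enabling the new cap might look like the following (illustrative values; the directory is arbitrary and 104857600 bytes is 100 MiB per host):

```json
"checkpoints": {
  "file-format": "wal",
  "directory": "./var/checkpoints",
  "max-wal-size": 104857600
}
```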

View File

@@ -8,7 +8,7 @@
//
// The package organizes metrics in a tree structure (cluster → host → component) and
// provides concurrent read/write access to metric data with configurable aggregation strategies.
// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data,
// Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data,
// and enforcing retention policies.
//
// Key features:
@@ -16,7 +16,7 @@
// - Hierarchical data organization (selectors)
// - Concurrent checkpoint/archive workers
// - Support for sum and average aggregation
// - NATS integration for metric ingestion
// - NATS integration for metric ingestion via InfluxDB line protocol
package metricstore
import (
@@ -113,7 +113,8 @@ type MemoryStore struct {
// 6. Optionally subscribes to NATS for real-time metric ingestion
//
// Parameters:
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults
// - metrics: Map of metric names to their configurations (frequency and aggregation strategy)
// - wg: WaitGroup that will be incremented for each background goroutine started
//
// The function will call cclog.Fatal on critical errors during initialization.

View File

@@ -99,6 +99,11 @@ var walStagingWg sync.WaitGroup
// SendWALMessage from sending on a closed channel (which panics in Go).
var walShuttingDown atomic.Bool
// walCheckpointActive is set during binary checkpoint writes.
// While active, SendWALMessage skips sending (returns true) because the
// snapshot captures all in-memory data, making WAL writes redundant.
var walCheckpointActive atomic.Bool
// WALMessage represents a single metric write to be appended to the WAL.
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
type WALMessage struct {
@@ -122,6 +127,7 @@ type walFileState struct {
f *os.File
w *bufio.Writer
dirty bool
size int64 // approximate bytes written (tracked from open + writes)
}
// walFlushInterval controls how often dirty WAL files are flushed to disk.
@@ -145,6 +151,9 @@ func SendWALMessage(msg *WALMessage) bool {
if walShardChs == nil || walShuttingDown.Load() {
return false
}
if walCheckpointActive.Load() {
return true // Data safe in memory; snapshot will capture it
}
shard := walShardIndex(msg.Cluster, msg.Node)
select {
case walShardChs[shard] <- msg:
@@ -214,7 +223,11 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
// Write file header magic if file is new (empty).
info, err := f.Stat()
if err == nil && info.Size() == 0 {
var fileSize int64
if err == nil {
fileSize = info.Size()
}
if err == nil && fileSize == 0 {
var hdr [4]byte
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
if _, err := w.Write(hdr[:]); err != nil {
@@ -222,9 +235,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
f.Close()
return nil
}
fileSize = 4
}
ws = &walFileState{f: f, w: w}
ws = &walFileState{f: f, w: w, size: fileSize}
hostFiles[hostDir] = ws
return ws
}
@@ -235,9 +249,30 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
if ws == nil {
return
}
if err := writeWALRecordDirect(ws.w, msg); err != nil {
// Enforce max WAL size: force-rotate before writing if limit is exceeded.
// The in-memory store still holds the data; only crash-recovery coverage is lost.
if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize {
cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)",
hostDir, ws.size, maxSize)
ws.w.Flush()
ws.f.Close()
walPath := path.Join(hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
}
delete(hostFiles, hostDir)
ws = getOrOpenWAL(hostDir)
if ws == nil {
return
}
}
n, err := writeWALRecordDirect(ws.w, msg)
if err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
}
ws.size += int64(n)
ws.dirty = true
}
@@ -331,6 +366,7 @@ func RotateWALFiles(hostDirs []string) {
if walShardRotateChs == nil || walShuttingDown.Load() {
return
}
deadline := time.After(2 * time.Minute)
dones := make([]chan struct{}, 0, len(hostDirs))
for _, dir := range hostDirs {
done := make(chan struct{})
@@ -338,16 +374,18 @@ func RotateWALFiles(hostDirs []string) {
select {
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
dones = append(dones, done)
default:
// Channel full or goroutine not consuming — skip this host.
cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
case <-deadline:
cclog.Warnf("[METRICSTORE]> WAL rotation send timed out, %d of %d hosts remaining",
len(hostDirs)-len(dones), len(hostDirs))
goto waitDones
}
}
waitDones:
for _, done := range dones {
select {
case <-done:
case <-time.After(30 * time.Second):
cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
cclog.Warn("[METRICSTORE]> WAL rotation completion timed out, continuing")
return
}
}
@@ -362,8 +400,9 @@ func walShardIndexFromDir(hostDir string) int {
return walShardIndex(cluster, node)
}
// RotateWALFiles sends rotation requests for the given host directories
// and blocks until all rotations complete.
// RotateWALFilesAfterShutdown directly removes current.wal files for the given
// host directories. Used after shutdown, when WALStaging goroutines have already
// exited and the channel-based RotateWALFiles is no longer safe to call.
func RotateWALFilesAfterShutdown(hostDirs []string) {
for _, dir := range hostDirs {
walPath := path.Join(dir, "current.wal")
@@ -376,7 +415,8 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
// then writes it to the bufio.Writer in a single call. This prevents partial
// records in the write buffer if a write error occurs mid-record (e.g. disk full).
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
// Returns the number of bytes written and any error.
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
// Compute payload and total record size.
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector {
@@ -430,8 +470,8 @@ func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
// Single atomic write to the buffered writer.
_, err := w.Write(buf)
return err
n, err := w.Write(buf)
return n, err
}
// readWALRecord reads one WAL record from the reader.
@@ -696,11 +736,6 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
atomic.AddInt32(&errs, 1)
} else {
atomic.AddInt32(&n, 1)
// Delete WAL immediately after successful snapshot.
walPath := path.Join(wi.hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
}
successMu.Lock()
successDirs = append(successDirs, wi.hostDir)
successMu.Unlock()

View File

@@ -57,7 +57,7 @@
let entries = $state([]);
let loading = $state(false);
let error = $state(null);
let timer = $state(null);
let timer = null;
function levelColor(priority) {
if (priority <= 2) return "danger";