diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 00000000..2210eb00 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,38 @@ +version: "2" + +run: + timeout: 5m + +formatters: + enable: + - gofumpt # gofumpt formatter + + settings: + gofumpt: + module-path: github.com/ClusterCockpit/cc-backend + +linters: + enable: + - staticcheck # staticcheck = true + - unparam # unusedparams = true + - nilnil # nilness = true (catches nil returns of nil interface values) + - govet # base vet; nilness + unusedwrite via govet analyzers + + settings: + staticcheck: + checks: ["ST1003"] + + govet: + enable: + - nilness + - unusedwrite + +exclusions: + paths: + - .git + - .vscode + - .idea + - node_modules + - internal/graph/generated # gqlgen generated code + - internal/api/docs\.go # swaggo generated code + - _test\.go # test files excluded from all linters diff --git a/Makefile b/Makefile index 4de1623a..d139583d 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \ $(wildcard $(FRONTEND)/src/header/*.svelte) \ $(wildcard $(FRONTEND)/src/job/*.svelte) -.PHONY: clean distclean test tags frontend swagger graphql $(TARGET) +.PHONY: clean distclean fmt lint test tags frontend swagger graphql $(TARGET) .NOTPARALLEL: @@ -75,6 +75,14 @@ test: @go vet ./... @go test ./... +fmt: + $(info ===> FORMAT) + @gofumpt -l -w . + +lint: + $(info ===> LINT) + @golangci-lint run ./... + tags: $(info ===> TAGS) @ctags -R diff --git a/README.md b/README.md index 030c9bd3..80a21a39 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,26 @@ the following targets: frontend source files will result in a complete rebuild. - `make clean`: Clean go build cache and remove binary. - `make test`: Run the tests that are also run in the GitHub workflow setup. +- `make fmt`: Format all Go source files using + [gofumpt](https://github.com/mvdan/gofumpt), a stricter superset of `gofmt`. + Requires `gofumpt` to be installed (see below). +- `make lint`: Run [golangci-lint](https://golangci-lint.run/) with the project + configuration in `.golangci.yml`. Requires `golangci-lint` to be installed + (see below). + +### Installing development tools + +`gofumpt` and `golangci-lint` are not part of the Go module and must be +installed separately: + +```sh +# Formatter (gofumpt) +go install mvdan.cc/gofumpt@latest + +# Linter (golangci-lint) — use the official install script to get a +# pre-built binary; installing via `go install` is not supported upstream. +curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b $(go env GOPATH)/bin +``` A common workflow for setting up cc-backend from scratch is: diff --git a/ReleaseNotes.md b/ReleaseNotes.md index 20e66322..48bf82ec 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -19,6 +19,16 @@ This is also the default. ### Bug fixes +- **WAL not rotated on partial checkpoint failure**: When binary checkpointing + failed for some hosts, WAL files for successfully checkpointed hosts were not + rotated and the checkpoint timestamp was not advanced. Partial successes now + correctly advance the checkpoint and rotate WAL files for completed hosts. +- **Unbounded WAL file growth**: If binary checkpointing consistently failed for + a host, its `current.wal` file grew without limit until disk exhaustion. A new + `max-wal-size` configuration option (in the `checkpoints` block) allows setting + a per-host WAL size cap in bytes. When exceeded, the WAL is force-rotated. + Defaults to 0 (unlimited) for backward compatibility. + - **Doubleranged filter fixes**: Range filters now correctly handle zero as a boundary value. Improved validation and UI text for "more than equal" and "less than equal" range selections. @@ -50,6 +60,14 @@ This is also the default. - **Explicit node state queries in node view**: Node health and scheduler state are now fetched independently from metric data for fresher status information. +### New tools + +- **binaryCheckpointReader**: New utility tool (`tools/binaryCheckpointReader`) + that reads `.wal` or `.bin` checkpoint files produced by the metricstore + WAL/snapshot system and dumps their contents to a human-readable `.txt` file. + Useful for debugging and inspecting checkpoint data. Usage: + `go run ./tools/binaryCheckpointReader ` + ### Logging improvements - **Reduced tagger log noise**: Missing metrics and expression evaluation errors diff --git a/gopls.json b/gopls.json new file mode 100644 index 00000000..cea2d908 --- /dev/null +++ b/gopls.json @@ -0,0 +1,49 @@ +{ + "gofumpt": true, + "codelenses": { + "gc_details": false, + "generate": true, + "regenerate_cgo": true, + "run_govulncheck": true, + "test": true, + "tidy": true, + "upgrade_dependency": true, + "vendor": true + }, + "hints": { + "assignVariableTypes": true, + "compositeLiteralFields": true, + "compositeLiteralTypes": true, + "constantValues": true, + "functionTypeParameters": true, + "parameterNames": true, + "rangeVariableTypes": true + }, + "analyses": { + "nilness": true, + "unusedparams": true, + "unusedwrite": true, + "useany": true + }, + "usePlaceholders": true, + "completeUnimported": true, + "staticcheck": true, + "directoryFilters": [ + "-.git", + "-.vscode", + "-.idea", + "-.vscode-test", + "-node_modules", + "-web/frontend", + "-web/templates", + "-var", + "-api", + "-configs", + "-init", + "-internal/repository/migrations", + "-internal/repository/testdata", + "-internal/importer/testdata", + "-pkg/archive/testdata" + ], + "semanticTokens": true +} diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 327e48a3..d1c004bd 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -164,12 +164,17 @@ func (auth *Authentication) AuthViaSession( return nil, errors.New("invalid session data") } + authSourceInt, ok := session.Values["authSource"].(int) + if !ok { + authSourceInt = int(schema.AuthViaLocalPassword) + } + return &schema.User{ Username: username, Projects: projects, Roles: roles, AuthType: schema.AuthSession, - AuthSource: -1, + AuthSource: schema.AuthSource(authSourceInt), }, nil } @@ -319,10 +324,11 @@ func (auth *Authentication) SaveSession(rw http.ResponseWriter, r *http.Request, } session.Options.Secure = false } - session.Options.SameSite = http.SameSiteStrictMode + session.Options.SameSite = http.SameSiteLaxMode session.Values["username"] = user.Username session.Values["projects"] = user.Projects session.Values["roles"] = user.Roles + session.Values["authSource"] = int(user.AuthSource) if err := auth.sessionStore.Save(r, rw, session); err != nil { cclog.Warnf("session save failed: %s", err.Error()) http.Error(rw, err.Error(), http.StatusInternalServerError) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 9cfb808e..898ea84e 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -1072,10 +1072,12 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } // SubCluster returns generated.SubClusterResolver implementation. func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} } -type clusterResolver struct{ *Resolver } -type jobResolver struct{ *Resolver } -type metricValueResolver struct{ *Resolver } -type mutationResolver struct{ *Resolver } -type nodeResolver struct{ *Resolver } -type queryResolver struct{ *Resolver } -type subClusterResolver struct{ *Resolver } +type ( + clusterResolver struct{ *Resolver } + jobResolver struct{ *Resolver } + metricValueResolver struct{ *Resolver } + mutationResolver struct{ *Resolver } + nodeResolver struct{ *Resolver } + queryResolver struct{ *Resolver } + subClusterResolver struct{ *Resolver } +) diff --git a/internal/metricstoreclient/cc-metric-store-queries.go b/internal/metricstoreclient/cc-metric-store-queries.go index 1119d70c..fa2bea25 100644 --- a/internal/metricstoreclient/cc-metric-store-queries.go +++ b/internal/metricstoreclient/cc-metric-store-queries.go @@ -236,4 +236,3 @@ func (ccms *CCMetricStore) buildNodeQueries( return queries, assignedScope, nil } - diff --git a/internal/repository/config.go b/internal/repository/config.go index a201bb6b..f932836e 100644 --- a/internal/repository/config.go +++ b/internal/repository/config.go @@ -63,10 +63,10 @@ func DefaultConfig() *RepositoryConfig { MaxIdleConnections: 4, ConnectionMaxLifetime: time.Hour, ConnectionMaxIdleTime: 10 * time.Minute, - MinRunningJobDuration: 600, // 10 minutes - DbCacheSizeMB: 2048, // 2GB per connection - DbSoftHeapLimitMB: 16384, // 16GB process-wide - BusyTimeoutMs: 60000, // 60 seconds + MinRunningJobDuration: 600, // 10 minutes + DbCacheSizeMB: 2048, // 2GB per connection + DbSoftHeapLimitMB: 16384, // 16GB process-wide + BusyTimeoutMs: 60000, // 60 seconds } } diff --git a/pkg/archive/parquet/writer.go b/pkg/archive/parquet/writer.go index bfe4490f..11cdad9e 100644 --- a/pkg/archive/parquet/writer.go +++ b/pkg/archive/parquet/writer.go @@ -133,10 +133,10 @@ func (pt *prefixedTarget) WriteFile(name string, data []byte) error { // ClusterAwareParquetWriter organizes Parquet output by cluster. // Each cluster gets its own subdirectory with a cluster.json config file. type ClusterAwareParquetWriter struct { - target ParquetTarget - maxSizeMB int - writers map[string]*ParquetWriter - clusterCfgs map[string]*schema.Cluster + target ParquetTarget + maxSizeMB int + writers map[string]*ParquetWriter + clusterCfgs map[string]*schema.Cluster } // NewClusterAwareParquetWriter creates a writer that routes jobs to per-cluster ParquetWriters. diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 3b92a3e0..3938ac82 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -3,6 +3,12 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// This file implements the cleanup (archiving or deletion) of old checkpoint files. +// +// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode +// it converts checkpoint files older than the retention window into per-cluster +// Parquet files and then deletes the originals. In "delete" mode it simply removes +// old checkpoint files. package metricstore import ( @@ -19,8 +25,12 @@ import ( cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" ) -// Worker for either Archiving or Deleting files - +// CleanUp starts a background worker that periodically removes or archives +// checkpoint files older than the configured retention window. +// +// In "archive" mode, old checkpoint files are converted to Parquet and stored +// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed. +// The cleanup interval equals Keys.RetentionInMemory. func CleanUp(wg *sync.WaitGroup, ctx context.Context) { if Keys.Cleanup.Mode == "archive" { cclog.Info("[METRICSTORE]> enable archive cleanup to parquet") diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index 3627a67b..c2b86415 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -86,11 +86,12 @@ var ( // Checkpointing starts a background worker that periodically saves metric data to disk. // -// Checkpoints are written every 12 hours (hardcoded). +// The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours). // // Format behaviour: -// - "json": Periodic checkpointing every checkpointInterval -// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval +// - "json": Writes a JSON snapshot file per host every interval +// - "wal": Writes a binary snapshot file per host every interval, then rotates +// the current.wal files for all successfully checkpointed hosts func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { lastCheckpointMu.Lock() lastCheckpoint = time.Now() @@ -99,7 +100,6 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { ms := GetMemoryStore() wg.Go(func() { - d := 12 * time.Hour // default checkpoint interval if Keys.CheckpointInterval != "" { parsed, err := time.ParseDuration(Keys.CheckpointInterval) @@ -126,10 +126,15 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339)) if Keys.Checkpoints.FileFormat == "wal" { + // Pause WAL writes: the binary snapshot captures all in-memory + // data, so WAL records written during checkpoint are redundant + // and would be deleted during rotation anyway. + walCheckpointActive.Store(true) n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) if err != nil { cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error()) - } else { + } + if n > 0 { cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n) lastCheckpointMu.Lock() lastCheckpoint = now @@ -137,6 +142,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { // Rotate WAL files for successfully checkpointed hosts. RotateWALFiles(hostDirs) } + walCheckpointActive.Store(false) + walDropped.Store(0) } else { n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix()) if err != nil { diff --git a/pkg/metricstore/config.go b/pkg/metricstore/config.go index f6c56672..b24af844 100644 --- a/pkg/metricstore/config.go +++ b/pkg/metricstore/config.go @@ -59,11 +59,14 @@ const ( // Checkpoints configures periodic persistence of in-memory metric data. // // Fields: -// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal" -// - RootDir: Filesystem path for checkpoint files (created if missing) +// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal" +// - RootDir: Filesystem path for checkpoint files (created if missing) +// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default). +// When exceeded the WAL is force-rotated to prevent unbounded disk growth. type Checkpoints struct { FileFormat string `json:"file-format"` RootDir string `json:"directory"` + MaxWALSize int64 `json:"max-wal-size,omitempty"` } // Debug provides development and profiling options. diff --git a/pkg/metricstore/configSchema.go b/pkg/metricstore/configSchema.go index 00bfc4b7..3aeef4bd 100644 --- a/pkg/metricstore/configSchema.go +++ b/pkg/metricstore/configSchema.go @@ -24,6 +24,11 @@ const configSchema = `{ "directory": { "description": "Path in which the checkpointed files should be placed.", "type": "string" + }, + "max-wal-size": { + "description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).", + "type": "integer", + "minimum": 0 } } }, diff --git a/pkg/metricstore/metricstore.go b/pkg/metricstore/metricstore.go index 9ba69c55..845472b2 100644 --- a/pkg/metricstore/metricstore.go +++ b/pkg/metricstore/metricstore.go @@ -8,7 +8,7 @@ // // The package organizes metrics in a tree structure (cluster → host → component) and // provides concurrent read/write access to metric data with configurable aggregation strategies. -// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data, +// Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data, // and enforcing retention policies. // // Key features: @@ -16,7 +16,7 @@ // - Hierarchical data organization (selectors) // - Concurrent checkpoint/archive workers // - Support for sum and average aggregation -// - NATS integration for metric ingestion +// - NATS integration for metric ingestion via InfluxDB line protocol package metricstore import ( @@ -113,7 +113,8 @@ type MemoryStore struct { // 6. Optionally subscribes to NATS for real-time metric ingestion // // Parameters: -// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig) +// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults +// - metrics: Map of metric names to their configurations (frequency and aggregation strategy) // - wg: WaitGroup that will be incremented for each background goroutine started // // The function will call cclog.Fatal on critical errors during initialization. diff --git a/pkg/metricstore/walCheckpoint.go b/pkg/metricstore/walCheckpoint.go index 01de32e9..b7c7ffb2 100644 --- a/pkg/metricstore/walCheckpoint.go +++ b/pkg/metricstore/walCheckpoint.go @@ -99,6 +99,11 @@ var walStagingWg sync.WaitGroup // SendWALMessage from sending on a closed channel (which panics in Go). var walShuttingDown atomic.Bool +// walCheckpointActive is set during binary checkpoint writes. +// While active, SendWALMessage skips sending (returns true) because the +// snapshot captures all in-memory data, making WAL writes redundant. +var walCheckpointActive atomic.Bool + // WALMessage represents a single metric write to be appended to the WAL. // Cluster and Node are NOT stored in the WAL record (inferred from file path). type WALMessage struct { @@ -122,6 +127,7 @@ type walFileState struct { f *os.File w *bufio.Writer dirty bool + size int64 // approximate bytes written (tracked from open + writes) } // walFlushInterval controls how often dirty WAL files are flushed to disk. @@ -145,6 +151,9 @@ func SendWALMessage(msg *WALMessage) bool { if walShardChs == nil || walShuttingDown.Load() { return false } + if walCheckpointActive.Load() { + return true // Data safe in memory; snapshot will capture it + } shard := walShardIndex(msg.Cluster, msg.Node) select { case walShardChs[shard] <- msg: @@ -214,7 +223,11 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { // Write file header magic if file is new (empty). info, err := f.Stat() - if err == nil && info.Size() == 0 { + var fileSize int64 + if err == nil { + fileSize = info.Size() + } + if err == nil && fileSize == 0 { var hdr [4]byte binary.LittleEndian.PutUint32(hdr[:], walFileMagic) if _, err := w.Write(hdr[:]); err != nil { @@ -222,9 +235,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { f.Close() return nil } + fileSize = 4 } - ws = &walFileState{f: f, w: w} + ws = &walFileState{f: f, w: w, size: fileSize} hostFiles[hostDir] = ws return ws } @@ -235,9 +249,30 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) { if ws == nil { return } - if err := writeWALRecordDirect(ws.w, msg); err != nil { + + // Enforce max WAL size: force-rotate before writing if limit is exceeded. + // The in-memory store still holds the data; only crash-recovery coverage is lost. + if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize { + cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)", + hostDir, ws.size, maxSize) + ws.w.Flush() + ws.f.Close() + walPath := path.Join(hostDir, "current.wal") + if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { + cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err) + } + delete(hostFiles, hostDir) + ws = getOrOpenWAL(hostDir) + if ws == nil { + return + } + } + + n, err := writeWALRecordDirect(ws.w, msg) + if err != nil { cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) } + ws.size += int64(n) ws.dirty = true } @@ -331,6 +366,7 @@ func RotateWALFiles(hostDirs []string) { if walShardRotateChs == nil || walShuttingDown.Load() { return } + deadline := time.After(2 * time.Minute) dones := make([]chan struct{}, 0, len(hostDirs)) for _, dir := range hostDirs { done := make(chan struct{}) @@ -338,16 +374,18 @@ func RotateWALFiles(hostDirs []string) { select { case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}: dones = append(dones, done) - default: - // Channel full or goroutine not consuming — skip this host. - cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir) + case <-deadline: + cclog.Warnf("[METRICSTORE]> WAL rotation send timed out, %d of %d hosts remaining", + len(hostDirs)-len(dones), len(hostDirs)) + goto waitDones } } +waitDones: for _, done := range dones { select { case <-done: case <-time.After(30 * time.Second): - cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing") + cclog.Warn("[METRICSTORE]> WAL rotation completion timed out, continuing") return } } @@ -362,8 +400,9 @@ func walShardIndexFromDir(hostDir string) int { return walShardIndex(cluster, node) } -// RotateWALFiles sends rotation requests for the given host directories -// and blocks until all rotations complete. +// RotateWALFilesAfterShutdown directly removes current.wal files for the given +// host directories. Used after shutdown, when WALStaging goroutines have already +// exited and the channel-based RotateWALFiles is no longer safe to call. func RotateWALFilesAfterShutdown(hostDirs []string) { for _, dir := range hostDirs { walPath := path.Join(dir, "current.wal") @@ -376,7 +415,8 @@ func RotateWALFilesAfterShutdown(hostDirs []string) { // writeWALRecordDirect encodes a WAL record into a contiguous buffer first, // then writes it to the bufio.Writer in a single call. This prevents partial // records in the write buffer if a write error occurs mid-record (e.g. disk full). -func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error { +// Returns the number of bytes written and any error. +func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) { // Compute payload and total record size. payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4 for _, s := range msg.Selector { @@ -430,8 +470,8 @@ func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error { binary.LittleEndian.PutUint32(buf[p:p+4], crc) // Single atomic write to the buffered writer. - _, err := w.Write(buf) - return err + n, err := w.Write(buf) + return n, err } // readWALRecord reads one WAL record from the reader. @@ -696,11 +736,6 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string atomic.AddInt32(&errs, 1) } else { atomic.AddInt32(&n, 1) - // Delete WAL immediately after successful snapshot. - walPath := path.Join(wi.hostDir, "current.wal") - if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) { - cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err) - } successMu.Lock() successDirs = append(successDirs, wi.hostDir) successMu.Unlock() diff --git a/web/frontend/src/Logs.root.svelte b/web/frontend/src/Logs.root.svelte index ccadabce..9dfd7578 100644 --- a/web/frontend/src/Logs.root.svelte +++ b/web/frontend/src/Logs.root.svelte @@ -57,7 +57,7 @@ let entries = $state([]); let loading = $state(false); let error = $state(null); - let timer = $state(null); + let timer = null; function levelColor(priority) { if (priority <= 2) return "danger";