mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-30 20:47:31 +02:00
Compare commits
12 Commits
release/v1
...
hotfix
| Author | SHA1 | Date | |
|---|---|---|---|
| 641dc0e3b8 | |||
| b734c1a92a | |||
| c5fe3c5cd9 | |||
| e2910b18b3 | |||
| ed236ec539 | |||
| 82c514b11a | |||
| 66707bbf15 | |||
| fc47b12fed | |||
| 937984d11f | |||
| 3d99aec185 | |||
| 280289185a | |||
| cc3d03bb5b |
38
.golangci.yml
Normal file
38
.golangci.yml
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
version: "2"
|
||||||
|
|
||||||
|
run:
|
||||||
|
timeout: 5m
|
||||||
|
|
||||||
|
formatters:
|
||||||
|
enable:
|
||||||
|
- gofumpt # gofumpt formatter
|
||||||
|
|
||||||
|
settings:
|
||||||
|
gofumpt:
|
||||||
|
module-path: github.com/ClusterCockpit/cc-backend
|
||||||
|
|
||||||
|
linters:
|
||||||
|
enable:
|
||||||
|
- staticcheck # staticcheck = true
|
||||||
|
- unparam # unusedparams = true
|
||||||
|
- nilnil # nilness = true (catches nil returns of nil interface values)
|
||||||
|
- govet # base vet; nilness + unusedwrite via govet analyzers
|
||||||
|
|
||||||
|
settings:
|
||||||
|
staticcheck:
|
||||||
|
checks: ["ST1003"]
|
||||||
|
|
||||||
|
govet:
|
||||||
|
enable:
|
||||||
|
- nilness
|
||||||
|
- unusedwrite
|
||||||
|
|
||||||
|
exclusions:
|
||||||
|
paths:
|
||||||
|
- .git
|
||||||
|
- .vscode
|
||||||
|
- .idea
|
||||||
|
- node_modules
|
||||||
|
- internal/graph/generated # gqlgen generated code
|
||||||
|
- internal/api/docs\.go # swaggo generated code
|
||||||
|
- _test\.go # test files excluded from all linters
|
||||||
10
Makefile
10
Makefile
@@ -36,7 +36,7 @@ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
|
|||||||
$(wildcard $(FRONTEND)/src/header/*.svelte) \
|
$(wildcard $(FRONTEND)/src/header/*.svelte) \
|
||||||
$(wildcard $(FRONTEND)/src/job/*.svelte)
|
$(wildcard $(FRONTEND)/src/job/*.svelte)
|
||||||
|
|
||||||
.PHONY: clean distclean test tags frontend swagger graphql $(TARGET)
|
.PHONY: clean distclean fmt lint test tags frontend swagger graphql $(TARGET)
|
||||||
|
|
||||||
.NOTPARALLEL:
|
.NOTPARALLEL:
|
||||||
|
|
||||||
@@ -75,6 +75,14 @@ test:
|
|||||||
@go vet ./...
|
@go vet ./...
|
||||||
@go test ./...
|
@go test ./...
|
||||||
|
|
||||||
|
fmt:
|
||||||
|
$(info ===> FORMAT)
|
||||||
|
@gofumpt -l -w .
|
||||||
|
|
||||||
|
lint:
|
||||||
|
$(info ===> LINT)
|
||||||
|
@golangci-lint run ./...
|
||||||
|
|
||||||
tags:
|
tags:
|
||||||
$(info ===> TAGS)
|
$(info ===> TAGS)
|
||||||
@ctags -R
|
@ctags -R
|
||||||
|
|||||||
20
README.md
20
README.md
@@ -100,6 +100,26 @@ the following targets:
|
|||||||
frontend source files will result in a complete rebuild.
|
frontend source files will result in a complete rebuild.
|
||||||
- `make clean`: Clean go build cache and remove binary.
|
- `make clean`: Clean go build cache and remove binary.
|
||||||
- `make test`: Run the tests that are also run in the GitHub workflow setup.
|
- `make test`: Run the tests that are also run in the GitHub workflow setup.
|
||||||
|
- `make fmt`: Format all Go source files using
|
||||||
|
[gofumpt](https://github.com/mvdan/gofumpt), a stricter superset of `gofmt`.
|
||||||
|
Requires `gofumpt` to be installed (see below).
|
||||||
|
- `make lint`: Run [golangci-lint](https://golangci-lint.run/) with the project
|
||||||
|
configuration in `.golangci.yml`. Requires `golangci-lint` to be installed
|
||||||
|
(see below).
|
||||||
|
|
||||||
|
### Installing development tools
|
||||||
|
|
||||||
|
`gofumpt` and `golangci-lint` are not part of the Go module and must be
|
||||||
|
installed separately:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
# Formatter (gofumpt)
|
||||||
|
go install mvdan.cc/gofumpt@latest
|
||||||
|
|
||||||
|
# Linter (golangci-lint) — use the official install script to get a
|
||||||
|
# pre-built binary; installing via `go install` is not supported upstream.
|
||||||
|
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b $(go env GOPATH)/bin
|
||||||
|
```
|
||||||
|
|
||||||
A common workflow for setting up cc-backend from scratch is:
|
A common workflow for setting up cc-backend from scratch is:
|
||||||
|
|
||||||
|
|||||||
@@ -19,6 +19,16 @@ This is also the default.
|
|||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
|
|
||||||
|
- **WAL not rotated on partial checkpoint failure**: When binary checkpointing
|
||||||
|
failed for some hosts, WAL files for successfully checkpointed hosts were not
|
||||||
|
rotated and the checkpoint timestamp was not advanced. Partial successes now
|
||||||
|
correctly advance the checkpoint and rotate WAL files for completed hosts.
|
||||||
|
- **Unbounded WAL file growth**: If binary checkpointing consistently failed for
|
||||||
|
a host, its `current.wal` file grew without limit until disk exhaustion. A new
|
||||||
|
`max-wal-size` configuration option (in the `checkpoints` block) allows setting
|
||||||
|
a per-host WAL size cap in bytes. When exceeded, the WAL is force-rotated.
|
||||||
|
Defaults to 0 (unlimited) for backward compatibility.
|
||||||
|
|
||||||
- **Doubleranged filter fixes**: Range filters now correctly handle zero as a
|
- **Doubleranged filter fixes**: Range filters now correctly handle zero as a
|
||||||
boundary value. Improved validation and UI text for "more than equal" and
|
boundary value. Improved validation and UI text for "more than equal" and
|
||||||
"less than equal" range selections.
|
"less than equal" range selections.
|
||||||
@@ -50,6 +60,14 @@ This is also the default.
|
|||||||
- **Explicit node state queries in node view**: Node health and scheduler state
|
- **Explicit node state queries in node view**: Node health and scheduler state
|
||||||
are now fetched independently from metric data for fresher status information.
|
are now fetched independently from metric data for fresher status information.
|
||||||
|
|
||||||
|
### New tools
|
||||||
|
|
||||||
|
- **binaryCheckpointReader**: New utility tool (`tools/binaryCheckpointReader`)
|
||||||
|
that reads `.wal` or `.bin` checkpoint files produced by the metricstore
|
||||||
|
WAL/snapshot system and dumps their contents to a human-readable `.txt` file.
|
||||||
|
Useful for debugging and inspecting checkpoint data. Usage:
|
||||||
|
`go run ./tools/binaryCheckpointReader <file.wal|file.bin>`
|
||||||
|
|
||||||
### Logging improvements
|
### Logging improvements
|
||||||
|
|
||||||
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors
|
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors
|
||||||
|
|||||||
49
gopls.json
Normal file
49
gopls.json
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
{
|
||||||
|
"gofumpt": true,
|
||||||
|
"codelenses": {
|
||||||
|
"gc_details": false,
|
||||||
|
"generate": true,
|
||||||
|
"regenerate_cgo": true,
|
||||||
|
"run_govulncheck": true,
|
||||||
|
"test": true,
|
||||||
|
"tidy": true,
|
||||||
|
"upgrade_dependency": true,
|
||||||
|
"vendor": true
|
||||||
|
},
|
||||||
|
"hints": {
|
||||||
|
"assignVariableTypes": true,
|
||||||
|
"compositeLiteralFields": true,
|
||||||
|
"compositeLiteralTypes": true,
|
||||||
|
"constantValues": true,
|
||||||
|
"functionTypeParameters": true,
|
||||||
|
"parameterNames": true,
|
||||||
|
"rangeVariableTypes": true
|
||||||
|
},
|
||||||
|
"analyses": {
|
||||||
|
"nilness": true,
|
||||||
|
"unusedparams": true,
|
||||||
|
"unusedwrite": true,
|
||||||
|
"useany": true
|
||||||
|
},
|
||||||
|
"usePlaceholders": true,
|
||||||
|
"completeUnimported": true,
|
||||||
|
"staticcheck": true,
|
||||||
|
"directoryFilters": [
|
||||||
|
"-.git",
|
||||||
|
"-.vscode",
|
||||||
|
"-.idea",
|
||||||
|
"-.vscode-test",
|
||||||
|
"-node_modules",
|
||||||
|
"-web/frontend",
|
||||||
|
"-web/templates",
|
||||||
|
"-var",
|
||||||
|
"-api",
|
||||||
|
"-configs",
|
||||||
|
"-init",
|
||||||
|
"-internal/repository/migrations",
|
||||||
|
"-internal/repository/testdata",
|
||||||
|
"-internal/importer/testdata",
|
||||||
|
"-pkg/archive/testdata"
|
||||||
|
],
|
||||||
|
"semanticTokens": true
|
||||||
|
}
|
||||||
@@ -164,12 +164,17 @@ func (auth *Authentication) AuthViaSession(
|
|||||||
return nil, errors.New("invalid session data")
|
return nil, errors.New("invalid session data")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
authSourceInt, ok := session.Values["authSource"].(int)
|
||||||
|
if !ok {
|
||||||
|
authSourceInt = int(schema.AuthViaLocalPassword)
|
||||||
|
}
|
||||||
|
|
||||||
return &schema.User{
|
return &schema.User{
|
||||||
Username: username,
|
Username: username,
|
||||||
Projects: projects,
|
Projects: projects,
|
||||||
Roles: roles,
|
Roles: roles,
|
||||||
AuthType: schema.AuthSession,
|
AuthType: schema.AuthSession,
|
||||||
AuthSource: -1,
|
AuthSource: schema.AuthSource(authSourceInt),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -319,10 +324,11 @@ func (auth *Authentication) SaveSession(rw http.ResponseWriter, r *http.Request,
|
|||||||
}
|
}
|
||||||
session.Options.Secure = false
|
session.Options.Secure = false
|
||||||
}
|
}
|
||||||
session.Options.SameSite = http.SameSiteStrictMode
|
session.Options.SameSite = http.SameSiteLaxMode
|
||||||
session.Values["username"] = user.Username
|
session.Values["username"] = user.Username
|
||||||
session.Values["projects"] = user.Projects
|
session.Values["projects"] = user.Projects
|
||||||
session.Values["roles"] = user.Roles
|
session.Values["roles"] = user.Roles
|
||||||
|
session.Values["authSource"] = int(user.AuthSource)
|
||||||
if err := auth.sessionStore.Save(r, rw, session); err != nil {
|
if err := auth.sessionStore.Save(r, rw, session); err != nil {
|
||||||
cclog.Warnf("session save failed: %s", err.Error())
|
cclog.Warnf("session save failed: %s", err.Error())
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||||
|
|||||||
@@ -1072,10 +1072,12 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
|
|||||||
// SubCluster returns generated.SubClusterResolver implementation.
|
// SubCluster returns generated.SubClusterResolver implementation.
|
||||||
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
|
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
|
||||||
|
|
||||||
type clusterResolver struct{ *Resolver }
|
type (
|
||||||
type jobResolver struct{ *Resolver }
|
clusterResolver struct{ *Resolver }
|
||||||
type metricValueResolver struct{ *Resolver }
|
jobResolver struct{ *Resolver }
|
||||||
type mutationResolver struct{ *Resolver }
|
metricValueResolver struct{ *Resolver }
|
||||||
type nodeResolver struct{ *Resolver }
|
mutationResolver struct{ *Resolver }
|
||||||
type queryResolver struct{ *Resolver }
|
nodeResolver struct{ *Resolver }
|
||||||
type subClusterResolver struct{ *Resolver }
|
queryResolver struct{ *Resolver }
|
||||||
|
subClusterResolver struct{ *Resolver }
|
||||||
|
)
|
||||||
|
|||||||
@@ -236,4 +236,3 @@ func (ccms *CCMetricStore) buildNodeQueries(
|
|||||||
|
|
||||||
return queries, assignedScope, nil
|
return queries, assignedScope, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -63,10 +63,10 @@ func DefaultConfig() *RepositoryConfig {
|
|||||||
MaxIdleConnections: 4,
|
MaxIdleConnections: 4,
|
||||||
ConnectionMaxLifetime: time.Hour,
|
ConnectionMaxLifetime: time.Hour,
|
||||||
ConnectionMaxIdleTime: 10 * time.Minute,
|
ConnectionMaxIdleTime: 10 * time.Minute,
|
||||||
MinRunningJobDuration: 600, // 10 minutes
|
MinRunningJobDuration: 600, // 10 minutes
|
||||||
DbCacheSizeMB: 2048, // 2GB per connection
|
DbCacheSizeMB: 2048, // 2GB per connection
|
||||||
DbSoftHeapLimitMB: 16384, // 16GB process-wide
|
DbSoftHeapLimitMB: 16384, // 16GB process-wide
|
||||||
BusyTimeoutMs: 60000, // 60 seconds
|
BusyTimeoutMs: 60000, // 60 seconds
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -133,10 +133,10 @@ func (pt *prefixedTarget) WriteFile(name string, data []byte) error {
|
|||||||
// ClusterAwareParquetWriter organizes Parquet output by cluster.
|
// ClusterAwareParquetWriter organizes Parquet output by cluster.
|
||||||
// Each cluster gets its own subdirectory with a cluster.json config file.
|
// Each cluster gets its own subdirectory with a cluster.json config file.
|
||||||
type ClusterAwareParquetWriter struct {
|
type ClusterAwareParquetWriter struct {
|
||||||
target ParquetTarget
|
target ParquetTarget
|
||||||
maxSizeMB int
|
maxSizeMB int
|
||||||
writers map[string]*ParquetWriter
|
writers map[string]*ParquetWriter
|
||||||
clusterCfgs map[string]*schema.Cluster
|
clusterCfgs map[string]*schema.Cluster
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClusterAwareParquetWriter creates a writer that routes jobs to per-cluster ParquetWriters.
|
// NewClusterAwareParquetWriter creates a writer that routes jobs to per-cluster ParquetWriters.
|
||||||
|
|||||||
@@ -3,6 +3,12 @@
|
|||||||
// Use of this source code is governed by a MIT-style
|
// Use of this source code is governed by a MIT-style
|
||||||
// license that can be found in the LICENSE file.
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// This file implements the cleanup (archiving or deletion) of old checkpoint files.
|
||||||
|
//
|
||||||
|
// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode
|
||||||
|
// it converts checkpoint files older than the retention window into per-cluster
|
||||||
|
// Parquet files and then deletes the originals. In "delete" mode it simply removes
|
||||||
|
// old checkpoint files.
|
||||||
package metricstore
|
package metricstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -19,8 +25,12 @@ import (
|
|||||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Worker for either Archiving or Deleting files
|
// CleanUp starts a background worker that periodically removes or archives
|
||||||
|
// checkpoint files older than the configured retention window.
|
||||||
|
//
|
||||||
|
// In "archive" mode, old checkpoint files are converted to Parquet and stored
|
||||||
|
// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed.
|
||||||
|
// The cleanup interval equals Keys.RetentionInMemory.
|
||||||
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
|
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
if Keys.Cleanup.Mode == "archive" {
|
if Keys.Cleanup.Mode == "archive" {
|
||||||
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
|
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
|
||||||
|
|||||||
@@ -86,11 +86,12 @@ var (
|
|||||||
|
|
||||||
// Checkpointing starts a background worker that periodically saves metric data to disk.
|
// Checkpointing starts a background worker that periodically saves metric data to disk.
|
||||||
//
|
//
|
||||||
// Checkpoints are written every 12 hours (hardcoded).
|
// The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours).
|
||||||
//
|
//
|
||||||
// Format behaviour:
|
// Format behaviour:
|
||||||
// - "json": Periodic checkpointing every checkpointInterval
|
// - "json": Writes a JSON snapshot file per host every interval
|
||||||
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
|
// - "wal": Writes a binary snapshot file per host every interval, then rotates
|
||||||
|
// the current.wal files for all successfully checkpointed hosts
|
||||||
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
lastCheckpointMu.Lock()
|
lastCheckpointMu.Lock()
|
||||||
lastCheckpoint = time.Now()
|
lastCheckpoint = time.Now()
|
||||||
@@ -99,7 +100,6 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
ms := GetMemoryStore()
|
ms := GetMemoryStore()
|
||||||
|
|
||||||
wg.Go(func() {
|
wg.Go(func() {
|
||||||
|
|
||||||
d := 12 * time.Hour // default checkpoint interval
|
d := 12 * time.Hour // default checkpoint interval
|
||||||
if Keys.CheckpointInterval != "" {
|
if Keys.CheckpointInterval != "" {
|
||||||
parsed, err := time.ParseDuration(Keys.CheckpointInterval)
|
parsed, err := time.ParseDuration(Keys.CheckpointInterval)
|
||||||
@@ -126,10 +126,15 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
|
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
|
||||||
|
|
||||||
if Keys.Checkpoints.FileFormat == "wal" {
|
if Keys.Checkpoints.FileFormat == "wal" {
|
||||||
|
// Pause WAL writes: the binary snapshot captures all in-memory
|
||||||
|
// data, so WAL records written during checkpoint are redundant
|
||||||
|
// and would be deleted during rotation anyway.
|
||||||
|
walCheckpointActive.Store(true)
|
||||||
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
|
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
|
||||||
} else {
|
}
|
||||||
|
if n > 0 {
|
||||||
cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n)
|
cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n)
|
||||||
lastCheckpointMu.Lock()
|
lastCheckpointMu.Lock()
|
||||||
lastCheckpoint = now
|
lastCheckpoint = now
|
||||||
@@ -137,6 +142,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
// Rotate WAL files for successfully checkpointed hosts.
|
// Rotate WAL files for successfully checkpointed hosts.
|
||||||
RotateWALFiles(hostDirs)
|
RotateWALFiles(hostDirs)
|
||||||
}
|
}
|
||||||
|
walCheckpointActive.Store(false)
|
||||||
|
walDropped.Store(0)
|
||||||
} else {
|
} else {
|
||||||
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -59,11 +59,14 @@ const (
|
|||||||
// Checkpoints configures periodic persistence of in-memory metric data.
|
// Checkpoints configures periodic persistence of in-memory metric data.
|
||||||
//
|
//
|
||||||
// Fields:
|
// Fields:
|
||||||
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
|
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
|
||||||
// - RootDir: Filesystem path for checkpoint files (created if missing)
|
// - RootDir: Filesystem path for checkpoint files (created if missing)
|
||||||
|
// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default).
|
||||||
|
// When exceeded the WAL is force-rotated to prevent unbounded disk growth.
|
||||||
type Checkpoints struct {
|
type Checkpoints struct {
|
||||||
FileFormat string `json:"file-format"`
|
FileFormat string `json:"file-format"`
|
||||||
RootDir string `json:"directory"`
|
RootDir string `json:"directory"`
|
||||||
|
MaxWALSize int64 `json:"max-wal-size,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Debug provides development and profiling options.
|
// Debug provides development and profiling options.
|
||||||
|
|||||||
@@ -24,6 +24,11 @@ const configSchema = `{
|
|||||||
"directory": {
|
"directory": {
|
||||||
"description": "Path in which the checkpointed files should be placed.",
|
"description": "Path in which the checkpointed files should be placed.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
|
},
|
||||||
|
"max-wal-size": {
|
||||||
|
"description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).",
|
||||||
|
"type": "integer",
|
||||||
|
"minimum": 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
//
|
//
|
||||||
// The package organizes metrics in a tree structure (cluster → host → component) and
|
// The package organizes metrics in a tree structure (cluster → host → component) and
|
||||||
// provides concurrent read/write access to metric data with configurable aggregation strategies.
|
// provides concurrent read/write access to metric data with configurable aggregation strategies.
|
||||||
// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data,
|
// Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data,
|
||||||
// and enforcing retention policies.
|
// and enforcing retention policies.
|
||||||
//
|
//
|
||||||
// Key features:
|
// Key features:
|
||||||
@@ -16,7 +16,7 @@
|
|||||||
// - Hierarchical data organization (selectors)
|
// - Hierarchical data organization (selectors)
|
||||||
// - Concurrent checkpoint/archive workers
|
// - Concurrent checkpoint/archive workers
|
||||||
// - Support for sum and average aggregation
|
// - Support for sum and average aggregation
|
||||||
// - NATS integration for metric ingestion
|
// - NATS integration for metric ingestion via InfluxDB line protocol
|
||||||
package metricstore
|
package metricstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -113,7 +113,8 @@ type MemoryStore struct {
|
|||||||
// 6. Optionally subscribes to NATS for real-time metric ingestion
|
// 6. Optionally subscribes to NATS for real-time metric ingestion
|
||||||
//
|
//
|
||||||
// Parameters:
|
// Parameters:
|
||||||
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
|
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults
|
||||||
|
// - metrics: Map of metric names to their configurations (frequency and aggregation strategy)
|
||||||
// - wg: WaitGroup that will be incremented for each background goroutine started
|
// - wg: WaitGroup that will be incremented for each background goroutine started
|
||||||
//
|
//
|
||||||
// The function will call cclog.Fatal on critical errors during initialization.
|
// The function will call cclog.Fatal on critical errors during initialization.
|
||||||
|
|||||||
@@ -99,6 +99,11 @@ var walStagingWg sync.WaitGroup
|
|||||||
// SendWALMessage from sending on a closed channel (which panics in Go).
|
// SendWALMessage from sending on a closed channel (which panics in Go).
|
||||||
var walShuttingDown atomic.Bool
|
var walShuttingDown atomic.Bool
|
||||||
|
|
||||||
|
// walCheckpointActive is set during binary checkpoint writes.
|
||||||
|
// While active, SendWALMessage skips sending (returns true) because the
|
||||||
|
// snapshot captures all in-memory data, making WAL writes redundant.
|
||||||
|
var walCheckpointActive atomic.Bool
|
||||||
|
|
||||||
// WALMessage represents a single metric write to be appended to the WAL.
|
// WALMessage represents a single metric write to be appended to the WAL.
|
||||||
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
||||||
type WALMessage struct {
|
type WALMessage struct {
|
||||||
@@ -122,6 +127,7 @@ type walFileState struct {
|
|||||||
f *os.File
|
f *os.File
|
||||||
w *bufio.Writer
|
w *bufio.Writer
|
||||||
dirty bool
|
dirty bool
|
||||||
|
size int64 // approximate bytes written (tracked from open + writes)
|
||||||
}
|
}
|
||||||
|
|
||||||
// walFlushInterval controls how often dirty WAL files are flushed to disk.
|
// walFlushInterval controls how often dirty WAL files are flushed to disk.
|
||||||
@@ -145,6 +151,9 @@ func SendWALMessage(msg *WALMessage) bool {
|
|||||||
if walShardChs == nil || walShuttingDown.Load() {
|
if walShardChs == nil || walShuttingDown.Load() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
if walCheckpointActive.Load() {
|
||||||
|
return true // Data safe in memory; snapshot will capture it
|
||||||
|
}
|
||||||
shard := walShardIndex(msg.Cluster, msg.Node)
|
shard := walShardIndex(msg.Cluster, msg.Node)
|
||||||
select {
|
select {
|
||||||
case walShardChs[shard] <- msg:
|
case walShardChs[shard] <- msg:
|
||||||
@@ -214,7 +223,11 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
|
|
||||||
// Write file header magic if file is new (empty).
|
// Write file header magic if file is new (empty).
|
||||||
info, err := f.Stat()
|
info, err := f.Stat()
|
||||||
if err == nil && info.Size() == 0 {
|
var fileSize int64
|
||||||
|
if err == nil {
|
||||||
|
fileSize = info.Size()
|
||||||
|
}
|
||||||
|
if err == nil && fileSize == 0 {
|
||||||
var hdr [4]byte
|
var hdr [4]byte
|
||||||
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
|
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
|
||||||
if _, err := w.Write(hdr[:]); err != nil {
|
if _, err := w.Write(hdr[:]); err != nil {
|
||||||
@@ -222,9 +235,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
f.Close()
|
f.Close()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
fileSize = 4
|
||||||
}
|
}
|
||||||
|
|
||||||
ws = &walFileState{f: f, w: w}
|
ws = &walFileState{f: f, w: w, size: fileSize}
|
||||||
hostFiles[hostDir] = ws
|
hostFiles[hostDir] = ws
|
||||||
return ws
|
return ws
|
||||||
}
|
}
|
||||||
@@ -235,9 +249,30 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
if ws == nil {
|
if ws == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := writeWALRecordDirect(ws.w, msg); err != nil {
|
|
||||||
|
// Enforce max WAL size: force-rotate before writing if limit is exceeded.
|
||||||
|
// The in-memory store still holds the data; only crash-recovery coverage is lost.
|
||||||
|
if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize {
|
||||||
|
cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)",
|
||||||
|
hostDir, ws.size, maxSize)
|
||||||
|
ws.w.Flush()
|
||||||
|
ws.f.Close()
|
||||||
|
walPath := path.Join(hostDir, "current.wal")
|
||||||
|
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
|
||||||
|
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
|
||||||
|
}
|
||||||
|
delete(hostFiles, hostDir)
|
||||||
|
ws = getOrOpenWAL(hostDir)
|
||||||
|
if ws == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := writeWALRecordDirect(ws.w, msg)
|
||||||
|
if err != nil {
|
||||||
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
|
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
|
||||||
}
|
}
|
||||||
|
ws.size += int64(n)
|
||||||
ws.dirty = true
|
ws.dirty = true
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -331,6 +366,7 @@ func RotateWALFiles(hostDirs []string) {
|
|||||||
if walShardRotateChs == nil || walShuttingDown.Load() {
|
if walShardRotateChs == nil || walShuttingDown.Load() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
deadline := time.After(2 * time.Minute)
|
||||||
dones := make([]chan struct{}, 0, len(hostDirs))
|
dones := make([]chan struct{}, 0, len(hostDirs))
|
||||||
for _, dir := range hostDirs {
|
for _, dir := range hostDirs {
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
@@ -338,16 +374,18 @@ func RotateWALFiles(hostDirs []string) {
|
|||||||
select {
|
select {
|
||||||
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
|
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
|
||||||
dones = append(dones, done)
|
dones = append(dones, done)
|
||||||
default:
|
case <-deadline:
|
||||||
// Channel full or goroutine not consuming — skip this host.
|
cclog.Warnf("[METRICSTORE]> WAL rotation send timed out, %d of %d hosts remaining",
|
||||||
cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
|
len(hostDirs)-len(dones), len(hostDirs))
|
||||||
|
goto waitDones
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
waitDones:
|
||||||
for _, done := range dones {
|
for _, done := range dones {
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
case <-time.After(30 * time.Second):
|
case <-time.After(30 * time.Second):
|
||||||
cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
|
cclog.Warn("[METRICSTORE]> WAL rotation completion timed out, continuing")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -362,8 +400,9 @@ func walShardIndexFromDir(hostDir string) int {
|
|||||||
return walShardIndex(cluster, node)
|
return walShardIndex(cluster, node)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RotateWALFiles sends rotation requests for the given host directories
|
// RotateWALFilesAfterShutdown directly removes current.wal files for the given
|
||||||
// and blocks until all rotations complete.
|
// host directories. Used after shutdown, when WALStaging goroutines have already
|
||||||
|
// exited and the channel-based RotateWALFiles is no longer safe to call.
|
||||||
func RotateWALFilesAfterShutdown(hostDirs []string) {
|
func RotateWALFilesAfterShutdown(hostDirs []string) {
|
||||||
for _, dir := range hostDirs {
|
for _, dir := range hostDirs {
|
||||||
walPath := path.Join(dir, "current.wal")
|
walPath := path.Join(dir, "current.wal")
|
||||||
@@ -376,7 +415,8 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
|
|||||||
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
|
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
|
||||||
// then writes it to the bufio.Writer in a single call. This prevents partial
|
// then writes it to the bufio.Writer in a single call. This prevents partial
|
||||||
// records in the write buffer if a write error occurs mid-record (e.g. disk full).
|
// records in the write buffer if a write error occurs mid-record (e.g. disk full).
|
||||||
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
|
// Returns the number of bytes written and any error.
|
||||||
|
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
|
||||||
// Compute payload and total record size.
|
// Compute payload and total record size.
|
||||||
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
|
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
|
||||||
for _, s := range msg.Selector {
|
for _, s := range msg.Selector {
|
||||||
@@ -430,8 +470,8 @@ func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
|
|||||||
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
|
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
|
||||||
|
|
||||||
// Single atomic write to the buffered writer.
|
// Single atomic write to the buffered writer.
|
||||||
_, err := w.Write(buf)
|
n, err := w.Write(buf)
|
||||||
return err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// readWALRecord reads one WAL record from the reader.
|
// readWALRecord reads one WAL record from the reader.
|
||||||
@@ -696,11 +736,6 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
|||||||
atomic.AddInt32(&errs, 1)
|
atomic.AddInt32(&errs, 1)
|
||||||
} else {
|
} else {
|
||||||
atomic.AddInt32(&n, 1)
|
atomic.AddInt32(&n, 1)
|
||||||
// Delete WAL immediately after successful snapshot.
|
|
||||||
walPath := path.Join(wi.hostDir, "current.wal")
|
|
||||||
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
|
|
||||||
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
|
|
||||||
}
|
|
||||||
successMu.Lock()
|
successMu.Lock()
|
||||||
successDirs = append(successDirs, wi.hostDir)
|
successDirs = append(successDirs, wi.hostDir)
|
||||||
successMu.Unlock()
|
successMu.Unlock()
|
||||||
|
|||||||
@@ -57,7 +57,7 @@
|
|||||||
let entries = $state([]);
|
let entries = $state([]);
|
||||||
let loading = $state(false);
|
let loading = $state(false);
|
||||||
let error = $state(null);
|
let error = $state(null);
|
||||||
let timer = $state(null);
|
let timer = null;
|
||||||
|
|
||||||
function levelColor(priority) {
|
function levelColor(priority) {
|
||||||
if (priority <= 2) return "danger";
|
if (priority <= 2) return "danger";
|
||||||
|
|||||||
Reference in New Issue
Block a user