mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-30 20:47:31 +02:00
Compare commits
12 Commits
release/v1
...
hotfix
| Author | SHA1 | Date | |
|---|---|---|---|
| 641dc0e3b8 | |||
| b734c1a92a | |||
| c5fe3c5cd9 | |||
| e2910b18b3 | |||
| ed236ec539 | |||
| 82c514b11a | |||
| 66707bbf15 | |||
| fc47b12fed | |||
| 937984d11f | |||
| 3d99aec185 | |||
| 280289185a | |||
| cc3d03bb5b |
38
.golangci.yml
Normal file
38
.golangci.yml
Normal file
@@ -0,0 +1,38 @@
|
||||
version: "2"
|
||||
|
||||
run:
|
||||
timeout: 5m
|
||||
|
||||
formatters:
|
||||
enable:
|
||||
- gofumpt # gofumpt formatter
|
||||
|
||||
settings:
|
||||
gofumpt:
|
||||
module-path: github.com/ClusterCockpit/cc-backend
|
||||
|
||||
linters:
|
||||
enable:
|
||||
- staticcheck # staticcheck = true
|
||||
- unparam # unusedparams = true
|
||||
- nilnil # nilness = true (catches nil returns of nil interface values)
|
||||
- govet # base vet; nilness + unusedwrite via govet analyzers
|
||||
|
||||
settings:
|
||||
staticcheck:
|
||||
checks: ["ST1003"]
|
||||
|
||||
govet:
|
||||
enable:
|
||||
- nilness
|
||||
- unusedwrite
|
||||
|
||||
exclusions:
|
||||
paths:
|
||||
- .git
|
||||
- .vscode
|
||||
- .idea
|
||||
- node_modules
|
||||
- internal/graph/generated # gqlgen generated code
|
||||
- internal/api/docs\.go # swaggo generated code
|
||||
- _test\.go # test files excluded from all linters
|
||||
10
Makefile
10
Makefile
@@ -36,7 +36,7 @@ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
|
||||
$(wildcard $(FRONTEND)/src/header/*.svelte) \
|
||||
$(wildcard $(FRONTEND)/src/job/*.svelte)
|
||||
|
||||
.PHONY: clean distclean test tags frontend swagger graphql $(TARGET)
|
||||
.PHONY: clean distclean fmt lint test tags frontend swagger graphql $(TARGET)
|
||||
|
||||
.NOTPARALLEL:
|
||||
|
||||
@@ -75,6 +75,14 @@ test:
|
||||
@go vet ./...
|
||||
@go test ./...
|
||||
|
||||
fmt:
|
||||
$(info ===> FORMAT)
|
||||
@gofumpt -l -w .
|
||||
|
||||
lint:
|
||||
$(info ===> LINT)
|
||||
@golangci-lint run ./...
|
||||
|
||||
tags:
|
||||
$(info ===> TAGS)
|
||||
@ctags -R
|
||||
|
||||
20
README.md
20
README.md
@@ -100,6 +100,26 @@ the following targets:
|
||||
frontend source files will result in a complete rebuild.
|
||||
- `make clean`: Clean go build cache and remove binary.
|
||||
- `make test`: Run the tests that are also run in the GitHub workflow setup.
|
||||
- `make fmt`: Format all Go source files using
|
||||
[gofumpt](https://github.com/mvdan/gofumpt), a stricter superset of `gofmt`.
|
||||
Requires `gofumpt` to be installed (see below).
|
||||
- `make lint`: Run [golangci-lint](https://golangci-lint.run/) with the project
|
||||
configuration in `.golangci.yml`. Requires `golangci-lint` to be installed
|
||||
(see below).
|
||||
|
||||
### Installing development tools
|
||||
|
||||
`gofumpt` and `golangci-lint` are not part of the Go module and must be
|
||||
installed separately:
|
||||
|
||||
```sh
|
||||
# Formatter (gofumpt)
|
||||
go install mvdan.cc/gofumpt@latest
|
||||
|
||||
# Linter (golangci-lint) — use the official install script to get a
|
||||
# pre-built binary; installing via `go install` is not supported upstream.
|
||||
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/HEAD/install.sh | sh -s -- -b $(go env GOPATH)/bin
|
||||
```
|
||||
|
||||
A common workflow for setting up cc-backend from scratch is:
|
||||
|
||||
|
||||
@@ -19,6 +19,16 @@ This is also the default.
|
||||
|
||||
### Bug fixes
|
||||
|
||||
- **WAL not rotated on partial checkpoint failure**: When binary checkpointing
|
||||
failed for some hosts, WAL files for successfully checkpointed hosts were not
|
||||
rotated and the checkpoint timestamp was not advanced. Partial successes now
|
||||
correctly advance the checkpoint and rotate WAL files for completed hosts.
|
||||
- **Unbounded WAL file growth**: If binary checkpointing consistently failed for
|
||||
a host, its `current.wal` file grew without limit until disk exhaustion. A new
|
||||
`max-wal-size` configuration option (in the `checkpoints` block) allows setting
|
||||
a per-host WAL size cap in bytes. When exceeded, the WAL is force-rotated.
|
||||
Defaults to 0 (unlimited) for backward compatibility.
|
||||
|
||||
- **Doubleranged filter fixes**: Range filters now correctly handle zero as a
|
||||
boundary value. Improved validation and UI text for "more than equal" and
|
||||
"less than equal" range selections.
|
||||
@@ -50,6 +60,14 @@ This is also the default.
|
||||
- **Explicit node state queries in node view**: Node health and scheduler state
|
||||
are now fetched independently from metric data for fresher status information.
|
||||
|
||||
### New tools
|
||||
|
||||
- **binaryCheckpointReader**: New utility tool (`tools/binaryCheckpointReader`)
|
||||
that reads `.wal` or `.bin` checkpoint files produced by the metricstore
|
||||
WAL/snapshot system and dumps their contents to a human-readable `.txt` file.
|
||||
Useful for debugging and inspecting checkpoint data. Usage:
|
||||
`go run ./tools/binaryCheckpointReader <file.wal|file.bin>`
|
||||
|
||||
### Logging improvements
|
||||
|
||||
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors
|
||||
|
||||
49
gopls.json
Normal file
49
gopls.json
Normal file
@@ -0,0 +1,49 @@
|
||||
{
|
||||
"gofumpt": true,
|
||||
"codelenses": {
|
||||
"gc_details": false,
|
||||
"generate": true,
|
||||
"regenerate_cgo": true,
|
||||
"run_govulncheck": true,
|
||||
"test": true,
|
||||
"tidy": true,
|
||||
"upgrade_dependency": true,
|
||||
"vendor": true
|
||||
},
|
||||
"hints": {
|
||||
"assignVariableTypes": true,
|
||||
"compositeLiteralFields": true,
|
||||
"compositeLiteralTypes": true,
|
||||
"constantValues": true,
|
||||
"functionTypeParameters": true,
|
||||
"parameterNames": true,
|
||||
"rangeVariableTypes": true
|
||||
},
|
||||
"analyses": {
|
||||
"nilness": true,
|
||||
"unusedparams": true,
|
||||
"unusedwrite": true,
|
||||
"useany": true
|
||||
},
|
||||
"usePlaceholders": true,
|
||||
"completeUnimported": true,
|
||||
"staticcheck": true,
|
||||
"directoryFilters": [
|
||||
"-.git",
|
||||
"-.vscode",
|
||||
"-.idea",
|
||||
"-.vscode-test",
|
||||
"-node_modules",
|
||||
"-web/frontend",
|
||||
"-web/templates",
|
||||
"-var",
|
||||
"-api",
|
||||
"-configs",
|
||||
"-init",
|
||||
"-internal/repository/migrations",
|
||||
"-internal/repository/testdata",
|
||||
"-internal/importer/testdata",
|
||||
"-pkg/archive/testdata"
|
||||
],
|
||||
"semanticTokens": true
|
||||
}
|
||||
@@ -164,12 +164,17 @@ func (auth *Authentication) AuthViaSession(
|
||||
return nil, errors.New("invalid session data")
|
||||
}
|
||||
|
||||
authSourceInt, ok := session.Values["authSource"].(int)
|
||||
if !ok {
|
||||
authSourceInt = int(schema.AuthViaLocalPassword)
|
||||
}
|
||||
|
||||
return &schema.User{
|
||||
Username: username,
|
||||
Projects: projects,
|
||||
Roles: roles,
|
||||
AuthType: schema.AuthSession,
|
||||
AuthSource: -1,
|
||||
AuthSource: schema.AuthSource(authSourceInt),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -319,10 +324,11 @@ func (auth *Authentication) SaveSession(rw http.ResponseWriter, r *http.Request,
|
||||
}
|
||||
session.Options.Secure = false
|
||||
}
|
||||
session.Options.SameSite = http.SameSiteStrictMode
|
||||
session.Options.SameSite = http.SameSiteLaxMode
|
||||
session.Values["username"] = user.Username
|
||||
session.Values["projects"] = user.Projects
|
||||
session.Values["roles"] = user.Roles
|
||||
session.Values["authSource"] = int(user.AuthSource)
|
||||
if err := auth.sessionStore.Save(r, rw, session); err != nil {
|
||||
cclog.Warnf("session save failed: %s", err.Error())
|
||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
||||
|
||||
@@ -1072,10 +1072,12 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
|
||||
// SubCluster returns generated.SubClusterResolver implementation.
|
||||
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
|
||||
|
||||
type clusterResolver struct{ *Resolver }
|
||||
type jobResolver struct{ *Resolver }
|
||||
type metricValueResolver struct{ *Resolver }
|
||||
type mutationResolver struct{ *Resolver }
|
||||
type nodeResolver struct{ *Resolver }
|
||||
type queryResolver struct{ *Resolver }
|
||||
type subClusterResolver struct{ *Resolver }
|
||||
type (
|
||||
clusterResolver struct{ *Resolver }
|
||||
jobResolver struct{ *Resolver }
|
||||
metricValueResolver struct{ *Resolver }
|
||||
mutationResolver struct{ *Resolver }
|
||||
nodeResolver struct{ *Resolver }
|
||||
queryResolver struct{ *Resolver }
|
||||
subClusterResolver struct{ *Resolver }
|
||||
)
|
||||
|
||||
@@ -236,4 +236,3 @@ func (ccms *CCMetricStore) buildNodeQueries(
|
||||
|
||||
return queries, assignedScope, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,12 @@
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// This file implements the cleanup (archiving or deletion) of old checkpoint files.
|
||||
//
|
||||
// The CleanUp worker runs on a timer equal to RetentionInMemory. In "archive" mode
|
||||
// it converts checkpoint files older than the retention window into per-cluster
|
||||
// Parquet files and then deletes the originals. In "delete" mode it simply removes
|
||||
// old checkpoint files.
|
||||
package metricstore
|
||||
|
||||
import (
|
||||
@@ -19,8 +25,12 @@ import (
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
)
|
||||
|
||||
// Worker for either Archiving or Deleting files
|
||||
|
||||
// CleanUp starts a background worker that periodically removes or archives
|
||||
// checkpoint files older than the configured retention window.
|
||||
//
|
||||
// In "archive" mode, old checkpoint files are converted to Parquet and stored
|
||||
// under Keys.Cleanup.RootDir. In "delete" mode they are simply removed.
|
||||
// The cleanup interval equals Keys.RetentionInMemory.
|
||||
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
|
||||
if Keys.Cleanup.Mode == "archive" {
|
||||
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
|
||||
|
||||
@@ -86,11 +86,12 @@ var (
|
||||
|
||||
// Checkpointing starts a background worker that periodically saves metric data to disk.
|
||||
//
|
||||
// Checkpoints are written every 12 hours (hardcoded).
|
||||
// The checkpoint interval is read from Keys.CheckpointInterval (default: 12 hours).
|
||||
//
|
||||
// Format behaviour:
|
||||
// - "json": Periodic checkpointing every checkpointInterval
|
||||
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
|
||||
// - "json": Writes a JSON snapshot file per host every interval
|
||||
// - "wal": Writes a binary snapshot file per host every interval, then rotates
|
||||
// the current.wal files for all successfully checkpointed hosts
|
||||
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||
lastCheckpointMu.Lock()
|
||||
lastCheckpoint = time.Now()
|
||||
@@ -99,7 +100,6 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||
ms := GetMemoryStore()
|
||||
|
||||
wg.Go(func() {
|
||||
|
||||
d := 12 * time.Hour // default checkpoint interval
|
||||
if Keys.CheckpointInterval != "" {
|
||||
parsed, err := time.ParseDuration(Keys.CheckpointInterval)
|
||||
@@ -126,10 +126,15 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", from.Format(time.RFC3339))
|
||||
|
||||
if Keys.Checkpoints.FileFormat == "wal" {
|
||||
// Pause WAL writes: the binary snapshot captures all in-memory
|
||||
// data, so WAL records written during checkpoint are redundant
|
||||
// and would be deleted during rotation anyway.
|
||||
walCheckpointActive.Store(true)
|
||||
n, hostDirs, err := ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
||||
if err != nil {
|
||||
cclog.Errorf("[METRICSTORE]> binary checkpointing failed: %s", err.Error())
|
||||
} else {
|
||||
}
|
||||
if n > 0 {
|
||||
cclog.Infof("[METRICSTORE]> done: %d binary snapshot files created", n)
|
||||
lastCheckpointMu.Lock()
|
||||
lastCheckpoint = now
|
||||
@@ -137,6 +142,8 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||
// Rotate WAL files for successfully checkpointed hosts.
|
||||
RotateWALFiles(hostDirs)
|
||||
}
|
||||
walCheckpointActive.Store(false)
|
||||
walDropped.Store(0)
|
||||
} else {
|
||||
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), now.Unix())
|
||||
if err != nil {
|
||||
|
||||
@@ -61,9 +61,12 @@ const (
|
||||
// Fields:
|
||||
// - FileFormat: "json" (human-readable, periodic) or "wal" (binary snapshot + WAL, crash-safe); default is "wal"
|
||||
// - RootDir: Filesystem path for checkpoint files (created if missing)
|
||||
// - MaxWALSize: Maximum size in bytes for a single host's WAL file; 0 = unlimited (default).
|
||||
// When exceeded the WAL is force-rotated to prevent unbounded disk growth.
|
||||
type Checkpoints struct {
|
||||
FileFormat string `json:"file-format"`
|
||||
RootDir string `json:"directory"`
|
||||
MaxWALSize int64 `json:"max-wal-size,omitempty"`
|
||||
}
|
||||
|
||||
// Debug provides development and profiling options.
|
||||
|
||||
@@ -24,6 +24,11 @@ const configSchema = `{
|
||||
"directory": {
|
||||
"description": "Path in which the checkpointed files should be placed.",
|
||||
"type": "string"
|
||||
},
|
||||
"max-wal-size": {
|
||||
"description": "Maximum size in bytes for a single host's WAL file. When exceeded the WAL is force-rotated to prevent unbounded disk growth. Only applies when file-format is 'wal'. 0 means unlimited (default).",
|
||||
"type": "integer",
|
||||
"minimum": 0
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
//
|
||||
// The package organizes metrics in a tree structure (cluster → host → component) and
|
||||
// provides concurrent read/write access to metric data with configurable aggregation strategies.
|
||||
// Background goroutines handle periodic checkpointing (JSON or Avro format), archiving old data,
|
||||
// Background goroutines handle periodic checkpointing (JSON or WAL/binary format), archiving old data,
|
||||
// and enforcing retention policies.
|
||||
//
|
||||
// Key features:
|
||||
@@ -16,7 +16,7 @@
|
||||
// - Hierarchical data organization (selectors)
|
||||
// - Concurrent checkpoint/archive workers
|
||||
// - Support for sum and average aggregation
|
||||
// - NATS integration for metric ingestion
|
||||
// - NATS integration for metric ingestion via InfluxDB line protocol
|
||||
package metricstore
|
||||
|
||||
import (
|
||||
@@ -113,7 +113,8 @@ type MemoryStore struct {
|
||||
// 6. Optionally subscribes to NATS for real-time metric ingestion
|
||||
//
|
||||
// Parameters:
|
||||
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig)
|
||||
// - rawConfig: JSON configuration for the metric store (see MetricStoreConfig); may be nil to use defaults
|
||||
// - metrics: Map of metric names to their configurations (frequency and aggregation strategy)
|
||||
// - wg: WaitGroup that will be incremented for each background goroutine started
|
||||
//
|
||||
// The function will call cclog.Fatal on critical errors during initialization.
|
||||
|
||||
@@ -99,6 +99,11 @@ var walStagingWg sync.WaitGroup
|
||||
// SendWALMessage from sending on a closed channel (which panics in Go).
|
||||
var walShuttingDown atomic.Bool
|
||||
|
||||
// walCheckpointActive is set during binary checkpoint writes.
|
||||
// While active, SendWALMessage skips sending (returns true) because the
|
||||
// snapshot captures all in-memory data, making WAL writes redundant.
|
||||
var walCheckpointActive atomic.Bool
|
||||
|
||||
// WALMessage represents a single metric write to be appended to the WAL.
|
||||
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
||||
type WALMessage struct {
|
||||
@@ -122,6 +127,7 @@ type walFileState struct {
|
||||
f *os.File
|
||||
w *bufio.Writer
|
||||
dirty bool
|
||||
size int64 // approximate bytes written (tracked from open + writes)
|
||||
}
|
||||
|
||||
// walFlushInterval controls how often dirty WAL files are flushed to disk.
|
||||
@@ -145,6 +151,9 @@ func SendWALMessage(msg *WALMessage) bool {
|
||||
if walShardChs == nil || walShuttingDown.Load() {
|
||||
return false
|
||||
}
|
||||
if walCheckpointActive.Load() {
|
||||
return true // Data safe in memory; snapshot will capture it
|
||||
}
|
||||
shard := walShardIndex(msg.Cluster, msg.Node)
|
||||
select {
|
||||
case walShardChs[shard] <- msg:
|
||||
@@ -214,7 +223,11 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
||||
|
||||
// Write file header magic if file is new (empty).
|
||||
info, err := f.Stat()
|
||||
if err == nil && info.Size() == 0 {
|
||||
var fileSize int64
|
||||
if err == nil {
|
||||
fileSize = info.Size()
|
||||
}
|
||||
if err == nil && fileSize == 0 {
|
||||
var hdr [4]byte
|
||||
binary.LittleEndian.PutUint32(hdr[:], walFileMagic)
|
||||
if _, err := w.Write(hdr[:]); err != nil {
|
||||
@@ -222,9 +235,10 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
||||
f.Close()
|
||||
return nil
|
||||
}
|
||||
fileSize = 4
|
||||
}
|
||||
|
||||
ws = &walFileState{f: f, w: w}
|
||||
ws = &walFileState{f: f, w: w, size: fileSize}
|
||||
hostFiles[hostDir] = ws
|
||||
return ws
|
||||
}
|
||||
@@ -235,9 +249,30 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
||||
if ws == nil {
|
||||
return
|
||||
}
|
||||
if err := writeWALRecordDirect(ws.w, msg); err != nil {
|
||||
|
||||
// Enforce max WAL size: force-rotate before writing if limit is exceeded.
|
||||
// The in-memory store still holds the data; only crash-recovery coverage is lost.
|
||||
if maxSize := Keys.Checkpoints.MaxWALSize; maxSize > 0 && ws.size >= maxSize {
|
||||
cclog.Warnf("[METRICSTORE]> WAL: force-rotating %s (size %d >= limit %d)",
|
||||
hostDir, ws.size, maxSize)
|
||||
ws.w.Flush()
|
||||
ws.f.Close()
|
||||
walPath := path.Join(hostDir, "current.wal")
|
||||
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
|
||||
cclog.Errorf("[METRICSTORE]> WAL: remove %s: %v", walPath, err)
|
||||
}
|
||||
delete(hostFiles, hostDir)
|
||||
ws = getOrOpenWAL(hostDir)
|
||||
if ws == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
n, err := writeWALRecordDirect(ws.w, msg)
|
||||
if err != nil {
|
||||
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
|
||||
}
|
||||
ws.size += int64(n)
|
||||
ws.dirty = true
|
||||
}
|
||||
|
||||
@@ -331,6 +366,7 @@ func RotateWALFiles(hostDirs []string) {
|
||||
if walShardRotateChs == nil || walShuttingDown.Load() {
|
||||
return
|
||||
}
|
||||
deadline := time.After(2 * time.Minute)
|
||||
dones := make([]chan struct{}, 0, len(hostDirs))
|
||||
for _, dir := range hostDirs {
|
||||
done := make(chan struct{})
|
||||
@@ -338,16 +374,18 @@ func RotateWALFiles(hostDirs []string) {
|
||||
select {
|
||||
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
|
||||
dones = append(dones, done)
|
||||
default:
|
||||
// Channel full or goroutine not consuming — skip this host.
|
||||
cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
|
||||
case <-deadline:
|
||||
cclog.Warnf("[METRICSTORE]> WAL rotation send timed out, %d of %d hosts remaining",
|
||||
len(hostDirs)-len(dones), len(hostDirs))
|
||||
goto waitDones
|
||||
}
|
||||
}
|
||||
waitDones:
|
||||
for _, done := range dones {
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(30 * time.Second):
|
||||
cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
|
||||
cclog.Warn("[METRICSTORE]> WAL rotation completion timed out, continuing")
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -362,8 +400,9 @@ func walShardIndexFromDir(hostDir string) int {
|
||||
return walShardIndex(cluster, node)
|
||||
}
|
||||
|
||||
// RotateWALFiles sends rotation requests for the given host directories
|
||||
// and blocks until all rotations complete.
|
||||
// RotateWALFilesAfterShutdown directly removes current.wal files for the given
|
||||
// host directories. Used after shutdown, when WALStaging goroutines have already
|
||||
// exited and the channel-based RotateWALFiles is no longer safe to call.
|
||||
func RotateWALFilesAfterShutdown(hostDirs []string) {
|
||||
for _, dir := range hostDirs {
|
||||
walPath := path.Join(dir, "current.wal")
|
||||
@@ -376,7 +415,8 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
|
||||
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
|
||||
// then writes it to the bufio.Writer in a single call. This prevents partial
|
||||
// records in the write buffer if a write error occurs mid-record (e.g. disk full).
|
||||
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
|
||||
// Returns the number of bytes written and any error.
|
||||
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) (int, error) {
|
||||
// Compute payload and total record size.
|
||||
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
|
||||
for _, s := range msg.Selector {
|
||||
@@ -430,8 +470,8 @@ func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
|
||||
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
|
||||
|
||||
// Single atomic write to the buffered writer.
|
||||
_, err := w.Write(buf)
|
||||
return err
|
||||
n, err := w.Write(buf)
|
||||
return n, err
|
||||
}
|
||||
|
||||
// readWALRecord reads one WAL record from the reader.
|
||||
@@ -696,11 +736,6 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
||||
atomic.AddInt32(&errs, 1)
|
||||
} else {
|
||||
atomic.AddInt32(&n, 1)
|
||||
// Delete WAL immediately after successful snapshot.
|
||||
walPath := path.Join(wi.hostDir, "current.wal")
|
||||
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
|
||||
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
|
||||
}
|
||||
successMu.Lock()
|
||||
successDirs = append(successDirs, wi.hostDir)
|
||||
successMu.Unlock()
|
||||
|
||||
@@ -57,7 +57,7 @@
|
||||
let entries = $state([]);
|
||||
let loading = $state(false);
|
||||
let error = $state(null);
|
||||
let timer = $state(null);
|
||||
let timer = null;
|
||||
|
||||
function levelColor(priority) {
|
||||
if (priority <= 2) return "danger";
|
||||
|
||||
Reference in New Issue
Block a user