20 Commits

Author SHA1 Message Date
0ce2fa2fbe fix: checkpoint initialization gap on restarts
Entire-Checkpoint: 3f4d366b037c
2026-03-27 06:59:58 +01:00
Aditya Ujeniya
71fc9efec7 Add binaryCheckpointReader as utility tool 2026-03-26 17:21:23 +01:00
Aditya Ujeniya
6e97ac8b28 Verbose logs for DataDoesNotAlign error in CCMS 2026-03-26 14:13:12 +01:00
97d65a9e5c Fix bugs in WAL journal pipeline
Entire-Checkpoint: 8fe0de4e6ac2
2026-03-26 07:25:36 +01:00
e759810051 Add shutdown timings. Do not drain WAL buffers on shutdown
Entire-Checkpoint: d4b497002f54
2026-03-26 07:02:37 +01:00
b1884fda9d Prepare bug fix release 1.5.3
Entire-Checkpoint: 84d4ab77be71
2026-03-25 07:18:27 +01:00
c267501a1b Reduce noise in tagger logging 2026-03-25 06:53:01 +01:00
a550344f13 Increase server shutdown timeout
Entire-Checkpoint: cf3b472471bd
2026-03-25 06:15:55 +01:00
Christoph Kluge
bd7125a52e review doubleranged filters, fix and improve value selection 2026-03-24 15:00:41 +01:00
93a9d732a4 fix: Improve shutdown time
Entire-Checkpoint: a4d012e1edcf
2026-03-24 07:17:34 +01:00
6f7dda53ee Cleanup
Entire-Checkpoint: ed68d32218ac
2026-03-24 07:03:46 +01:00
0325d9e866 fix: Increase throughput for WAL writers
Entire-Checkpoint: ddd40d290c56
2026-03-24 06:53:12 +01:00
3d94b0bf79 Merge branch 'master' into hotfix 2026-03-23 19:14:16 +01:00
Christoph Kluge
d5ea2b4cf5 change: query node states explicitly in node view 2026-03-23 17:23:54 +01:00
45f329e5fb feat: Add command line switch to trigger manual metricstore checkpoint cleanup
Entire-Checkpoint: 29b9d52db89c
2026-03-23 07:58:35 +01:00
Jan Eitzinger
100dd7dacf Merge pull request #533 from ClusterCockpit/hotfix
metricstore archive fixes
2026-03-23 07:20:15 +01:00
192c94a78d fix: Prevent interruption of body lineprotocol parsing on locks
Entire-Checkpoint: ccda3b2ff4cb
2026-03-23 07:12:13 +01:00
e41d1251ba fix: Continue on error
Entire-Checkpoint: 6000eb5a5bb8
2026-03-23 06:37:24 +01:00
586c902044 Restructure metricstore cleanup archiving to stay within 32k parquet-go limit
Entire-Checkpoint: 1660b8cf2571
2026-03-23 06:32:24 +01:00
01ec70baa8 Iterate over subCluster MetricConfig directly so that removed metrics are not included
Entire-Checkpoint: efb6f0a96069
2026-03-20 11:39:34 +01:00
27 changed files with 923 additions and 404 deletions

View File

@@ -1,6 +1,6 @@
TARGET = ./cc-backend TARGET = ./cc-backend
FRONTEND = ./web/frontend FRONTEND = ./web/frontend
VERSION = 1.5.2 VERSION = 1.5.3
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development') GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S") CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S")
LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}' LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}'

View File

@@ -1,4 +1,4 @@
# `cc-backend` version 1.5.2 # `cc-backend` version 1.5.3
Supports job archive version 3 and database version 11. Supports job archive version 3 and database version 11.
@@ -15,6 +15,47 @@ While we are confident that the memory issue with the metricstore cleanup move
policy is fixed, it is still recommended to use delete policy for cleanup. policy is fixed, it is still recommended to use delete policy for cleanup.
This is also the default. This is also the default.
## Changes in 1.5.3
### Bug fixes
- **Doubleranged filter fixes**: Range filters now correctly handle zero as a
boundary value. Improved validation and UI text for "more than equal" and
"less than equal" range selections.
- **Lineprotocol body parsing interrupted**: Switched from `ReadTimeout` to
`ReadHeaderTimeout` so that long-running metric submissions are no longer
cut off mid-stream.
- **Checkpoint archiving continues on error**: A single cluster's archiving
failure no longer aborts the entire cleanup operation. Errors are collected
and reported per cluster.
- **Parquet row group overflow**: Added periodic flush during checkpoint
archiving to prevent exceeding the parquet-go 32k column-write limit.
- **Removed metrics excluded from subcluster config**: Metrics removed from a
subcluster are no longer returned by `GetMetricConfigSubCluster`.
### MetricStore performance
- **WAL writer throughput**: Decoupled WAL file flushing from message processing
using a periodic 5-second batch flush (up to 4096 messages per cycle),
significantly increasing metric ingestion throughput.
- **Improved shutdown time**: HTTP shutdown timeout reduced; metricstore and
archiver now shut down concurrently. Overall shutdown deadline raised to
60 seconds.
### New features
- **Manual checkpoint cleanup flag**: New `-cleanup-checkpoints` CLI flag
triggers checkpoint cleanup without starting the server, useful for
maintenance windows or automated cleanup scripts.
- **Explicit node state queries in node view**: Node health and scheduler state
are now fetched independently from metric data for fresher status information.
### Logging improvements
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors
in the job classification tagger are now logged at debug level instead of
error level.
## Changes in 1.5.2 ## Changes in 1.5.2
### Bug fixes ### Bug fixes

View File

@@ -11,7 +11,8 @@ import "flag"
var ( var (
flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB,
flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags, flagOptimizeDB bool flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags, flagOptimizeDB,
flagCleanupCheckpoints bool
flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
) )
@@ -28,6 +29,7 @@ func cliInit() {
flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit") flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit")
flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit") flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit")
flag.BoolVar(&flagOptimizeDB, "optimize-db", false, "Optimize database: run VACUUM to reclaim space, then ANALYZE to update query planner statistics") flag.BoolVar(&flagOptimizeDB, "optimize-db", false, "Optimize database: run VACUUM to reclaim space, then ANALYZE to update query planner statistics")
flag.BoolVar(&flagCleanupCheckpoints, "cleanup-checkpoints", false, "Clean up old checkpoint files (delete or archive) based on retention settings, then exit")
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: <username>:[admin,support,manager,api,user]:<password>") flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: <username>:[admin,support,manager,api,user]:<password>")

View File

@@ -14,6 +14,7 @@ import (
"fmt" "fmt"
"os" "os"
"os/signal" "os/signal"
goruntime "runtime"
"runtime/debug" "runtime/debug"
"strings" "strings"
"sync" "sync"
@@ -536,6 +537,43 @@ func run() error {
return err return err
} }
// Handle checkpoint cleanup
if flagCleanupCheckpoints {
mscfg := ccconf.GetPackageConfig("metric-store")
if mscfg == nil {
return fmt.Errorf("metric-store configuration required for checkpoint cleanup")
}
if err := json.Unmarshal(mscfg, &metricstore.Keys); err != nil {
return fmt.Errorf("decoding metric-store config: %w", err)
}
if metricstore.Keys.NumWorkers <= 0 {
metricstore.Keys.NumWorkers = min(goruntime.NumCPU()/2+1, metricstore.DefaultMaxWorkers)
}
d, err := time.ParseDuration(metricstore.Keys.RetentionInMemory)
if err != nil {
return fmt.Errorf("parsing retention-in-memory: %w", err)
}
from := time.Now().Add(-d)
deleteMode := metricstore.Keys.Cleanup == nil || metricstore.Keys.Cleanup.Mode != "archive"
cleanupDir := ""
if !deleteMode {
cleanupDir = metricstore.Keys.Cleanup.RootDir
}
cclog.Infof("Cleaning up checkpoints older than %s...", from.Format(time.RFC3339))
n, err := metricstore.CleanupCheckpoints(
metricstore.Keys.Checkpoints.RootDir, cleanupDir, from.Unix(), deleteMode)
if err != nil {
return fmt.Errorf("checkpoint cleanup: %w", err)
}
if deleteMode {
cclog.Exitf("Cleanup done: %d checkpoint files deleted.", n)
} else {
cclog.Exitf("Cleanup done: %d checkpoint files archived to parquet.", n)
}
}
// Exit if start server is not requested // Exit if start server is not requested
if !flagServer { if !flagServer {
cclog.Exit("No errors, server flag not set. Exiting cc-backend.") cclog.Exit("No errors, server flag not set. Exiting cc-backend.")

View File

@@ -18,6 +18,7 @@ import (
"net/http" "net/http"
"os" "os"
"strings" "strings"
"sync"
"time" "time"
"github.com/99designs/gqlgen/graphql" "github.com/99designs/gqlgen/graphql"
@@ -344,17 +345,17 @@ func (s *Server) init() error {
// Server timeout defaults (in seconds) // Server timeout defaults (in seconds)
const ( const (
defaultReadTimeout = 20 defaultReadHeaderTimeout = 20
defaultWriteTimeout = 20 defaultWriteTimeout = 20
) )
func (s *Server) Start(ctx context.Context) error { func (s *Server) Start(ctx context.Context) error {
// Use configurable timeouts with defaults // Use configurable timeouts with defaults
readTimeout := time.Duration(defaultReadTimeout) * time.Second readHeaderTimeout := time.Duration(defaultReadHeaderTimeout) * time.Second
writeTimeout := time.Duration(defaultWriteTimeout) * time.Second writeTimeout := time.Duration(defaultWriteTimeout) * time.Second
s.server = &http.Server{ s.server = &http.Server{
ReadTimeout: readTimeout, ReadHeaderTimeout: readHeaderTimeout,
WriteTimeout: writeTimeout, WriteTimeout: writeTimeout,
Handler: s.router, Handler: s.router,
Addr: config.Keys.Addr, Addr: config.Keys.Addr,
@@ -399,16 +400,6 @@ func (s *Server) Start(ctx context.Context) error {
return fmt.Errorf("dropping privileges: %w", err) return fmt.Errorf("dropping privileges: %w", err)
} }
// Handle context cancellation for graceful shutdown
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := s.server.Shutdown(shutdownCtx); err != nil {
cclog.Errorf("Server shutdown error: %v", err)
}
}()
if err = s.server.Serve(listener); err != nil && err != http.ErrServerClosed { if err = s.server.Serve(listener); err != nil && err != http.ErrServerClosed {
return fmt.Errorf("server failed: %w", err) return fmt.Errorf("server failed: %w", err)
} }
@@ -416,29 +407,53 @@ func (s *Server) Start(ctx context.Context) error {
} }
func (s *Server) Shutdown(ctx context.Context) { func (s *Server) Shutdown(ctx context.Context) {
// Create a shutdown context with timeout shutdownStart := time.Now()
shutdownCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
natsStart := time.Now()
nc := nats.GetClient() nc := nats.GetClient()
if nc != nil { if nc != nil {
nc.Close() nc.Close()
} }
cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart))
// First shut down the server gracefully (waiting for all ongoing requests) httpStart := time.Now()
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := s.server.Shutdown(shutdownCtx); err != nil { if err := s.server.Shutdown(shutdownCtx); err != nil {
cclog.Errorf("Server shutdown error: %v", err) cclog.Errorf("Server shutdown error: %v", err)
} }
cclog.Infof("Shutdown: HTTP server stopped (%v)", time.Since(httpStart))
// Archive all the metric store data // Run metricstore and archiver shutdown concurrently.
ms := metricstore.GetMemoryStore() // They are independent: metricstore writes .bin snapshots,
// archiver flushes pending job archives.
storeStart := time.Now()
done := make(chan struct{})
go func() {
defer close(done)
var wg sync.WaitGroup
if ms != nil { if ms := metricstore.GetMemoryStore(); ms != nil {
wg.Go(func() {
metricstore.Shutdown() metricstore.Shutdown()
})
} }
// Shutdown archiver with 10 second timeout for fast shutdown wg.Go(func() {
if err := archiver.Shutdown(10 * time.Second); err != nil { if err := archiver.Shutdown(10 * time.Second); err != nil {
cclog.Warnf("Archiver shutdown: %v", err) cclog.Warnf("Archiver shutdown: %v", err)
} }
})
wg.Wait()
}()
select {
case <-done:
cclog.Infof("Shutdown: metricstore + archiver completed (%v)", time.Since(storeStart))
case <-time.After(60 * time.Second):
cclog.Warnf("Shutdown deadline exceeded after %v, forcing exit", time.Since(shutdownStart))
}
cclog.Infof("Shutdown: total time %v", time.Since(shutdownStart))
} }

View File

@@ -280,11 +280,11 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
// buildIntCondition creates clauses for integer range filters, using BETWEEN only if required. // buildIntCondition creates clauses for integer range filters, using BETWEEN only if required.
func buildIntCondition(field string, cond *config.IntRange, query sq.SelectBuilder) sq.SelectBuilder { func buildIntCondition(field string, cond *config.IntRange, query sq.SelectBuilder) sq.SelectBuilder {
if cond.From != 1 && cond.To != 0 { if cond.From > 0 && cond.To > 0 {
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To) return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
} else if cond.From != 1 && cond.To == 0 { } else if cond.From > 0 && cond.To == 0 {
return query.Where(field+" >= ?", cond.From) return query.Where(field+" >= ?", cond.From)
} else if cond.From == 1 && cond.To != 0 { } else if cond.From == 0 && cond.To > 0 {
return query.Where(field+" <= ?", cond.To) return query.Where(field+" <= ?", cond.To)
} else { } else {
return query return query
@@ -293,11 +293,11 @@ func buildIntCondition(field string, cond *config.IntRange, query sq.SelectBuild
// buildFloatCondition creates a clauses for float range filters, using BETWEEN only if required. // buildFloatCondition creates a clauses for float range filters, using BETWEEN only if required.
func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder { func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
if cond.From != 1.0 && cond.To != 0.0 { if cond.From > 0.0 && cond.To > 0.0 {
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To) return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
} else if cond.From != 1.0 && cond.To == 0.0 { } else if cond.From > 0.0 && cond.To == 0.0 {
return query.Where(field+" >= ?", cond.From) return query.Where(field+" >= ?", cond.From)
} else if cond.From == 1.0 && cond.To != 0.0 { } else if cond.From == 0.0 && cond.To > 0.0 {
return query.Where(field+" <= ?", cond.To) return query.Where(field+" <= ?", cond.To)
} else { } else {
return query return query
@@ -339,11 +339,11 @@ func buildTimeCondition(field string, cond *config.TimeRange, query sq.SelectBui
// buildFloatJSONCondition creates a filter on a numeric field within the footprint JSON column, using BETWEEN only if required. // buildFloatJSONCondition creates a filter on a numeric field within the footprint JSON column, using BETWEEN only if required.
func buildFloatJSONCondition(jsonField string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder { func buildFloatJSONCondition(jsonField string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
query = query.Where("JSON_VALID(footprint)") query = query.Where("JSON_VALID(footprint)")
if cond.From != 1.0 && cond.To != 0.0 { if cond.From > 0.0 && cond.To > 0.0 {
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") BETWEEN ? AND ?", cond.From, cond.To) return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") BETWEEN ? AND ?", cond.From, cond.To)
} else if cond.From != 1.0 && cond.To == 0.0 { } else if cond.From > 0.0 && cond.To == 0.0 {
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") >= ?", cond.From) return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") >= ?", cond.From)
} else if cond.From == 1.0 && cond.To != 0.0 { } else if cond.From == 0.0 && cond.To > 0.0 {
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") <= ?", cond.To) return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") <= ?", cond.To)
} else { } else {
return query return query

View File

@@ -308,7 +308,7 @@ func buildFilterPresets(query url.Values) map[string]any {
if parts[0] == "lessthan" { if parts[0] == "lessthan" {
lt, lte := strconv.Atoi(parts[1]) lt, lte := strconv.Atoi(parts[1])
if lte == nil { if lte == nil {
filterPresets["numNodes"] = map[string]int{"from": 1, "to": lt} filterPresets["numNodes"] = map[string]int{"from": 0, "to": lt}
} }
} else if parts[0] == "morethan" { } else if parts[0] == "morethan" {
mt, mte := strconv.Atoi(parts[1]) mt, mte := strconv.Atoi(parts[1])
@@ -330,7 +330,7 @@ func buildFilterPresets(query url.Values) map[string]any {
if parts[0] == "lessthan" { if parts[0] == "lessthan" {
lt, lte := strconv.Atoi(parts[1]) lt, lte := strconv.Atoi(parts[1])
if lte == nil { if lte == nil {
filterPresets["numHWThreads"] = map[string]int{"from": 1, "to": lt} filterPresets["numHWThreads"] = map[string]int{"from": 0, "to": lt}
} }
} else if parts[0] == "morethan" { } else if parts[0] == "morethan" {
mt, mte := strconv.Atoi(parts[1]) mt, mte := strconv.Atoi(parts[1])
@@ -352,7 +352,7 @@ func buildFilterPresets(query url.Values) map[string]any {
if parts[0] == "lessthan" { if parts[0] == "lessthan" {
lt, lte := strconv.Atoi(parts[1]) lt, lte := strconv.Atoi(parts[1])
if lte == nil { if lte == nil {
filterPresets["numAccelerators"] = map[string]int{"from": 1, "to": lt} filterPresets["numAccelerators"] = map[string]int{"from": 0, "to": lt}
} }
} else if parts[0] == "morethan" { } else if parts[0] == "morethan" {
mt, mte := strconv.Atoi(parts[1]) mt, mte := strconv.Atoi(parts[1])
@@ -408,7 +408,7 @@ func buildFilterPresets(query url.Values) map[string]any {
if parts[0] == "lessthan" { if parts[0] == "lessthan" {
lt, lte := strconv.Atoi(parts[1]) lt, lte := strconv.Atoi(parts[1])
if lte == nil { if lte == nil {
filterPresets["energy"] = map[string]int{"from": 1, "to": lt} filterPresets["energy"] = map[string]int{"from": 0, "to": lt}
} }
} else if parts[0] == "morethan" { } else if parts[0] == "morethan" {
mt, mte := strconv.Atoi(parts[1]) mt, mte := strconv.Atoi(parts[1])
@@ -434,7 +434,7 @@ func buildFilterPresets(query url.Values) map[string]any {
if lte == nil { if lte == nil {
statEntry := map[string]any{ statEntry := map[string]any{
"field": parts[0], "field": parts[0],
"from": 1, "from": 0,
"to": lt, "to": lt,
} }
statList = append(statList, statEntry) statList = append(statList, statEntry)

View File

@@ -363,7 +363,7 @@ func (t *JobClassTagger) Match(job *schema.Job) {
for _, m := range ri.metrics { for _, m := range ri.metrics {
stats, ok := jobStats[m] stats, ok := jobStats[m]
if !ok { if !ok {
cclog.Errorf("job classification: missing metric '%s' for rule %s on job %d", m, tag, job.JobID) cclog.Debugf("job classification: missing metric '%s' for rule %s on job %d", m, tag, job.JobID)
skipRule = true skipRule = true
break break
} }
@@ -388,7 +388,7 @@ func (t *JobClassTagger) Match(job *schema.Job) {
for _, r := range ri.requirements { for _, r := range ri.requirements {
ok, err := expr.Run(r, env) ok, err := expr.Run(r, env)
if err != nil { if err != nil {
cclog.Errorf("error running requirement for rule %s: %#v", tag, err) cclog.Debugf("error running requirement for rule %s: %#v", tag, err)
requirementsMet = false requirementsMet = false
break break
} }
@@ -407,7 +407,7 @@ func (t *JobClassTagger) Match(job *schema.Job) {
for _, v := range ri.variables { for _, v := range ri.variables {
value, err := expr.Run(v.expr, env) value, err := expr.Run(v.expr, env)
if err != nil { if err != nil {
cclog.Errorf("error evaluating variable %s for rule %s: %#v", v.name, tag, err) cclog.Debugf("error evaluating variable %s for rule %s: %#v", v.name, tag, err)
varError = true varError = true
break break
} }

View File

@@ -198,25 +198,12 @@ func GetSubCluster(cluster, subcluster string) (*schema.SubCluster, error) {
func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Metric { func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Metric {
metrics := make(map[string]*schema.Metric) metrics := make(map[string]*schema.Metric)
for _, c := range Clusters { sc, err := GetSubCluster(cluster, subcluster)
if c.Name == cluster { if err != nil {
for _, m := range c.MetricConfig { return metrics
for _, s := range m.SubClusters {
if s.Name == subcluster {
metrics[m.Name] = &schema.Metric{
Name: m.Name,
Unit: s.Unit,
Peak: s.Peak,
Normal: s.Normal,
Caution: s.Caution,
Alert: s.Alert,
}
break
}
} }
_, ok := metrics[m.Name] for _, m := range sc.MetricConfig {
if !ok {
metrics[m.Name] = &schema.Metric{ metrics[m.Name] = &schema.Metric{
Name: m.Name, Name: m.Name,
Unit: m.Unit, Unit: m.Unit,
@@ -226,10 +213,6 @@ func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Me
Alert: m.Alert, Alert: m.Alert,
} }
} }
}
break
}
}
return metrics return metrics
} }

View File

@@ -37,3 +37,27 @@ func TestClusterConfig(t *testing.T) {
// spew.Dump(archive.GlobalMetricList) // spew.Dump(archive.GlobalMetricList)
// t.Fail() // t.Fail()
} }
func TestGetMetricConfigSubClusterRespectsRemovedMetrics(t *testing.T) {
if err := archive.Init(json.RawMessage(`{"kind": "file","path": "testdata/archive"}`)); err != nil {
t.Fatal(err)
}
sc, err := archive.GetSubCluster("fritz", "spr2tb")
if err != nil {
t.Fatal(err)
}
metrics := archive.GetMetricConfigSubCluster("fritz", "spr2tb")
if len(metrics) != len(sc.MetricConfig) {
t.Fatalf("GetMetricConfigSubCluster() returned %d metrics, want %d", len(metrics), len(sc.MetricConfig))
}
if _, ok := metrics["flops_any"]; ok {
t.Fatalf("GetMetricConfigSubCluster() returned removed metric flops_any for subcluster spr2tb")
}
if _, ok := metrics["cpu_power"]; !ok {
t.Fatalf("GetMetricConfigSubCluster() missing active metric cpu_power for subcluster spr2tb")
}
}

View File

@@ -11,6 +11,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -22,6 +23,7 @@ import (
func CleanUp(wg *sync.WaitGroup, ctx context.Context) { func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Cleanup.Mode == "archive" { if Keys.Cleanup.Mode == "archive" {
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
// Run as Archiver // Run as Archiver
cleanUpWorker(wg, ctx, cleanUpWorker(wg, ctx,
Keys.RetentionInMemory, Keys.RetentionInMemory,
@@ -43,7 +45,6 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
// cleanUpWorker takes simple values to configure what it does // cleanUpWorker takes simple values to configure what it does
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
wg.Go(func() { wg.Go(func() {
d, err := time.ParseDuration(interval) d, err := time.ParseDuration(interval)
if err != nil { if err != nil {
cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err) cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err)
@@ -52,6 +53,16 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
return return
} }
// Account for checkpoint span: files named {from}.bin contain data up to
// from+checkpointInterval. Subtract the checkpoint interval so we don't
// delete files whose data still falls within the retention window.
checkpointSpan := 12 * time.Hour
if Keys.CheckpointInterval != "" {
if parsed, err := time.ParseDuration(Keys.CheckpointInterval); err == nil {
checkpointSpan = parsed
}
}
ticker := time.NewTicker(d) ticker := time.NewTicker(d)
defer ticker.Stop() defer ticker.Stop()
@@ -60,7 +71,7 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
case <-ctx.Done(): case <-ctx.Done():
return return
case <-ticker.C: case <-ticker.C:
t := time.Now().Add(-d) t := time.Now().Add(-d).Add(-checkpointSpan)
cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339)) cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339))
n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete) n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete)
@@ -181,6 +192,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
} }
totalFiles := 0 totalFiles := 0
var clusterErrors []string
for _, clusterEntry := range clusterEntries { for _, clusterEntry := range clusterEntries {
if !clusterEntry.IsDir() { if !clusterEntry.IsDir() {
@@ -190,7 +202,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
cluster := clusterEntry.Name() cluster := clusterEntry.Name()
hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster)) hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster))
if err != nil { if err != nil {
return totalFiles, err cclog.Errorf("[METRICSTORE]> error reading host entries for cluster %s: %s", cluster, err.Error())
clusterErrors = append(clusterErrors, cluster)
continue
} }
// Workers load checkpoint files from disk; main thread writes to parquet. // Workers load checkpoint files from disk; main thread writes to parquet.
@@ -255,7 +269,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
// Drain results channel to unblock workers // Drain results channel to unblock workers
for range results { for range results {
} }
return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err) cclog.Errorf("[METRICSTORE]> error creating parquet writer for cluster %s: %s", cluster, err.Error())
clusterErrors = append(clusterErrors, cluster)
continue
} }
type deleteItem struct { type deleteItem struct {
@@ -275,6 +291,12 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
break break
} }
} }
// Flush once per host to keep row group count within parquet limits.
if writeErr == nil {
if err := writer.FlushRowGroup(); err != nil {
writeErr = err
}
}
} }
// Always track files for deletion (even if write failed, we still drain) // Always track files for deletion (even if write failed, we still drain)
toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files}) toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files})
@@ -285,7 +307,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
} }
if errs > 0 { if errs > 0 {
return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) cclog.Errorf("[METRICSTORE]> %d errors reading checkpoints for cluster %s", errs, cluster)
clusterErrors = append(clusterErrors, cluster)
os.Remove(parquetFile)
continue
} }
if writer.count == 0 { if writer.count == 0 {
@@ -296,7 +321,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
if writeErr != nil { if writeErr != nil {
os.Remove(parquetFile) os.Remove(parquetFile)
return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr) cclog.Errorf("[METRICSTORE]> error writing parquet archive for cluster %s: %s", cluster, writeErr.Error())
clusterErrors = append(clusterErrors, cluster)
continue
} }
// Delete archived checkpoint files // Delete archived checkpoint files
@@ -316,5 +343,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
} }
cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles) cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles)
if len(clusterErrors) > 0 {
return totalFiles, fmt.Errorf("archiving failed for clusters: %s", strings.Join(clusterErrors, ", "))
}
return totalFiles, nil return totalFiles, nil
} }

View File

@@ -146,7 +146,9 @@ var (
// ErrDataDoesNotAlign indicates that aggregated data from child scopes // ErrDataDoesNotAlign indicates that aggregated data from child scopes
// does not align with the parent scope's expected timestamps/intervals. // does not align with the parent scope's expected timestamps/intervals.
ErrDataDoesNotAlign error = errors.New("[METRICSTORE]> data from lower granularities does not align") ErrDataDoesNotAlignMissingFront error = errors.New("[METRICSTORE]> data from lower granularities does not align (missing data prior to start of the buffers)")
ErrDataDoesNotAlignMissingBack error = errors.New("[METRICSTORE]> data from lower granularities does not align (missing data after the end of the buffers)")
ErrDataDoesNotAlignDataLenMismatch error = errors.New("[METRICSTORE]> data from lower granularities does not align (collected data length is different than expected data length)")
) )
// buffer stores time-series data for a single metric at a specific hierarchical level. // buffer stores time-series data for a single metric at a specific hierarchical level.

View File

@@ -86,14 +86,16 @@ var (
// Checkpointing starts a background worker that periodically saves metric data to disk. // Checkpointing starts a background worker that periodically saves metric data to disk.
// //
// Checkpoints are written every 12 hours (hardcoded). // restoreFrom is the earliest timestamp of data loaded from checkpoint files at startup.
// The first periodic checkpoint after restart will cover [restoreFrom, now], ensuring that
// loaded data is re-persisted before old checkpoint files are cleaned up.
// //
// Format behaviour: // Format behaviour:
// - "json": Periodic checkpointing every checkpointInterval // - "json": Periodic checkpointing every checkpointInterval
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval // - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { func Checkpointing(wg *sync.WaitGroup, ctx context.Context, restoreFrom time.Time) {
lastCheckpointMu.Lock() lastCheckpointMu.Lock()
lastCheckpoint = time.Now() lastCheckpoint = restoreFrom
lastCheckpointMu.Unlock() lastCheckpointMu.Unlock()
ms := GetMemoryStore() ms := GetMemoryStore()
@@ -337,25 +339,35 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
return ErrNoNewArchiveData return ErrNoNewArchiveData
} }
filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) finalPath := path.Join(dir, fmt.Sprintf("%d.json", from))
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) tmpPath := finalPath + ".tmp"
f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
if err != nil && os.IsNotExist(err) { if err != nil && os.IsNotExist(err) {
err = os.MkdirAll(dir, CheckpointDirPerms) err = os.MkdirAll(dir, CheckpointDirPerms)
if err == nil { if err == nil {
f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) f, err = os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
} }
} }
if err != nil { if err != nil {
return err return err
} }
defer f.Close()
bw := bufio.NewWriter(f) bw := bufio.NewWriter(f)
if err = json.NewEncoder(bw).Encode(cf); err != nil { if err = json.NewEncoder(bw).Encode(cf); err != nil {
f.Close()
os.Remove(tmpPath)
return err return err
} }
return bw.Flush() if err = bw.Flush(); err != nil {
f.Close()
os.Remove(tmpPath)
return err
}
f.Close()
return os.Rename(tmpPath, finalPath)
} }
// enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs. // enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs.
@@ -470,7 +482,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
data: metric.Data[0:n:n], data: metric.Data[0:n:n],
prev: nil, prev: nil,
next: nil, next: nil,
archived: true, archived: false,
} }
minfo, ok := m.Metrics[name] minfo, ok := m.Metrics[name]

View File

@@ -170,7 +170,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
ctx, shutdown := context.WithCancel(context.Background()) ctx, shutdown := context.WithCancel(context.Background())
Retention(wg, ctx) Retention(wg, ctx)
Checkpointing(wg, ctx) Checkpointing(wg, ctx, restoreFrom)
CleanUp(wg, ctx) CleanUp(wg, ctx)
WALStaging(wg, ctx) WALStaging(wg, ctx)
MemoryUsageTracker(wg, ctx) MemoryUsageTracker(wg, ctx)
@@ -271,19 +271,32 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) {
// //
// Note: This function blocks until the final checkpoint is written. // Note: This function blocks until the final checkpoint is written.
func Shutdown() { func Shutdown() {
totalStart := time.Now()
shutdownFuncMu.Lock() shutdownFuncMu.Lock()
defer shutdownFuncMu.Unlock() defer shutdownFuncMu.Unlock()
if shutdownFunc != nil { if shutdownFunc != nil {
shutdownFunc() shutdownFunc()
} }
cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart))
if Keys.Checkpoints.FileFormat == "wal" { if Keys.Checkpoints.FileFormat == "wal" {
// Signal producers to stop sending before closing channels,
// preventing send-on-closed-channel panics from in-flight NATS workers.
walShuttingDown.Store(true)
// Brief grace period for in-flight DecodeLine calls to complete.
time.Sleep(100 * time.Millisecond)
for _, ch := range walShardChs { for _, ch := range walShardChs {
close(ch) close(ch)
} }
drainStart := time.Now()
WaitForWALStagingDrain()
cclog.Infof("[METRICSTORE]> WAL staging goroutines exited (%v)", time.Since(drainStart))
} }
cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) cclog.Infof("[METRICSTORE]> Writing checkpoint to '%s'...", Keys.Checkpoints.RootDir)
checkpointStart := time.Now()
var files int var files int
var err error var err error
@@ -294,19 +307,16 @@ func Shutdown() {
lastCheckpointMu.Unlock() lastCheckpointMu.Unlock()
if Keys.Checkpoints.FileFormat == "wal" { if Keys.Checkpoints.FileFormat == "wal" {
var hostDirs []string // WAL files are deleted per-host inside ToCheckpointWAL workers.
files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) files, _, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
if err == nil {
RotateWALFilesAfterShutdown(hostDirs)
}
} else { } else {
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix()) files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
} }
if err != nil { if err != nil {
cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error()) cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s", err.Error())
} }
cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files) cclog.Infof("[METRICSTORE]> Done! (%d files written in %v, total shutdown: %v)", files, time.Since(checkpointStart), time.Since(totalStart))
} }
// Retention starts a background goroutine that periodically frees old metric data. // Retention starts a background goroutine that periodically frees old metric data.
@@ -702,16 +712,16 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
} else if from != cfrom || to != cto || len(data) != len(cdata) { } else if from != cfrom || to != cto || len(data) != len(cdata) {
missingfront, missingback := int((from-cfrom)/minfo.Frequency), int((to-cto)/minfo.Frequency) missingfront, missingback := int((from-cfrom)/minfo.Frequency), int((to-cto)/minfo.Frequency)
if missingfront != 0 { if missingfront != 0 {
return ErrDataDoesNotAlign return ErrDataDoesNotAlignMissingFront
} }
newlen := len(cdata) - missingback newlen := len(cdata) - missingback
if newlen < 1 { if newlen < 1 {
return ErrDataDoesNotAlign return ErrDataDoesNotAlignMissingBack
} }
cdata = cdata[0:newlen] cdata = cdata[0:newlen]
if len(cdata) != len(data) { if len(cdata) != len(data) {
return ErrDataDoesNotAlign return ErrDataDoesNotAlignDataLenMismatch
} }
from, to = cfrom, cto from, to = cfrom, cto

View File

@@ -99,7 +99,7 @@ func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) {
// WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows, // WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows,
// writing metrics in sorted order without materializing all rows in memory. // writing metrics in sorted order without materializing all rows in memory.
// Produces one row group per call (typically one host's data). // Call FlushRowGroup() after writing all checkpoint files for a host.
func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error { func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error {
w.writeLevel(cf, cluster, hostname, scope, scopeID) w.writeLevel(cf, cluster, hostname, scope, scopeID)
@@ -112,10 +112,15 @@ func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster,
w.batch = w.batch[:0] w.batch = w.batch[:0]
} }
return nil
}
// FlushRowGroup flushes the current row group to the Parquet file.
// Should be called once per host after all checkpoint files for that host are written.
func (w *parquetArchiveWriter) FlushRowGroup() error {
if err := w.writer.Flush(); err != nil { if err := w.writer.Flush(); err != nil {
return fmt.Errorf("flushing parquet row group: %w", err) return fmt.Errorf("flushing parquet row group: %w", err)
} }
return nil return nil
} }

View File

@@ -91,8 +91,10 @@ func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int6
if n == 0 { if n == 0 {
from, to = cfrom, cto from, to = cfrom, cto
} else if from != cfrom || to != cto { } else if from != cfrom {
return ErrDataDoesNotAlign return ErrDataDoesNotAlignMissingFront
} else if to != cto {
return ErrDataDoesNotAlignMissingBack
} }
samples += stats.Samples samples += stats.Samples

View File

@@ -69,6 +69,7 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema" "github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -91,6 +92,13 @@ var walShardRotateChs []chan walRotateReq
// walNumShards stores the number of shards (set during WALStaging init). // walNumShards stores the number of shards (set during WALStaging init).
var walNumShards int var walNumShards int
// walStagingWg tracks WALStaging goroutine exits for shutdown synchronization.
var walStagingWg sync.WaitGroup
// walShuttingDown is set before closing shard channels to prevent
// SendWALMessage from sending on a closed channel (which panics in Go).
var walShuttingDown atomic.Bool
// WALMessage represents a single metric write to be appended to the WAL. // WALMessage represents a single metric write to be appended to the WAL.
// Cluster and Node are NOT stored in the WAL record (inferred from file path). // Cluster and Node are NOT stored in the WAL record (inferred from file path).
type WALMessage struct { type WALMessage struct {
@@ -113,8 +121,14 @@ type walRotateReq struct {
type walFileState struct { type walFileState struct {
f *os.File f *os.File
w *bufio.Writer w *bufio.Writer
dirty bool
} }
// walFlushInterval controls how often dirty WAL files are flushed to disk.
// Decoupling flushes from message processing lets the consumer run at memory
// speed, amortizing syscall overhead across many writes.
const walFlushInterval = 5 * time.Second
// walShardIndex computes which shard a message belongs to based on cluster+node. // walShardIndex computes which shard a message belongs to based on cluster+node.
// Uses FNV-1a hash for fast, well-distributed mapping. // Uses FNV-1a hash for fast, well-distributed mapping.
func walShardIndex(cluster, node string) int { func walShardIndex(cluster, node string) int {
@@ -126,9 +140,9 @@ func walShardIndex(cluster, node string) int {
} }
// SendWALMessage routes a WAL message to the appropriate shard channel. // SendWALMessage routes a WAL message to the appropriate shard channel.
// Returns false if the channel is full (message dropped). // Returns false if the channel is full or shutdown is in progress.
func SendWALMessage(msg *WALMessage) bool { func SendWALMessage(msg *WALMessage) bool {
if walShardChs == nil { if walShardChs == nil || walShuttingDown.Load() {
return false return false
} }
shard := walShardIndex(msg.Cluster, msg.Node) shard := walShardIndex(msg.Cluster, msg.Node)
@@ -164,7 +178,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
msgCh := walShardChs[i] msgCh := walShardChs[i]
rotateCh := walShardRotateChs[i] rotateCh := walShardRotateChs[i]
walStagingWg.Add(1)
wg.Go(func() { wg.Go(func() {
defer walStagingWg.Done()
hostFiles := make(map[string]*walFileState) hostFiles := make(map[string]*walFileState)
defer func() { defer func() {
@@ -222,6 +238,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
if err := writeWALRecordDirect(ws.w, msg); err != nil { if err := writeWALRecordDirect(ws.w, msg); err != nil {
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err) cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
} }
ws.dirty = true
} }
processRotate := func(req walRotateReq) { processRotate := func(req walRotateReq) {
@@ -238,58 +255,57 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
close(req.done) close(req.done)
} }
flushAll := func() { flushDirty := func() {
for _, ws := range hostFiles { for _, ws := range hostFiles {
if ws.f != nil { if ws.dirty {
ws.w.Flush() ws.w.Flush()
ws.dirty = false
} }
} }
} }
drain := func() { ticker := time.NewTicker(walFlushInterval)
for { defer ticker.Stop()
// drainBatch processes up to 4096 pending messages without blocking.
// Returns false if the channel was closed.
drainBatch := func() bool {
for range 4096 {
select { select {
case msg, ok := <-msgCh: case msg, ok := <-msgCh:
if !ok { if !ok {
return flushDirty()
return false
} }
processMsg(msg) processMsg(msg)
case req := <-rotateCh: case req := <-rotateCh:
processRotate(req) processRotate(req)
default: default:
flushAll() return true
return
} }
} }
return true
} }
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
drain() // On shutdown, skip draining buffered messages — a full binary
// checkpoint will be written from in-memory state, making
// buffered WAL records redundant.
flushDirty()
return return
case msg, ok := <-msgCh: case msg, ok := <-msgCh:
if !ok { if !ok {
return return
} }
processMsg(msg) processMsg(msg)
if !drainBatch() {
// Drain up to 256 more messages without blocking to batch writes.
for range 256 {
select {
case msg, ok := <-msgCh:
if !ok {
return return
} }
processMsg(msg) // No flush here — timer handles periodic flushing.
case req := <-rotateCh: case <-ticker.C:
processRotate(req) flushDirty()
default:
goto flushed
}
}
flushed:
flushAll()
case req := <-rotateCh: case req := <-rotateCh:
processRotate(req) processRotate(req)
} }
@@ -298,23 +314,42 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
} }
} }
// WaitForWALStagingDrain blocks until all WALStaging goroutines have exited.
// Must be called after closing walShardChs to ensure all file handles are
// flushed and closed before checkpoint writes begin.
func WaitForWALStagingDrain() {
walStagingWg.Wait()
}
// RotateWALFiles sends rotation requests for the given host directories // RotateWALFiles sends rotation requests for the given host directories
// and blocks until all rotations complete. Each request is routed to the // and blocks until all rotations complete. Each request is routed to the
// shard that owns the host directory. // shard that owns the host directory.
//
// If shutdown is in progress (WAL staging goroutines may have exited),
// rotation is skipped to avoid deadlocking on abandoned channels.
func RotateWALFiles(hostDirs []string) { func RotateWALFiles(hostDirs []string) {
if walShardRotateChs == nil { if walShardRotateChs == nil || walShuttingDown.Load() {
return return
} }
dones := make([]chan struct{}, len(hostDirs)) dones := make([]chan struct{}, 0, len(hostDirs))
for i, dir := range hostDirs { for _, dir := range hostDirs {
dones[i] = make(chan struct{}) done := make(chan struct{})
// Extract cluster/node from hostDir to find the right shard.
// hostDir = rootDir/cluster/node
shard := walShardIndexFromDir(dir) shard := walShardIndexFromDir(dir)
walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]} select {
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
dones = append(dones, done)
default:
// Channel full or goroutine not consuming — skip this host.
cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
}
} }
for _, done := range dones { for _, done := range dones {
<-done select {
case <-done:
case <-time.After(30 * time.Second):
cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
return
}
} }
} }
@@ -338,141 +373,64 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
} }
} }
// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer, // writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
// avoiding heap allocations by using a stack-allocated scratch buffer for // then writes it to the bufio.Writer in a single call. This prevents partial
// the fixed-size header/trailer and computing CRC inline. // records in the write buffer if a write error occurs mid-record (e.g. disk full).
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error { func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
// Compute payload size. // Compute payload and total record size.
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4 payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector { for _, s := range msg.Selector {
payloadSize += 1 + len(s) payloadSize += 1 + len(s)
} }
// Total: 8 (header) + payload + 4 (CRC).
totalSize := 8 + payloadSize + 4
// Write magic + payload length (8 bytes header). // Use stack buffer for typical small records, heap-allocate only for large ones.
var hdr [8]byte var stackBuf [256]byte
binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic) var buf []byte
binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize)) if totalSize <= len(stackBuf) {
if _, err := w.Write(hdr[:]); err != nil { buf = stackBuf[:totalSize]
return err } else {
buf = make([]byte, totalSize)
} }
// We need to compute CRC over the payload as we write it. // Header: magic + payload length.
crc := crc32.NewIEEE() binary.LittleEndian.PutUint32(buf[0:4], walRecordMagic)
binary.LittleEndian.PutUint32(buf[4:8], uint32(payloadSize))
// Payload starts at offset 8.
p := 8
// Timestamp (8 bytes). // Timestamp (8 bytes).
var scratch [8]byte binary.LittleEndian.PutUint64(buf[p:p+8], uint64(msg.Timestamp))
binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp)) p += 8
crc.Write(scratch[:8])
if _, err := w.Write(scratch[:8]); err != nil {
return err
}
// Metric name length (2 bytes) + metric name. // Metric name length (2 bytes) + metric name.
binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName))) binary.LittleEndian.PutUint16(buf[p:p+2], uint16(len(msg.MetricName)))
crc.Write(scratch[:2]) p += 2
if _, err := w.Write(scratch[:2]); err != nil { p += copy(buf[p:], msg.MetricName)
return err
}
nameBytes := []byte(msg.MetricName)
crc.Write(nameBytes)
if _, err := w.Write(nameBytes); err != nil {
return err
}
// Selector count (1 byte). // Selector count (1 byte).
scratch[0] = byte(len(msg.Selector)) buf[p] = byte(len(msg.Selector))
crc.Write(scratch[:1]) p++
if _, err := w.Write(scratch[:1]); err != nil {
return err
}
// Selectors (1-byte length + bytes each). // Selectors (1-byte length + bytes each).
for _, sel := range msg.Selector { for _, sel := range msg.Selector {
scratch[0] = byte(len(sel)) buf[p] = byte(len(sel))
crc.Write(scratch[:1]) p++
if _, err := w.Write(scratch[:1]); err != nil { p += copy(buf[p:], sel)
return err
}
selBytes := []byte(sel)
crc.Write(selBytes)
if _, err := w.Write(selBytes); err != nil {
return err
}
} }
// Value (4 bytes, float32 bits). // Value (4 bytes, float32 bits).
binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value))) binary.LittleEndian.PutUint32(buf[p:p+4], math.Float32bits(float32(msg.Value)))
crc.Write(scratch[:4]) p += 4
if _, err := w.Write(scratch[:4]); err != nil {
return err
}
// CRC32 (4 bytes). // CRC32 over payload (bytes 8..8+payloadSize).
binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32()) crc := crc32.ChecksumIEEE(buf[8 : 8+payloadSize])
_, err := w.Write(scratch[:4]) binary.LittleEndian.PutUint32(buf[p:p+4], crc)
return err
}
// buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC). // Single atomic write to the buffered writer.
func buildWALPayload(msg *WALMessage) []byte { _, err := w.Write(buf)
size := 8 + 2 + len(msg.MetricName) + 1 + 4
for _, s := range msg.Selector {
size += 1 + len(s)
}
buf := make([]byte, 0, size)
// Timestamp (8 bytes, little-endian int64)
var ts [8]byte
binary.LittleEndian.PutUint64(ts[:], uint64(msg.Timestamp))
buf = append(buf, ts[:]...)
// Metric name (2-byte length prefix + bytes)
var mLen [2]byte
binary.LittleEndian.PutUint16(mLen[:], uint16(len(msg.MetricName)))
buf = append(buf, mLen[:]...)
buf = append(buf, msg.MetricName...)
// Selector count (1 byte)
buf = append(buf, byte(len(msg.Selector)))
// Selectors (1-byte length prefix + bytes each)
for _, sel := range msg.Selector {
buf = append(buf, byte(len(sel)))
buf = append(buf, sel...)
}
// Value (4 bytes, float32 bit representation)
var val [4]byte
binary.LittleEndian.PutUint32(val[:], math.Float32bits(float32(msg.Value)))
buf = append(buf, val[:]...)
return buf
}
// writeWALRecord appends a binary WAL record to the writer.
// Format: [4B magic][4B payload_len][payload][4B CRC32]
func writeWALRecord(w io.Writer, msg *WALMessage) error {
payload := buildWALPayload(msg)
crc := crc32.ChecksumIEEE(payload)
record := make([]byte, 0, 4+4+len(payload)+4)
var magic [4]byte
binary.LittleEndian.PutUint32(magic[:], walRecordMagic)
record = append(record, magic[:]...)
var pLen [4]byte
binary.LittleEndian.PutUint32(pLen[:], uint32(len(payload)))
record = append(record, pLen[:]...)
record = append(record, payload...)
var crcBytes [4]byte
binary.LittleEndian.PutUint32(crcBytes[:], crc)
record = append(record, crcBytes[:]...)
_, err := w.Write(record)
return err return err
} }
@@ -697,7 +655,10 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
selector []string selector []string
} }
n, errs := int32(0), int32(0) totalWork := len(levels)
cclog.Infof("[METRICSTORE]> Starting binary checkpoint for %d hosts with %d workers", totalWork, Keys.NumWorkers)
n, errs, completed := int32(0), int32(0), int32(0)
var successDirs []string var successDirs []string
var successMu sync.Mutex var successMu sync.Mutex
@@ -705,6 +666,22 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
wg.Add(Keys.NumWorkers) wg.Add(Keys.NumWorkers)
work := make(chan workItem, Keys.NumWorkers*2) work := make(chan workItem, Keys.NumWorkers*2)
// Progress logging goroutine.
stopProgress := make(chan struct{})
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cclog.Infof("[METRICSTORE]> Checkpoint progress: %d/%d hosts (%d written, %d errors)",
atomic.LoadInt32(&completed), totalWork, atomic.LoadInt32(&n), atomic.LoadInt32(&errs))
case <-stopProgress:
return
}
}
}()
for range Keys.NumWorkers { for range Keys.NumWorkers {
go func() { go func() {
defer wg.Done() defer wg.Done()
@@ -712,16 +689,23 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m) err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m)
if err != nil { if err != nil {
if err == ErrNoNewArchiveData { if err == ErrNoNewArchiveData {
atomic.AddInt32(&completed, 1)
continue continue
} }
cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err) cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err)
atomic.AddInt32(&errs, 1) atomic.AddInt32(&errs, 1)
} else { } else {
atomic.AddInt32(&n, 1) atomic.AddInt32(&n, 1)
// Delete WAL immediately after successful snapshot.
walPath := path.Join(wi.hostDir, "current.wal")
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
}
successMu.Lock() successMu.Lock()
successDirs = append(successDirs, wi.hostDir) successDirs = append(successDirs, wi.hostDir)
successMu.Unlock() successMu.Unlock()
} }
atomic.AddInt32(&completed, 1)
} }
}() }()
} }
@@ -736,6 +720,7 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
} }
close(work) close(work)
wg.Wait() wg.Wait()
close(stopProgress)
if errs > 0 { if errs > 0 {
return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n) return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n)

View File

@@ -0,0 +1,381 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// binaryCheckpointReader reads .wal or .bin checkpoint files produced by the
// metricstore WAL/snapshot system and dumps their contents to a human-readable
// .txt file (same name as input, with .txt extension).
//
// Usage:
//
// go run ./tools/binaryCheckpointReader <file.wal|file.bin>
package main
import (
"bufio"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path/filepath"
"strings"
"time"
)
// Magic numbers matching metricstore/walCheckpoint.go. These must stay in
// sync with the writer side, or every file will be rejected as invalid.
const (
	// walFileMagic is the 4-byte header at the start of a .wal file.
	walFileMagic = uint32(0xCC1DA701)
	// walRecordMagic prefixes every individual record inside a WAL file.
	walRecordMagic = uint32(0xCC1DA7A1)
	// snapFileMagic is the 4-byte header of a binary snapshot (.bin) file.
	snapFileMagic = uint32(0xCC5B0001)
)
// main validates the command-line arguments, opens the input .wal or .bin
// file, and writes a human-readable dump to a sibling .txt file.
//
// Exits with status 1 on usage errors, I/O errors, or decode errors.
func main() {
	if len(os.Args) != 2 {
		fmt.Fprintf(os.Stderr, "Usage: %s <file.wal|file.bin>\n", os.Args[0])
		os.Exit(1)
	}

	inputPath := os.Args[1]
	ext := strings.ToLower(filepath.Ext(inputPath))
	if ext != ".wal" && ext != ".bin" {
		fmt.Fprintf(os.Stderr, "Error: file must have .wal or .bin extension, got %q\n", ext)
		os.Exit(1)
	}

	f, err := os.Open(inputPath)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error opening %s: %v\n", inputPath, err)
		os.Exit(1)
	}
	defer f.Close()

	// Output file: replace extension with .txt
	outputPath := strings.TrimSuffix(inputPath, filepath.Ext(inputPath)) + ".txt"
	out, err := os.Create(outputPath)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error creating output %s: %v\n", outputPath, err)
		os.Exit(1)
	}
	// Safety net for early-exit paths; the success path closes explicitly
	// below so the error can be checked.
	defer out.Close()

	w := bufio.NewWriter(out)

	switch ext {
	case ".wal":
		err = dumpWAL(f, w)
	case ".bin":
		err = dumpBinarySnapshot(f, w)
	}
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error reading %s: %v\n", inputPath, err)
		os.Exit(1)
	}

	// Bug fix: the flush/close errors were previously ignored, so a failed
	// write (e.g. disk full) still reported success. Surface them instead.
	if err := w.Flush(); err != nil {
		fmt.Fprintf(os.Stderr, "Error writing %s: %v\n", outputPath, err)
		os.Exit(1)
	}
	if err := out.Close(); err != nil {
		fmt.Fprintf(os.Stderr, "Error closing %s: %v\n", outputPath, err)
		os.Exit(1)
	}
	fmt.Printf("Output written to %s\n", outputPath)
}
// ---------- WAL reader ----------
// dumpWAL reads a WAL file record-by-record and writes a human-readable
// listing to w. A record decode error stops replay (the trailing record is
// commonly truncated by a crash) but is reported in the dump rather than
// returned; only header-level failures yield a non-nil error.
func dumpWAL(f *os.File, w *bufio.Writer) error {
	r := bufio.NewReader(f)

	// File header: a single magic word. A zero-byte file is treated as valid.
	var magic uint32
	if err := binary.Read(r, binary.LittleEndian, &magic); err != nil {
		if err == io.EOF {
			fmt.Fprintln(w, "WAL file is empty (0 bytes).")
			return nil
		}
		return fmt.Errorf("read file header: %w", err)
	}
	if magic != walFileMagic {
		return fmt.Errorf("invalid WAL file magic 0x%08X (expected 0x%08X)", magic, walFileMagic)
	}

	fmt.Fprintf(w, "=== WAL File Dump ===\n")
	fmt.Fprintf(w, "File: %s\n", f.Name())
	fmt.Fprintf(w, "File Magic: 0x%08X (valid)\n\n", magic)

	count := 0
	for {
		rec, err := readWALRecord(r)
		if err != nil {
			fmt.Fprintf(w, "--- Record #%d: ERROR ---\n", count+1)
			fmt.Fprintf(w, " Error: %v\n", err)
			fmt.Fprintf(w, " (stopping replay — likely truncated trailing record)\n\n")
			break
		}
		if rec == nil {
			// Clean EOF at a record boundary: no more records.
			break
		}
		count++

		when := time.Unix(rec.Timestamp, 0).UTC()
		fmt.Fprintf(w, "--- Record #%d ---\n", count)
		fmt.Fprintf(w, " Timestamp: %d (%s)\n", rec.Timestamp, when.Format(time.RFC3339))
		fmt.Fprintf(w, " Metric: %s\n", rec.MetricName)
		if len(rec.Selector) == 0 {
			fmt.Fprintf(w, " Selectors: (none)\n")
		} else {
			fmt.Fprintf(w, " Selectors: [%s]\n", strings.Join(rec.Selector, ", "))
		}
		fmt.Fprintf(w, " Value: %g\n\n", rec.Value)
	}

	fmt.Fprintf(w, "=== Total valid records: %d ===\n", count)
	return nil
}
// walMessage is the decoded form of a single WAL record. It mirrors the
// metricstore's WALMessage payload; cluster and node are not part of the
// record (the writer infers them from the file path).
type walMessage struct {
	MetricName string   // metric identifier (2-byte length-prefixed in the record)
	Selector   []string // selector path components, one per 1-byte-length-prefixed entry
	Value      float32  // sample value, stored as raw float32 bits
	Timestamp  int64    // sample time in Unix seconds
}
// readWALRecord decodes one framed WAL record from r.
// Frame layout: [4B magic][4B payload length][payload][4B CRC32].
//
// Returns (nil, nil) on a clean EOF at a record boundary; any other failure
// (bad magic, oversized length, truncation, CRC mismatch) yields an error.
func readWALRecord(r io.Reader) (*walMessage, error) {
	var recMagic uint32
	if err := binary.Read(r, binary.LittleEndian, &recMagic); err != nil {
		if err == io.EOF {
			return nil, nil // clean end of file
		}
		return nil, fmt.Errorf("read record magic: %w", err)
	}
	if recMagic != walRecordMagic {
		return nil, fmt.Errorf("invalid record magic 0x%08X (expected 0x%08X)", recMagic, walRecordMagic)
	}

	var plen uint32
	if err := binary.Read(r, binary.LittleEndian, &plen); err != nil {
		return nil, fmt.Errorf("read payload length: %w", err)
	}
	// Sanity cap so a corrupt length field cannot trigger a huge allocation.
	if plen > 1<<20 {
		return nil, fmt.Errorf("record payload too large: %d bytes", plen)
	}

	body := make([]byte, plen)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, fmt.Errorf("read payload: %w", err)
	}

	var wantCRC uint32
	if err := binary.Read(r, binary.LittleEndian, &wantCRC); err != nil {
		return nil, fmt.Errorf("read CRC: %w", err)
	}
	if crc32.ChecksumIEEE(body) != wantCRC {
		return nil, fmt.Errorf("CRC mismatch (truncated write or corruption)")
	}

	return parseWALPayload(body)
}
// parseWALPayload decodes the CRC-verified payload of a WAL record into a
// walMessage. Payload layout (all integers little-endian):
//
//	[8B timestamp][2B name len][name][1B selector count]
//	([1B sel len][sel bytes])* [4B float32 value bits]
func parseWALPayload(payload []byte) (*walMessage, error) {
	// Smallest possible payload: timestamp + name length + selector count + value.
	if len(payload) < 8+2+1+4 {
		return nil, fmt.Errorf("payload too short: %d bytes", len(payload))
	}

	pos := 0

	// Timestamp (8 bytes).
	ts := int64(binary.LittleEndian.Uint64(payload[pos : pos+8]))
	pos += 8

	// Metric name: 2-byte length prefix followed by the bytes.
	if pos+2 > len(payload) {
		return nil, fmt.Errorf("metric name length overflows payload")
	}
	nameLen := int(binary.LittleEndian.Uint16(payload[pos : pos+2]))
	pos += 2
	if pos+nameLen > len(payload) {
		return nil, fmt.Errorf("metric name overflows payload")
	}
	name := string(payload[pos : pos+nameLen])
	pos += nameLen

	// Selector count (1 byte).
	if pos >= len(payload) {
		return nil, fmt.Errorf("selector count overflows payload")
	}
	numSel := int(payload[pos])
	pos++

	// Selectors: each a 1-byte length prefix followed by the bytes.
	sels := make([]string, numSel)
	for i := 0; i < numSel; i++ {
		if pos >= len(payload) {
			return nil, fmt.Errorf("selector[%d] length overflows payload", i)
		}
		selLen := int(payload[pos])
		pos++
		if pos+selLen > len(payload) {
			return nil, fmt.Errorf("selector[%d] data overflows payload", i)
		}
		sels[i] = string(payload[pos : pos+selLen])
		pos += selLen
	}

	// Value: raw float32 bits (4 bytes).
	if pos+4 > len(payload) {
		return nil, fmt.Errorf("value overflows payload")
	}
	val := math.Float32frombits(binary.LittleEndian.Uint32(payload[pos : pos+4]))

	return &walMessage{
		MetricName: name,
		Selector:   sels,
		Value:      val,
		Timestamp:  ts,
	}, nil
}
// ---------- Binary snapshot reader ----------
// dumpBinarySnapshot validates the header of a binary snapshot (.bin) file
// and then recursively dumps its level tree to w.
func dumpBinarySnapshot(f *os.File, w *bufio.Writer) error {
	r := bufio.NewReader(f)

	// Header: magic word followed by the covered [from, to] Unix-second range.
	var hdrMagic uint32
	if err := binary.Read(r, binary.LittleEndian, &hdrMagic); err != nil {
		return fmt.Errorf("read magic: %w", err)
	}
	if hdrMagic != snapFileMagic {
		return fmt.Errorf("invalid snapshot magic 0x%08X (expected 0x%08X)", hdrMagic, snapFileMagic)
	}

	var from, to int64
	if err := binary.Read(r, binary.LittleEndian, &from); err != nil {
		return fmt.Errorf("read from: %w", err)
	}
	if err := binary.Read(r, binary.LittleEndian, &to); err != nil {
		return fmt.Errorf("read to: %w", err)
	}

	fmt.Fprintf(w, "=== Binary Snapshot Dump ===\n")
	fmt.Fprintf(w, "File: %s\n", f.Name())
	fmt.Fprintf(w, "Magic: 0x%08X (valid)\n", hdrMagic)
	fmt.Fprintf(w, "From: %d (%s)\n", from, time.Unix(from, 0).UTC().Format(time.RFC3339))
	fmt.Fprintf(w, "To: %d (%s)\n\n", to, time.Unix(to, 0).UTC().Format(time.RFC3339))

	return dumpBinaryLevel(r, w, 0)
}
func dumpBinaryLevel(r io.Reader, w *bufio.Writer, depth int) error {
indent := strings.Repeat(" ", depth)
var numMetrics uint32
if err := binary.Read(r, binary.LittleEndian, &numMetrics); err != nil {
return fmt.Errorf("read num_metrics: %w", err)
}
if numMetrics > 0 {
fmt.Fprintf(w, "%sMetrics (%d):\n", indent, numMetrics)
}
for i := range numMetrics {
name, err := readString16(r)
if err != nil {
return fmt.Errorf("read metric name [%d]: %w", i, err)
}
var freq, start int64
if err := binary.Read(r, binary.LittleEndian, &freq); err != nil {
return fmt.Errorf("read frequency for %s: %w", name, err)
}
if err := binary.Read(r, binary.LittleEndian, &start); err != nil {
return fmt.Errorf("read start for %s: %w", name, err)
}
var numValues uint32
if err := binary.Read(r, binary.LittleEndian, &numValues); err != nil {
return fmt.Errorf("read num_values for %s: %w", name, err)
}
startTime := time.Unix(start, 0).UTC()
fmt.Fprintf(w, "%s [%s]\n", indent, name)
fmt.Fprintf(w, "%s Frequency: %d s\n", indent, freq)
fmt.Fprintf(w, "%s Start: %d (%s)\n", indent, start, startTime.Format(time.RFC3339))
fmt.Fprintf(w, "%s Values (%d):", indent, numValues)
if numValues == 0 {
fmt.Fprintln(w, " (none)")
} else {
fmt.Fprintln(w)
// Print values in rows of 10 for readability.
for j := range numValues {
var bits uint32
if err := binary.Read(r, binary.LittleEndian, &bits); err != nil {
return fmt.Errorf("read value[%d] for %s: %w", j, name, err)
}
val := math.Float32frombits(bits)
if j%10 == 0 {
if j > 0 {
fmt.Fprintln(w)
}
// Print the timestamp for this row's first value.
rowTS := start + int64(j)*freq
fmt.Fprintf(w, "%s [%s] ", indent, time.Unix(rowTS, 0).UTC().Format("15:04:05"))
}
if math.IsNaN(float64(val)) {
fmt.Fprintf(w, "NaN ")
} else {
fmt.Fprintf(w, "%g ", val)
}
}
fmt.Fprintln(w)
}
}
var numChildren uint32
if err := binary.Read(r, binary.LittleEndian, &numChildren); err != nil {
return fmt.Errorf("read num_children: %w", err)
}
if numChildren > 0 {
fmt.Fprintf(w, "%sChildren (%d):\n", indent, numChildren)
}
for i := range numChildren {
childName, err := readString16(r)
if err != nil {
return fmt.Errorf("read child name [%d]: %w", i, err)
}
fmt.Fprintf(w, "%s [%s]\n", indent, childName)
if err := dumpBinaryLevel(r, w, depth+2); err != nil {
return fmt.Errorf("read child %s: %w", childName, err)
}
}
return nil
}
func readString16(r io.Reader) (string, error) {
var sLen uint16
if err := binary.Read(r, binary.LittleEndian, &sLen); err != nil {
return "", err
}
buf := make([]byte, sLen)
if _, err := io.ReadFull(r, buf); err != nil {
return "", err
}
return string(buf), nil
}

View File

@@ -1,12 +1,12 @@
{ {
"name": "cc-frontend", "name": "cc-frontend",
"version": "1.5.2", "version": "1.5.3",
"lockfileVersion": 4, "lockfileVersion": 4,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "cc-frontend", "name": "cc-frontend",
"version": "1.5.2", "version": "1.5.3",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@rollup/plugin-replace": "^6.0.3", "@rollup/plugin-replace": "^6.0.3",

View File

@@ -1,6 +1,6 @@
{ {
"name": "cc-frontend", "name": "cc-frontend",
"version": "1.5.2", "version": "1.5.3",
"license": "MIT", "license": "MIT",
"scripts": { "scripts": {
"build": "rollup -c", "build": "rollup -c",

View File

@@ -54,11 +54,16 @@
const paging = { itemsPerPage: 50, page: 1 }; const paging = { itemsPerPage: 50, page: 1 };
const sorting = { field: "startTime", type: "col", order: "DESC" }; const sorting = { field: "startTime", type: "col", order: "DESC" };
const nodeMetricsQuery = gql` const nodeMetricsQuery = gql`
query ($cluster: String!, $nodes: [String!], $from: Time!, $to: Time!) { query (
$cluster: String!,
$nodes: [String!],
$from: Time!,
$to: Time!,
$nodeFilter: [NodeFilter!]!,
$sorting: OrderByInput!
) {
nodeMetrics(cluster: $cluster, nodes: $nodes, from: $from, to: $to) { nodeMetrics(cluster: $cluster, nodes: $nodes, from: $from, to: $to) {
host host
nodeState
metricHealth
subCluster subCluster
metrics { metrics {
name name
@@ -79,6 +84,13 @@
} }
} }
} }
},
nodeStatus: nodes(filter: $nodeFilter, order: $sorting) {
count
items {
schedulerState
healthState
}
} }
} }
`; `;
@@ -146,6 +158,8 @@
nodes: [hostname], nodes: [hostname],
from: from?.toISOString(), from: from?.toISOString(),
to: to?.toISOString(), to: to?.toISOString(),
nodeFilter: { hostname: { eq: hostname }},
sorting // $sorting unused in backend: Use placeholder
}, },
}) })
); );
@@ -157,8 +171,8 @@
}) })
); );
const thisNodeState = $derived($nodeMetricsData?.data?.nodeMetrics[0]?.nodeState || 'notindb'); const thisNodeState = $derived($nodeMetricsData?.data?.nodeStatus?.items[0]?.schedulerState || 'notindb');
const thisMetricHealth = $derived($nodeMetricsData?.data?.nodeMetrics[0]?.metricHealth || 'unknown'); const thisMetricHealth = $derived($nodeMetricsData?.data?.nodeStatus?.items[0]?.healthState || 'unknown');
</script> </script>
<Row cols={{ xs: 2, lg: 3}}> <Row cols={{ xs: 2, lg: 3}}>

View File

@@ -166,12 +166,12 @@
items.push({ project: { [filters.projectMatch]: filters.project } }); items.push({ project: { [filters.projectMatch]: filters.project } });
if (filters.user) if (filters.user)
items.push({ user: { [filters.userMatch]: filters.user } }); items.push({ user: { [filters.userMatch]: filters.user } });
if (filters.numNodes.from != null || filters.numNodes.to != null) { if (filters.numNodes.from != null && filters.numNodes.to != null) {
items.push({ items.push({
numNodes: { from: filters.numNodes.from, to: filters.numNodes.to }, numNodes: { from: filters.numNodes.from, to: filters.numNodes.to },
}); });
} }
if (filters.numAccelerators.from != null || filters.numAccelerators.to != null) { if (filters.numAccelerators.from != null && filters.numAccelerators.to != null) {
items.push({ items.push({
numAccelerators: { numAccelerators: {
from: filters.numAccelerators.from, from: filters.numAccelerators.from,
@@ -179,7 +179,7 @@
}, },
}); });
} }
if (filters.numHWThreads.from != null || filters.numHWThreads.to != null) { if (filters.numHWThreads.from != null && filters.numHWThreads.to != null) {
items.push({ items.push({
numHWThreads: { numHWThreads: {
from: filters.numHWThreads.from, from: filters.numHWThreads.from,
@@ -206,14 +206,21 @@
items.push({ duration: { to: filters.duration.lessThan, from: 0 } }); items.push({ duration: { to: filters.duration.lessThan, from: 0 } });
if (filters.duration.moreThan) if (filters.duration.moreThan)
items.push({ duration: { to: 0, from: filters.duration.moreThan } }); items.push({ duration: { to: 0, from: filters.duration.moreThan } });
if (filters.energy.from != null || filters.energy.to != null) if (filters.energy.from != null && filters.energy.to != null)
items.push({ items.push({
energy: { from: filters.energy.from, to: filters.energy.to }, energy: { from: filters.energy.from, to: filters.energy.to },
}); });
if (filters.jobId) if (filters.jobId)
items.push({ jobId: { [filters.jobIdMatch]: filters.jobId } }); items.push({ jobId: { [filters.jobIdMatch]: filters.jobId } });
if (filters.stats.length != 0) if (filters.stats.length != 0) {
items.push({ metricStats: filters.stats.map((st) => { return { metricName: st.field, range: { from: st.from, to: st.to }} }) }); const metricStats = [];
filters.stats.forEach((st) => {
if (st.from != null && st.to != null)
metricStats.push({ metricName: st.field, range: { from: st.from, to: st.to }});
});
if (metricStats.length != 0)
items.push({metricStats})
};
if (filters.node) items.push({ node: { [filters.nodeMatch]: filters.node } }); if (filters.node) items.push({ node: { [filters.nodeMatch]: filters.node } });
if (filters.jobName) items.push({ jobName: { contains: filters.jobName } }); if (filters.jobName) items.push({ jobName: { contains: filters.jobName } });
if (filters.schedule) items.push({ schedule: filters.schedule }); if (filters.schedule) items.push({ schedule: filters.schedule });
@@ -280,40 +287,40 @@
opts.push(`duration=morethan-${filters.duration.moreThan}`); opts.push(`duration=morethan-${filters.duration.moreThan}`);
if (filters.tags.length != 0) if (filters.tags.length != 0)
for (let tag of filters.tags) opts.push(`tag=${tag}`); for (let tag of filters.tags) opts.push(`tag=${tag}`);
if (filters.numNodes.from > 1 && filters.numNodes.to > 0) if (filters.numNodes.from > 0 && filters.numNodes.to > 0)
opts.push(`numNodes=${filters.numNodes.from}-${filters.numNodes.to}`); opts.push(`numNodes=${filters.numNodes.from}-${filters.numNodes.to}`);
else if (filters.numNodes.from > 1 && filters.numNodes.to == 0) else if (filters.numNodes.from > 0 && filters.numNodes.to == 0)
opts.push(`numNodes=morethan-${filters.numNodes.from}`); opts.push(`numNodes=morethan-${filters.numNodes.from}`);
else if (filters.numNodes.from == 1 && filters.numNodes.to > 0) else if (filters.numNodes.from == 0 && filters.numNodes.to > 0)
opts.push(`numNodes=lessthan-${filters.numNodes.to}`); opts.push(`numNodes=lessthan-${filters.numNodes.to}`);
if (filters.numHWThreads.from > 1 && filters.numHWThreads.to > 0) if (filters.numHWThreads.from > 0 && filters.numHWThreads.to > 0)
opts.push(`numHWThreads=${filters.numHWThreads.from}-${filters.numHWThreads.to}`); opts.push(`numHWThreads=${filters.numHWThreads.from}-${filters.numHWThreads.to}`);
else if (filters.numHWThreads.from > 1 && filters.numHWThreads.to == 0) else if (filters.numHWThreads.from > 0 && filters.numHWThreads.to == 0)
opts.push(`numHWThreads=morethan-${filters.numHWThreads.from}`); opts.push(`numHWThreads=morethan-${filters.numHWThreads.from}`);
else if (filters.numHWThreads.from == 1 && filters.numHWThreads.to > 0) else if (filters.numHWThreads.from == 0 && filters.numHWThreads.to > 0)
opts.push(`numHWThreads=lessthan-${filters.numHWThreads.to}`); opts.push(`numHWThreads=lessthan-${filters.numHWThreads.to}`);
if (filters.numAccelerators.from && filters.numAccelerators.to) if (filters.numAccelerators.from > 0 && filters.numAccelerators.to > 0)
opts.push(`numAccelerators=${filters.numAccelerators.from}-${filters.numAccelerators.to}`); opts.push(`numAccelerators=${filters.numAccelerators.from}-${filters.numAccelerators.to}`);
else if (filters.numAccelerators.from > 1 && filters.numAccelerators.to == 0) else if (filters.numAccelerators.from > 0 && filters.numAccelerators.to == 0)
opts.push(`numAccelerators=morethan-${filters.numAccelerators.from}`); opts.push(`numAccelerators=morethan-${filters.numAccelerators.from}`);
else if (filters.numAccelerators.from == 1 && filters.numAccelerators.to > 0) else if (filters.numAccelerators.from == 0 && filters.numAccelerators.to > 0)
opts.push(`numAccelerators=lessthan-${filters.numAccelerators.to}`); opts.push(`numAccelerators=lessthan-${filters.numAccelerators.to}`);
if (filters.node) opts.push(`node=${filters.node}`); if (filters.node) opts.push(`node=${filters.node}`);
if (filters.node && filters.nodeMatch != "eq") // "eq" is default-case if (filters.node && filters.nodeMatch != "eq") // "eq" is default-case
opts.push(`nodeMatch=${filters.nodeMatch}`); opts.push(`nodeMatch=${filters.nodeMatch}`);
if (filters.energy.from > 1 && filters.energy.to > 0) if (filters.energy.from > 0 && filters.energy.to > 0)
opts.push(`energy=${filters.energy.from}-${filters.energy.to}`); opts.push(`energy=${filters.energy.from}-${filters.energy.to}`);
else if (filters.energy.from > 1 && filters.energy.to == 0) else if (filters.energy.from > 0 && filters.energy.to == 0)
opts.push(`energy=morethan-${filters.energy.from}`); opts.push(`energy=morethan-${filters.energy.from}`);
else if (filters.energy.from == 1 && filters.energy.to > 0) else if (filters.energy.from == 0 && filters.energy.to > 0)
opts.push(`energy=lessthan-${filters.energy.to}`); opts.push(`energy=lessthan-${filters.energy.to}`);
if (filters.stats.length > 0) if (filters.stats.length > 0)
for (let stat of filters.stats) { for (let stat of filters.stats) {
if (stat.from > 1 && stat.to > 0) if (stat.from > 0 && stat.to > 0)
opts.push(`stat=${stat.field}-${stat.from}-${stat.to}`); opts.push(`stat=${stat.field}-${stat.from}-${stat.to}`);
else if (stat.from > 1 && stat.to == 0) else if (stat.from > 0 && stat.to == 0)
opts.push(`stat=${stat.field}-morethan-${stat.from}`); opts.push(`stat=${stat.field}-morethan-${stat.from}`);
else if (stat.from == 1 && stat.to > 0) else if (stat.from == 0 && stat.to > 0)
opts.push(`stat=${stat.field}-lessthan-${stat.to}`); opts.push(`stat=${stat.field}-lessthan-${stat.to}`);
} }
// Build && Return // Build && Return
@@ -511,43 +518,43 @@
</Info> </Info>
{/if} {/if}
{#if filters.numNodes.from > 1 && filters.numNodes.to > 0} {#if filters.numNodes.from > 0 && filters.numNodes.to > 0}
<Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}> <Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}>
Nodes: {filters.numNodes.from} - {filters.numNodes.to} Nodes: {filters.numNodes.from} - {filters.numNodes.to}
</Info> </Info>
{:else if filters.numNodes.from > 1 && filters.numNodes.to == 0} {:else if filters.numNodes.from > 0 && filters.numNodes.to == 0}
<Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}> <Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}>
&nbsp;&ge;&nbsp;{filters.numNodes.from} Node(s) &nbsp;&ge;&nbsp;{filters.numNodes.from} Node(s)
</Info> </Info>
{:else if filters.numNodes.from == 1 && filters.numNodes.to > 0} {:else if filters.numNodes.from == 0 && filters.numNodes.to > 0}
<Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}> <Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}>
&nbsp;&le;&nbsp;{filters.numNodes.to} Node(s) &nbsp;&le;&nbsp;{filters.numNodes.to} Node(s)
</Info> </Info>
{/if} {/if}
{#if filters.numHWThreads.from > 1 && filters.numHWThreads.to > 0} {#if filters.numHWThreads.from > 0 && filters.numHWThreads.to > 0}
<Info icon="cpu" onclick={() => (isResourcesOpen = true)}> <Info icon="cpu" onclick={() => (isResourcesOpen = true)}>
HWThreads: {filters.numHWThreads.from} - {filters.numHWThreads.to} HWThreads: {filters.numHWThreads.from} - {filters.numHWThreads.to}
</Info> </Info>
{:else if filters.numHWThreads.from > 1 && filters.numHWThreads.to == 0} {:else if filters.numHWThreads.from > 0 && filters.numHWThreads.to == 0}
<Info icon="cpu" onclick={() => (isResourcesOpen = true)}> <Info icon="cpu" onclick={() => (isResourcesOpen = true)}>
&nbsp;&ge;&nbsp;{filters.numHWThreads.from} HWThread(s) &nbsp;&ge;&nbsp;{filters.numHWThreads.from} HWThread(s)
</Info> </Info>
{:else if filters.numHWThreads.from == 1 && filters.numHWThreads.to > 0} {:else if filters.numHWThreads.from == 0 && filters.numHWThreads.to > 0}
<Info icon="cpu" onclick={() => (isResourcesOpen = true)}> <Info icon="cpu" onclick={() => (isResourcesOpen = true)}>
&nbsp;&le;&nbsp;{filters.numHWThreads.to} HWThread(s) &nbsp;&le;&nbsp;{filters.numHWThreads.to} HWThread(s)
</Info> </Info>
{/if} {/if}
{#if filters.numAccelerators.from > 1 && filters.numAccelerators.to > 0} {#if filters.numAccelerators.from > 0 && filters.numAccelerators.to > 0}
<Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}> <Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}>
Accelerators: {filters.numAccelerators.from} - {filters.numAccelerators.to} Accelerators: {filters.numAccelerators.from} - {filters.numAccelerators.to}
</Info> </Info>
{:else if filters.numAccelerators.from > 1 && filters.numAccelerators.to == 0} {:else if filters.numAccelerators.from > 0 && filters.numAccelerators.to == 0}
<Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}> <Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}>
&nbsp;&ge;&nbsp;{filters.numAccelerators.from} Acc(s) &nbsp;&ge;&nbsp;{filters.numAccelerators.from} Acc(s)
</Info> </Info>
{:else if filters.numAccelerators.from == 1 && filters.numAccelerators.to > 0} {:else if filters.numAccelerators.from == 0 && filters.numAccelerators.to > 0}
<Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}> <Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}>
&nbsp;&le;&nbsp;{filters.numAccelerators.to} Acc(s) &nbsp;&le;&nbsp;{filters.numAccelerators.to} Acc(s)
</Info> </Info>
@@ -559,15 +566,15 @@
</Info> </Info>
{/if} {/if}
{#if filters.energy.from > 1 && filters.energy.to > 0} {#if filters.energy.from > 0 && filters.energy.to > 0}
<Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}> <Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}>
Total Energy: {filters.energy.from} - {filters.energy.to} kWh Total Energy: {filters.energy.from} - {filters.energy.to} kWh
</Info> </Info>
{:else if filters.energy.from > 1 && filters.energy.to == 0} {:else if filters.energy.from > 0 && filters.energy.to == 0}
<Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}> <Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}>
Total Energy &ge;&nbsp;{filters.energy.from} kWh Total Energy &ge;&nbsp;{filters.energy.from} kWh
</Info> </Info>
{:else if filters.energy.from == 1 && filters.energy.to > 0} {:else if filters.energy.from == 0 && filters.energy.to > 0}
<Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}> <Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}>
Total Energy &le;&nbsp;{filters.energy.to} kWh Total Energy &le;&nbsp;{filters.energy.to} kWh
</Info> </Info>
@@ -575,15 +582,15 @@
{#if filters.stats.length > 0} {#if filters.stats.length > 0}
{#each filters.stats as stat} {#each filters.stats as stat}
{#if stat.from > 1 && stat.to > 0} {#if stat.from > 0 && stat.to > 0}
<Info icon="bar-chart" onclick={() => (isStatsOpen = true)}> <Info icon="bar-chart" onclick={() => (isStatsOpen = true)}>
{stat.field}: {stat.from} - {stat.to} {stat.unit} {stat.field}: {stat.from} - {stat.to} {stat.unit}
</Info>&thinsp; </Info>&thinsp;
{:else if stat.from > 1 && stat.to == 0} {:else if stat.from > 0 && stat.to == 0}
<Info icon="bar-chart" onclick={() => (isStatsOpen = true)}> <Info icon="bar-chart" onclick={() => (isStatsOpen = true)}>
{stat.field} &ge;&nbsp;{stat.from} {stat.unit} {stat.field} &ge;&nbsp;{stat.from} {stat.unit}
</Info>&thinsp; </Info>&thinsp;
{:else if stat.from == 1 && stat.to > 0} {:else if stat.from == 0 && stat.to > 0}
<Info icon="bar-chart" onclick={() => (isStatsOpen = true)}> <Info icon="bar-chart" onclick={() => (isStatsOpen = true)}>
{stat.field} &le;&nbsp;{stat.to} {stat.unit} {stat.field} &le;&nbsp;{stat.to} {stat.unit}
</Info>&thinsp; </Info>&thinsp;

View File

@@ -28,31 +28,29 @@
} = $props(); } = $props();
/* Const */ /* Const */
const minEnergyPreset = 1; const minEnergyPreset = 0;
const maxEnergyPreset = 100; const maxEnergyPreset = 100;
/* Derived */ /* Derived */
// Pending // Pending
let pendingEnergyState = $derived({ let pendingEnergyState = $derived({
from: presetEnergy?.from ? presetEnergy.from : minEnergyPreset, from: presetEnergy?.from || minEnergyPreset,
to: !(presetEnergy.to == null || presetEnergy.to == 0) ? presetEnergy.to : maxEnergyPreset, to: (presetEnergy.to == 0) ? null : presetEnergy.to,
}); });
// Changable // Changable
let energyState = $derived({ let energyState = $derived({
from: presetEnergy?.from ? presetEnergy.from : minEnergyPreset, from: presetEnergy?.from || minEnergyPreset,
to: !(presetEnergy.to == null || presetEnergy.to == 0) ? presetEnergy.to : maxEnergyPreset, to: (presetEnergy.to == 0) ? null : presetEnergy.to,
}); });
const energyActive = $derived(!(JSON.stringify(energyState) === JSON.stringify({ from: minEnergyPreset, to: maxEnergyPreset }))); const energyActive = $derived(!(JSON.stringify(energyState) === JSON.stringify({ from: minEnergyPreset, to: null })));
// Block Apply if null
const disableApply = $derived(energyState.from === null || energyState.to === null);
/* Function */ /* Function */
function setEnergy() { function setEnergy() {
if (energyActive) { if (energyActive) {
pendingEnergyState = { pendingEnergyState = {
from: energyState.from, from: (!energyState?.from) ? 0 : energyState.from,
to: (energyState.to == maxEnergyPreset) ? 0 : energyState.to to: (energyState.to === null) ? 0 : energyState.to
}; };
} else { } else {
pendingEnergyState = { from: null, to: null}; pendingEnergyState = { from: null, to: null};
@@ -86,7 +84,6 @@
<ModalFooter> <ModalFooter>
<Button <Button
color="primary" color="primary"
disabled={disableApply}
onclick={() => { onclick={() => {
isOpen = false; isOpen = false;
setEnergy(); setEnergy();

View File

@@ -98,44 +98,38 @@
// Pending // Pending
let pendingNumNodes = $derived({ let pendingNumNodes = $derived({
from: presetNumNodes.from, from: presetNumNodes.from,
to: (presetNumNodes.to == 0) ? maxNumNodes : presetNumNodes.to to: (presetNumNodes.to == 0) ? null : presetNumNodes.to
}); });
let pendingNumHWThreads = $derived({ let pendingNumHWThreads = $derived({
from: presetNumHWThreads.from, from: presetNumHWThreads.from,
to: (presetNumHWThreads.to == 0) ? maxNumHWThreads : presetNumHWThreads.to to: (presetNumHWThreads.to == 0) ? null : presetNumHWThreads.to
}); });
let pendingNumAccelerators = $derived({ let pendingNumAccelerators = $derived({
from: presetNumAccelerators.from, from: presetNumAccelerators.from,
to: (presetNumAccelerators.to == 0) ? maxNumAccelerators : presetNumAccelerators.to to: (presetNumAccelerators.to == 0) ? null : presetNumAccelerators.to
}); });
let pendingNamedNode = $derived(presetNamedNode); let pendingNamedNode = $derived(presetNamedNode);
let pendingNodeMatch = $derived(presetNodeMatch); let pendingNodeMatch = $derived(presetNodeMatch);
// Changable States // Changable States
let nodesState = $derived({ let nodesState = $derived({
from: presetNumNodes.from, from: presetNumNodes?.from || 0,
to: (presetNumNodes.to == 0) ? maxNumNodes : presetNumNodes.to to: (presetNumNodes.to == 0) ? null : presetNumNodes.to
}); });
let threadState = $derived({ let threadState = $derived({
from: presetNumHWThreads.from, from: presetNumHWThreads?.from || 0,
to: (presetNumHWThreads.to == 0) ? maxNumHWThreads : presetNumHWThreads.to to: (presetNumHWThreads.to == 0) ? null : presetNumHWThreads.to
}); });
let accState = $derived({ let accState = $derived({
from: presetNumAccelerators.from, from: presetNumAccelerators?.from || 0,
to: (presetNumAccelerators.to == 0) ? maxNumAccelerators : presetNumAccelerators.to to: (presetNumAccelerators.to == 0) ? null : presetNumAccelerators.to
}); });
const initialized = $derived(getContext("initialized") || false); const initialized = $derived(getContext("initialized") || false);
const clusterInfos = $derived($initialized ? getContext("clusters") : null); const clusterInfos = $derived($initialized ? getContext("clusters") : null);
// Is Selection Active // Is Selection Active
const nodesActive = $derived(!(JSON.stringify(nodesState) === JSON.stringify({ from: 1, to: maxNumNodes }))); const nodesActive = $derived(!(JSON.stringify(nodesState) === JSON.stringify({ from: 0, to: null })));
const threadActive = $derived(!(JSON.stringify(threadState) === JSON.stringify({ from: 1, to: maxNumHWThreads }))); const threadActive = $derived(!(JSON.stringify(threadState) === JSON.stringify({ from: 0, to: null })));
const accActive = $derived(!(JSON.stringify(accState) === JSON.stringify({ from: 1, to: maxNumAccelerators }))); const accActive = $derived(!(JSON.stringify(accState) === JSON.stringify({ from: 0, to: null })));
// Block Apply if null
const disableApply = $derived(
nodesState.from === null || nodesState.to === null ||
threadState.from === null || threadState.to === null ||
accState.from === null || accState.to === null
);
/* Reactive Effects | Svelte 5 onMount */ /* Reactive Effects | Svelte 5 onMount */
$effect(() => { $effect(() => {
@@ -153,58 +147,28 @@
} }
}); });
$effect(() => {
if (
$initialized &&
pendingNumNodes.from == null &&
pendingNumNodes.to == null
) {
nodesState = { from: 1, to: maxNumNodes };
}
});
$effect(() => {
if (
$initialized &&
pendingNumHWThreads.from == null &&
pendingNumHWThreads.to == null
) {
threadState = { from: 1, to: maxNumHWThreads };
}
});
$effect(() => {
if (
$initialized &&
pendingNumAccelerators.from == null &&
pendingNumAccelerators.to == null
) {
accState = { from: 1, to: maxNumAccelerators };
}
});
/* Functions */ /* Functions */
function setResources() { function setResources() {
if (nodesActive) { if (nodesActive) {
pendingNumNodes = { pendingNumNodes = {
from: nodesState.from, from: (!nodesState?.from) ? 0 : nodesState.from,
to: (nodesState.to == maxNumNodes) ? 0 : nodesState.to to: (nodesState.to === null) ? 0 : nodesState.to
}; };
} else { } else {
pendingNumNodes = { from: null, to: null}; pendingNumNodes = { from: null, to: null};
}; };
if (threadActive) { if (threadActive) {
pendingNumHWThreads = { pendingNumHWThreads = {
from: threadState.from, from: (!threadState?.from) ? 0 : threadState.from,
to: (threadState.to == maxNumHWThreads) ? 0 : threadState.to to: (threadState.to === null) ? 0 : threadState.to
}; };
} else { } else {
pendingNumHWThreads = { from: null, to: null}; pendingNumHWThreads = { from: null, to: null};
}; };
if (accActive) { if (accActive) {
pendingNumAccelerators = { pendingNumAccelerators = {
from: accState.from, from: (!accState?.from) ? 0 : accState.from,
to: (accState.to == maxNumAccelerators) ? 0 : accState.to to: (accState.to === null) ? 0 : accState.to
}; };
} else { } else {
pendingNumAccelerators = { from: null, to: null}; pendingNumAccelerators = { from: null, to: null};
@@ -249,7 +213,7 @@
nodesState.from = detail[0]; nodesState.from = detail[0];
nodesState.to = detail[1]; nodesState.to = detail[1];
}} }}
sliderMin={1} sliderMin={0}
sliderMax={maxNumNodes} sliderMax={maxNumNodes}
fromPreset={nodesState.from} fromPreset={nodesState.from}
toPreset={nodesState.to} toPreset={nodesState.to}
@@ -269,7 +233,7 @@
threadState.from = detail[0]; threadState.from = detail[0];
threadState.to = detail[1]; threadState.to = detail[1];
}} }}
sliderMin={1} sliderMin={0}
sliderMax={maxNumHWThreads} sliderMax={maxNumHWThreads}
fromPreset={threadState.from} fromPreset={threadState.from}
toPreset={threadState.to} toPreset={threadState.to}
@@ -289,7 +253,7 @@
accState.from = detail[0]; accState.from = detail[0];
accState.to = detail[1]; accState.to = detail[1];
}} }}
sliderMin={1} sliderMin={0}
sliderMax={maxNumAccelerators} sliderMax={maxNumAccelerators}
fromPreset={accState.from} fromPreset={accState.from}
toPreset={accState.to} toPreset={accState.to}
@@ -300,7 +264,6 @@
<ModalFooter> <ModalFooter>
<Button <Button
color="primary" color="primary"
disabled={disableApply}
onclick={() => { onclick={() => {
isOpen = false; isOpen = false;
setResources(); setResources();

View File

@@ -34,7 +34,8 @@
function setRanges() { function setRanges() {
for (let as of availableStats) { for (let as of availableStats) {
if (as.enabled) { if (as.enabled) {
as.to = (as.to == as.peak) ? 0 : as.to as.from = (!as?.from) ? 0 : as.from,
as.to = (as.to == null) ? 0 : as.to
} }
}; };
} }
@@ -42,8 +43,8 @@
function resetRanges() { function resetRanges() {
for (let as of availableStats) { for (let as of availableStats) {
as.enabled = false as.enabled = false
as.from = 1 as.from = null
as.to = as.peak as.to = null
}; };
} }
</script> </script>
@@ -66,13 +67,13 @@
changeRange={(detail) => { changeRange={(detail) => {
aStat.from = detail[0]; aStat.from = detail[0];
aStat.to = detail[1]; aStat.to = detail[1];
if (aStat.from == 1 && aStat.to == aStat.peak) { if (aStat.from == 0 && aStat.to === null) {
aStat.enabled = false; aStat.enabled = false;
} else { } else {
aStat.enabled = true; aStat.enabled = true;
} }
}} }}
sliderMin={1} sliderMin={0}
sliderMax={aStat.peak} sliderMax={aStat.peak}
fromPreset={aStat.from} fromPreset={aStat.from}
toPreset={aStat.to} toPreset={aStat.to}

View File

@@ -21,7 +21,7 @@
let { let {
sliderMin, sliderMin,
sliderMax, sliderMax,
fromPreset = 1, fromPreset = 0,
toPreset = 100, toPreset = 100,
changeRange changeRange
} = $props(); } = $props();
@@ -33,9 +33,9 @@
/* Derived */ /* Derived */
let pendingValues = $derived([fromPreset, toPreset]); let pendingValues = $derived([fromPreset, toPreset]);
let sliderFrom = $derived(Math.max(((fromPreset == null ? sliderMin : fromPreset) - sliderMin) / (sliderMax - sliderMin), 0.)); let sliderFrom = $derived(Math.max(((fromPreset == null ? sliderMin : fromPreset) - sliderMin) / (sliderMax - sliderMin), 0.));
let sliderTo = $derived(Math.min(((toPreset == null ? sliderMin : toPreset) - sliderMin) / (sliderMax - sliderMin), 1.)); let sliderTo = $derived(Math.min(((toPreset == null ? sliderMax : toPreset) - sliderMin) / (sliderMax - sliderMin), 1.));
let inputFieldFrom = $derived(fromPreset ? fromPreset.toString() : null); let inputFieldFrom = $derived(fromPreset != null ? fromPreset.toString() : null);
let inputFieldTo = $derived(toPreset ? toPreset.toString() : null); let inputFieldTo = $derived(toPreset != null ? toPreset.toString() : null);
/* Var Init */ /* Var Init */
let timeoutId = null; let timeoutId = null;
@@ -79,11 +79,15 @@
evt.preventDefault() evt.preventDefault()
evt.stopPropagation() evt.stopPropagation()
const newV = Number.parseInt(evt.target.value); const newV = Number.parseInt(evt.target.value);
const newP = clamp((newV - sliderMin) / (sliderMax - sliderMin), 0., 1.) const newP = clamp((newV - sliderMin) / (sliderMax - sliderMin), 0., 1., target)
updateStates(newV, newP, target); updateStates(newV, newP, target);
} }
function clamp(x, testMin, testMax) { function clamp(x, testMin, testMax, target) {
if (isNaN(x)) {
if (target == 'from') return testMin
else if (target == 'to') return testMax
} else {
return x < testMin return x < testMin
? testMin ? testMin
: (x > testMax : (x > testMax
@@ -91,6 +95,7 @@
: x : x
); );
} }
}
function draggable(node) { function draggable(node) {
let x; let x;
@@ -159,23 +164,23 @@
<div class="double-range-container"> <div class="double-range-container">
<div class="header"> <div class="header">
<input class="form-control" type="text" placeholder="from..." value={inputFieldFrom} <input class="form-control" type="text" placeholder={`${sliderMin} ...`} value={inputFieldFrom}
oninput={(e) => { oninput={(e) => {
inputChanged(e, 'from'); inputChanged(e, 'from');
}} }}
/> />
{#if inputFieldFrom != sliderMin?.toString() && inputFieldTo != sliderMax?.toString() } {#if (inputFieldFrom && inputFieldFrom != sliderMin?.toString()) && inputFieldTo != null }
<span>Selected: Range <b> {inputFieldFrom} </b> - <b> {inputFieldTo} </b></span> <span>Selected: Range <b> {inputFieldFrom} </b> - <b> {inputFieldTo} </b></span>
{:else if inputFieldFrom != sliderMin?.toString() && inputFieldTo == sliderMax?.toString() } {:else if (inputFieldFrom && inputFieldFrom != sliderMin?.toString()) && inputFieldTo == null }
<span>Selected: More than <b> {inputFieldFrom} </b> </span> <span>Selected: More Than Equal <b> {inputFieldFrom} </b> </span>
{:else if inputFieldFrom == sliderMin?.toString() && inputFieldTo != sliderMax?.toString() } {:else if (!inputFieldFrom || inputFieldFrom == sliderMin?.toString()) && inputFieldTo != null }
<span>Selected: Less than <b> {inputFieldTo} </b></span> <span>Selected: Less Than Equal <b> {inputFieldTo} </b></span>
{:else} {:else}
<span><i>No Selection</i></span> <span><i>No Selection</i></span>
{/if} {/if}
<input class="form-control" type="text" placeholder="to..." value={inputFieldTo} <input class="form-control" type="text" placeholder={`... ${sliderMax} ...`} value={inputFieldTo}
oninput={(e) => { oninput={(e) => {
inputChanged(e, 'to'); inputChanged(e, 'to');
}} }}

View File

@@ -347,8 +347,8 @@ export function getStatsItems(presetStats = []) {
field: presetEntry.field, field: presetEntry.field,
text: `${gm.name} (${gm.footprint})`, text: `${gm.name} (${gm.footprint})`,
metric: gm.name, metric: gm.name,
from: presetEntry.from, from: presetEntry?.from || 0,
to: (presetEntry.to == 0) ? mc.peak : presetEntry.to, to: (presetEntry.to == 0) ? null : presetEntry.to,
peak: mc.peak, peak: mc.peak,
enabled: true, enabled: true,
unit: `${gm?.unit?.prefix ? gm.unit.prefix : ''}${gm.unit.base}` unit: `${gm?.unit?.prefix ? gm.unit.prefix : ''}${gm.unit.base}`
@@ -358,8 +358,8 @@ export function getStatsItems(presetStats = []) {
field: `${gm.name}_${gm.footprint}`, field: `${gm.name}_${gm.footprint}`,
text: `${gm.name} (${gm.footprint})`, text: `${gm.name} (${gm.footprint})`,
metric: gm.name, metric: gm.name,
from: 1, from: 0,
to: mc.peak, to: null,
peak: mc.peak, peak: mc.peak,
enabled: false, enabled: false,
unit: `${gm?.unit?.prefix ? gm.unit.prefix : ''}${gm.unit.base}` unit: `${gm?.unit?.prefix ? gm.unit.prefix : ''}${gm.unit.base}`