mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-27 18:17:29 +01:00
Compare commits
36 Commits
feature/52
...
fix/checkp
| Author | SHA1 | Date | |
|---|---|---|---|
|
0ce2fa2fbe
|
|||
|
|
71fc9efec7 | ||
|
|
6e97ac8b28 | ||
| 97d65a9e5c | |||
| e759810051 | |||
| b1884fda9d | |||
| c267501a1b | |||
| a550344f13 | |||
|
|
bd7125a52e | ||
| 93a9d732a4 | |||
| 6f7dda53ee | |||
| 0325d9e866 | |||
| 3d94b0bf79 | |||
|
|
d5ea2b4cf5 | ||
| 45f329e5fb | |||
|
|
100dd7dacf | ||
| 192c94a78d | |||
| e41d1251ba | |||
| 586c902044 | |||
| 01ec70baa8 | |||
|
|
97330ce598 | ||
| fb176c5afb | |||
|
|
d4ee937115 | ||
| 999d93efc3 | |||
|
|
4ce0cfb686 | ||
| 359962d166 | |||
| 60554896d5 | |||
|
|
a9f335d910 | ||
| bf48389aeb | |||
|
|
676025adfe | ||
|
|
d4a0ae173f | ||
|
|
a7e5ecaf6c | ||
|
|
965e2007fb | ||
|
|
6a29faf460 | ||
|
|
8751ae023d | ||
|
|
128c098865 |
@@ -5,6 +5,7 @@ before:
|
|||||||
builds:
|
builds:
|
||||||
- env:
|
- env:
|
||||||
- CGO_ENABLED=1
|
- CGO_ENABLED=1
|
||||||
|
- CC=x86_64-linux-musl-gcc
|
||||||
goos:
|
goos:
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
|||||||
2
Makefile
2
Makefile
@@ -1,6 +1,6 @@
|
|||||||
TARGET = ./cc-backend
|
TARGET = ./cc-backend
|
||||||
FRONTEND = ./web/frontend
|
FRONTEND = ./web/frontend
|
||||||
VERSION = 1.5.2
|
VERSION = 1.5.3
|
||||||
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
|
GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development')
|
||||||
CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S")
|
CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S")
|
||||||
LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}'
|
LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}'
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
# `cc-backend` version 1.5.2
|
# `cc-backend` version 1.5.3
|
||||||
|
|
||||||
Supports job archive version 3 and database version 11.
|
Supports job archive version 3 and database version 11.
|
||||||
|
|
||||||
@@ -10,7 +10,51 @@ If you are upgrading from v1.5.0 you need to do another DB migration. This
|
|||||||
should not take long. For optimal database performance after the migration it is
|
should not take long. For optimal database performance after the migration it is
|
||||||
recommended to apply the new `optimize-db` flag, which runs the sqlite `ANALYZE`
|
recommended to apply the new `optimize-db` flag, which runs the sqlite `ANALYZE`
|
||||||
and `VACUUM` commands. Depending on your database size (more then 40GB) the
|
and `VACUUM` commands. Depending on your database size (more then 40GB) the
|
||||||
`VACUUM` may take up to 2h.
|
`VACUUM` may take up to 2h. You can also run the `ANALYZE` command manually.
|
||||||
|
While we are confident that the memory issue with the metricstore cleanup move
|
||||||
|
policy is fixed, it is still recommended to use delete policy for cleanup.
|
||||||
|
This is also the default.
|
||||||
|
|
||||||
|
## Changes in 1.5.3
|
||||||
|
|
||||||
|
### Bug fixes
|
||||||
|
|
||||||
|
- **Doubleranged filter fixes**: Range filters now correctly handle zero as a
|
||||||
|
boundary value. Improved validation and UI text for "more than equal" and
|
||||||
|
"less than equal" range selections.
|
||||||
|
- **Lineprotocol body parsing interrupted**: Switched from `ReadTimeout` to
|
||||||
|
`ReadHeaderTimeout` so that long-running metric submissions are no longer
|
||||||
|
cut off mid-stream.
|
||||||
|
- **Checkpoint archiving continues on error**: A single cluster's archiving
|
||||||
|
failure no longer aborts the entire cleanup operation. Errors are collected
|
||||||
|
and reported per cluster.
|
||||||
|
- **Parquet row group overflow**: Added periodic flush during checkpoint
|
||||||
|
archiving to prevent exceeding the parquet-go 32k column-write limit.
|
||||||
|
- **Removed metrics excluded from subcluster config**: Metrics removed from a
|
||||||
|
subcluster are no longer returned by `GetMetricConfigSubCluster`.
|
||||||
|
|
||||||
|
### MetricStore performance
|
||||||
|
|
||||||
|
- **WAL writer throughput**: Decoupled WAL file flushing from message processing
|
||||||
|
using a periodic 5-second batch flush (up to 4096 messages per cycle),
|
||||||
|
significantly increasing metric ingestion throughput.
|
||||||
|
- **Improved shutdown time**: HTTP shutdown timeout reduced; metricstore and
|
||||||
|
archiver now shut down concurrently. Overall shutdown deadline raised to
|
||||||
|
60 seconds.
|
||||||
|
|
||||||
|
### New features
|
||||||
|
|
||||||
|
- **Manual checkpoint cleanup flag**: New `-cleanup-checkpoints` CLI flag
|
||||||
|
triggers checkpoint cleanup without starting the server, useful for
|
||||||
|
maintenance windows or automated cleanup scripts.
|
||||||
|
- **Explicit node state queries in node view**: Node health and scheduler state
|
||||||
|
are now fetched independently from metric data for fresher status information.
|
||||||
|
|
||||||
|
### Logging improvements
|
||||||
|
|
||||||
|
- **Reduced tagger log noise**: Missing metrics and expression evaluation errors
|
||||||
|
in the job classification tagger are now logged at debug level instead of
|
||||||
|
error level.
|
||||||
|
|
||||||
## Changes in 1.5.2
|
## Changes in 1.5.2
|
||||||
|
|
||||||
@@ -19,6 +63,14 @@ and `VACUUM` commands. Depending on your database size (more then 40GB) the
|
|||||||
- **Memory spike in parquet writer**: Fixed memory spikes when using the
|
- **Memory spike in parquet writer**: Fixed memory spikes when using the
|
||||||
metricstore move (archive) policy with the parquet writer. The writer now
|
metricstore move (archive) policy with the parquet writer. The writer now
|
||||||
processes data in a streaming fashion to avoid accumulating large allocations.
|
processes data in a streaming fashion to avoid accumulating large allocations.
|
||||||
|
- **Top list query fixes**: Fixed top list queries in analysis and dashboard
|
||||||
|
views.
|
||||||
|
- **Exclude down nodes from HealthCheck**: Down nodes are now excluded from
|
||||||
|
health checks in both the REST and NATS handlers.
|
||||||
|
- **Node state priority order**: Node state determination now enforces a
|
||||||
|
priority order. Exception: idle+down results in idle.
|
||||||
|
- **Blocking ReceiveNats call**: Fixed a blocking NATS receive call in the
|
||||||
|
metricstore.
|
||||||
|
|
||||||
### Database performance
|
### Database performance
|
||||||
|
|
||||||
@@ -33,6 +85,16 @@ and `VACUUM` commands. Depending on your database size (more then 40GB) the
|
|||||||
write load.
|
write load.
|
||||||
- **Increased default SQLite timeout**: The default SQLite connection timeout
|
- **Increased default SQLite timeout**: The default SQLite connection timeout
|
||||||
has been raised to reduce spurious timeout errors under load.
|
has been raised to reduce spurious timeout errors under load.
|
||||||
|
- **Optimized stats queries**: Improved sortby handling in stats queries, fixed
|
||||||
|
cache key passing, and simplified a stats query condition that caused an
|
||||||
|
expensive unnecessary subquery.
|
||||||
|
|
||||||
|
### MetricStore performance
|
||||||
|
|
||||||
|
- **Sharded WAL consumer**: The WAL consumer is now sharded for significantly
|
||||||
|
higher write throughput.
|
||||||
|
- **NATS contention fix**: Fixed contention in the metricstore NATS ingestion
|
||||||
|
path.
|
||||||
|
|
||||||
### NATS API
|
### NATS API
|
||||||
|
|
||||||
@@ -52,6 +114,24 @@ and `VACUUM` commands. Depending on your database size (more then 40GB) the
|
|||||||
operation.
|
operation.
|
||||||
- **Checkpoint archiving log**: Added an informational log message when the
|
- **Checkpoint archiving log**: Added an informational log message when the
|
||||||
metricstore checkpoint archiving process runs.
|
metricstore checkpoint archiving process runs.
|
||||||
|
- **Auth failure context**: Auth failure log messages now include more context
|
||||||
|
information.
|
||||||
|
|
||||||
|
### Behavior changes
|
||||||
|
|
||||||
|
- **DB-based metricHealth**: Replaced heuristic-based metric health with
|
||||||
|
DB-based metric health for the node view, providing more accurate health
|
||||||
|
status information.
|
||||||
|
- **Removed minRunningFor filter remnants**: Cleaned up remaining `minRunningFor`
|
||||||
|
references from the GraphQL schema and query builder.
|
||||||
|
|
||||||
|
### Frontend
|
||||||
|
|
||||||
|
- **Streamlined statsSeries**: Unified stats series calculation and rendering
|
||||||
|
across plot components.
|
||||||
|
- **Clarified plot titles**: Improved titles in dashboard and health views.
|
||||||
|
- **Bumped frontend dependencies**: Updated frontend dependencies to latest
|
||||||
|
versions.
|
||||||
|
|
||||||
### Dependencies
|
### Dependencies
|
||||||
|
|
||||||
@@ -67,7 +147,7 @@ and `VACUUM` commands. Depending on your database size (more then 40GB) the
|
|||||||
running has to be allowed to execute the journalctl command.
|
running has to be allowed to execute the journalctl command.
|
||||||
- The user configuration keys for the ui have changed. Therefore old user
|
- The user configuration keys for the ui have changed. Therefore old user
|
||||||
configuration persisted in the database is not used anymore. It is recommended
|
configuration persisted in the database is not used anymore. It is recommended
|
||||||
to configure the metrics shown in the ui-config sestion and remove all records
|
to configure the metrics shown in the ui-config section and remove all records
|
||||||
in the table after the update.
|
in the table after the update.
|
||||||
- Currently energy footprint metrics of type energy are ignored for calculating
|
- Currently energy footprint metrics of type energy are ignored for calculating
|
||||||
total energy.
|
total energy.
|
||||||
|
|||||||
@@ -11,7 +11,8 @@ import "flag"
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB,
|
flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB,
|
||||||
flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags, flagOptimizeDB bool
|
flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags, flagOptimizeDB,
|
||||||
|
flagCleanupCheckpoints bool
|
||||||
flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
|
flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -28,6 +29,7 @@ func cliInit() {
|
|||||||
flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit")
|
flag.BoolVar(&flagApplyTags, "apply-tags", false, "Run taggers on all completed jobs and exit")
|
||||||
flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit")
|
flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit")
|
||||||
flag.BoolVar(&flagOptimizeDB, "optimize-db", false, "Optimize database: run VACUUM to reclaim space, then ANALYZE to update query planner statistics")
|
flag.BoolVar(&flagOptimizeDB, "optimize-db", false, "Optimize database: run VACUUM to reclaim space, then ANALYZE to update query planner statistics")
|
||||||
|
flag.BoolVar(&flagCleanupCheckpoints, "cleanup-checkpoints", false, "Clean up old checkpoint files (delete or archive) based on retention settings, then exit")
|
||||||
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
|
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
|
||||||
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
|
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
|
||||||
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: <username>:[admin,support,manager,api,user]:<password>")
|
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: <username>:[admin,support,manager,api,user]:<password>")
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
goruntime "runtime"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -536,6 +537,43 @@ func run() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle checkpoint cleanup
|
||||||
|
if flagCleanupCheckpoints {
|
||||||
|
mscfg := ccconf.GetPackageConfig("metric-store")
|
||||||
|
if mscfg == nil {
|
||||||
|
return fmt.Errorf("metric-store configuration required for checkpoint cleanup")
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(mscfg, &metricstore.Keys); err != nil {
|
||||||
|
return fmt.Errorf("decoding metric-store config: %w", err)
|
||||||
|
}
|
||||||
|
if metricstore.Keys.NumWorkers <= 0 {
|
||||||
|
metricstore.Keys.NumWorkers = min(goruntime.NumCPU()/2+1, metricstore.DefaultMaxWorkers)
|
||||||
|
}
|
||||||
|
|
||||||
|
d, err := time.ParseDuration(metricstore.Keys.RetentionInMemory)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parsing retention-in-memory: %w", err)
|
||||||
|
}
|
||||||
|
from := time.Now().Add(-d)
|
||||||
|
deleteMode := metricstore.Keys.Cleanup == nil || metricstore.Keys.Cleanup.Mode != "archive"
|
||||||
|
cleanupDir := ""
|
||||||
|
if !deleteMode {
|
||||||
|
cleanupDir = metricstore.Keys.Cleanup.RootDir
|
||||||
|
}
|
||||||
|
|
||||||
|
cclog.Infof("Cleaning up checkpoints older than %s...", from.Format(time.RFC3339))
|
||||||
|
n, err := metricstore.CleanupCheckpoints(
|
||||||
|
metricstore.Keys.Checkpoints.RootDir, cleanupDir, from.Unix(), deleteMode)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("checkpoint cleanup: %w", err)
|
||||||
|
}
|
||||||
|
if deleteMode {
|
||||||
|
cclog.Exitf("Cleanup done: %d checkpoint files deleted.", n)
|
||||||
|
} else {
|
||||||
|
cclog.Exitf("Cleanup done: %d checkpoint files archived to parquet.", n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Exit if start server is not requested
|
// Exit if start server is not requested
|
||||||
if !flagServer {
|
if !flagServer {
|
||||||
cclog.Exit("No errors, server flag not set. Exiting cc-backend.")
|
cclog.Exit("No errors, server flag not set. Exiting cc-backend.")
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/99designs/gqlgen/graphql"
|
"github.com/99designs/gqlgen/graphql"
|
||||||
@@ -344,20 +345,20 @@ func (s *Server) init() error {
|
|||||||
|
|
||||||
// Server timeout defaults (in seconds)
|
// Server timeout defaults (in seconds)
|
||||||
const (
|
const (
|
||||||
defaultReadTimeout = 20
|
defaultReadHeaderTimeout = 20
|
||||||
defaultWriteTimeout = 20
|
defaultWriteTimeout = 20
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) Start(ctx context.Context) error {
|
func (s *Server) Start(ctx context.Context) error {
|
||||||
// Use configurable timeouts with defaults
|
// Use configurable timeouts with defaults
|
||||||
readTimeout := time.Duration(defaultReadTimeout) * time.Second
|
readHeaderTimeout := time.Duration(defaultReadHeaderTimeout) * time.Second
|
||||||
writeTimeout := time.Duration(defaultWriteTimeout) * time.Second
|
writeTimeout := time.Duration(defaultWriteTimeout) * time.Second
|
||||||
|
|
||||||
s.server = &http.Server{
|
s.server = &http.Server{
|
||||||
ReadTimeout: readTimeout,
|
ReadHeaderTimeout: readHeaderTimeout,
|
||||||
WriteTimeout: writeTimeout,
|
WriteTimeout: writeTimeout,
|
||||||
Handler: s.router,
|
Handler: s.router,
|
||||||
Addr: config.Keys.Addr,
|
Addr: config.Keys.Addr,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start http or https server
|
// Start http or https server
|
||||||
@@ -399,16 +400,6 @@ func (s *Server) Start(ctx context.Context) error {
|
|||||||
return fmt.Errorf("dropping privileges: %w", err)
|
return fmt.Errorf("dropping privileges: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle context cancellation for graceful shutdown
|
|
||||||
go func() {
|
|
||||||
<-ctx.Done()
|
|
||||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
if err := s.server.Shutdown(shutdownCtx); err != nil {
|
|
||||||
cclog.Errorf("Server shutdown error: %v", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
if err = s.server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
if err = s.server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
||||||
return fmt.Errorf("server failed: %w", err)
|
return fmt.Errorf("server failed: %w", err)
|
||||||
}
|
}
|
||||||
@@ -416,29 +407,53 @@ func (s *Server) Start(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) Shutdown(ctx context.Context) {
|
func (s *Server) Shutdown(ctx context.Context) {
|
||||||
// Create a shutdown context with timeout
|
shutdownStart := time.Now()
|
||||||
shutdownCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
|
natsStart := time.Now()
|
||||||
nc := nats.GetClient()
|
nc := nats.GetClient()
|
||||||
if nc != nil {
|
if nc != nil {
|
||||||
nc.Close()
|
nc.Close()
|
||||||
}
|
}
|
||||||
|
cclog.Infof("Shutdown: NATS closed (%v)", time.Since(natsStart))
|
||||||
|
|
||||||
// First shut down the server gracefully (waiting for all ongoing requests)
|
httpStart := time.Now()
|
||||||
|
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
if err := s.server.Shutdown(shutdownCtx); err != nil {
|
if err := s.server.Shutdown(shutdownCtx); err != nil {
|
||||||
cclog.Errorf("Server shutdown error: %v", err)
|
cclog.Errorf("Server shutdown error: %v", err)
|
||||||
}
|
}
|
||||||
|
cclog.Infof("Shutdown: HTTP server stopped (%v)", time.Since(httpStart))
|
||||||
|
|
||||||
// Archive all the metric store data
|
// Run metricstore and archiver shutdown concurrently.
|
||||||
ms := metricstore.GetMemoryStore()
|
// They are independent: metricstore writes .bin snapshots,
|
||||||
|
// archiver flushes pending job archives.
|
||||||
|
storeStart := time.Now()
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(done)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
if ms != nil {
|
if ms := metricstore.GetMemoryStore(); ms != nil {
|
||||||
metricstore.Shutdown()
|
wg.Go(func() {
|
||||||
|
metricstore.Shutdown()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Go(func() {
|
||||||
|
if err := archiver.Shutdown(10 * time.Second); err != nil {
|
||||||
|
cclog.Warnf("Archiver shutdown: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
cclog.Infof("Shutdown: metricstore + archiver completed (%v)", time.Since(storeStart))
|
||||||
|
case <-time.After(60 * time.Second):
|
||||||
|
cclog.Warnf("Shutdown deadline exceeded after %v, forcing exit", time.Since(shutdownStart))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown archiver with 10 second timeout for fast shutdown
|
cclog.Infof("Shutdown: total time %v", time.Since(shutdownStart))
|
||||||
if err := archiver.Shutdown(10 * time.Second); err != nil {
|
|
||||||
cclog.Warnf("Archiver shutdown: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -4,10 +4,6 @@ github.com/99designs/gqlgen v0.17.88 h1:neMQDgehMwT1vYIOx/w5ZYPUU/iMNAJzRO44I5In
|
|||||||
github.com/99designs/gqlgen v0.17.88/go.mod h1:qeqYFEgOeSKqWedOjogPizimp2iu4E23bdPvl4jTYic=
|
github.com/99designs/gqlgen v0.17.88/go.mod h1:qeqYFEgOeSKqWedOjogPizimp2iu4E23bdPvl4jTYic=
|
||||||
github.com/Azure/go-ntlmssp v0.1.0 h1:DjFo6YtWzNqNvQdrwEyr/e4nhU3vRiwenz5QX7sFz+A=
|
github.com/Azure/go-ntlmssp v0.1.0 h1:DjFo6YtWzNqNvQdrwEyr/e4nhU3vRiwenz5QX7sFz+A=
|
||||||
github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk=
|
github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk=
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.8.2 h1:rCLZk8wz8yq8xBnBEdVKigvA2ngR8dPmHbEFwxxb3jw=
|
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
|
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.9.0 h1:mzUYakcjwb+UP5II4jOvr36rSYct90gXBbtUg+nvm9c=
|
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.9.0/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
|
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.9.1 h1:eplKhXQyGAElBGCEGdmxwj7fLv26Op16uK0KxUePDak=
|
github.com/ClusterCockpit/cc-lib/v2 v2.9.1 h1:eplKhXQyGAElBGCEGdmxwj7fLv26Op16uK0KxUePDak=
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.9.1/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
|
github.com/ClusterCockpit/cc-lib/v2 v2.9.1/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
|
||||||
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q=
|
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q=
|
||||||
|
|||||||
@@ -676,6 +676,11 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
|
|||||||
// Use request-scoped cache: multiple aliases with same (filter, groupBy)
|
// Use request-scoped cache: multiple aliases with same (filter, groupBy)
|
||||||
// but different sortBy/page hit the DB only once.
|
// but different sortBy/page hit the DB only once.
|
||||||
if cache := getStatsGroupCache(ctx); cache != nil {
|
if cache := getStatsGroupCache(ctx); cache != nil {
|
||||||
|
// Ensure the sort field is computed even if not in the GraphQL selection,
|
||||||
|
// because sortAndPageStats will sort by it in memory.
|
||||||
|
if sortBy != nil {
|
||||||
|
reqFields[sortByFieldName(*sortBy)] = true
|
||||||
|
}
|
||||||
key := statsCacheKey(filter, groupBy, reqFields)
|
key := statsCacheKey(filter, groupBy, reqFields)
|
||||||
var allStats []*model.JobsStatistics
|
var allStats []*model.JobsStatistics
|
||||||
allStats, err = cache.getOrCompute(key, func() ([]*model.JobsStatistics, error) {
|
allStats, err = cache.getOrCompute(key, func() ([]*model.JobsStatistics, error) {
|
||||||
|
|||||||
@@ -107,6 +107,33 @@ func sortAndPageStats(allStats []*model.JobsStatistics, sortBy *model.SortByAggr
|
|||||||
return sorted
|
return sorted
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sortByFieldName maps a SortByAggregate enum to the corresponding reqFields key.
|
||||||
|
// This ensures the DB computes the column that sortAndPageStats will sort by.
|
||||||
|
func sortByFieldName(sortBy model.SortByAggregate) string {
|
||||||
|
switch sortBy {
|
||||||
|
case model.SortByAggregateTotaljobs:
|
||||||
|
return "totalJobs"
|
||||||
|
case model.SortByAggregateTotalusers:
|
||||||
|
return "totalUsers"
|
||||||
|
case model.SortByAggregateTotalwalltime:
|
||||||
|
return "totalWalltime"
|
||||||
|
case model.SortByAggregateTotalnodes:
|
||||||
|
return "totalNodes"
|
||||||
|
case model.SortByAggregateTotalnodehours:
|
||||||
|
return "totalNodeHours"
|
||||||
|
case model.SortByAggregateTotalcores:
|
||||||
|
return "totalCores"
|
||||||
|
case model.SortByAggregateTotalcorehours:
|
||||||
|
return "totalCoreHours"
|
||||||
|
case model.SortByAggregateTotalaccs:
|
||||||
|
return "totalAccs"
|
||||||
|
case model.SortByAggregateTotalacchours:
|
||||||
|
return "totalAccHours"
|
||||||
|
default:
|
||||||
|
return "totalJobs"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// statsFieldGetter returns a function that extracts the sortable int field
|
// statsFieldGetter returns a function that extracts the sortable int field
|
||||||
// from a JobsStatistics struct for the given sort key.
|
// from a JobsStatistics struct for the given sort key.
|
||||||
func statsFieldGetter(sortBy model.SortByAggregate) func(*model.JobsStatistics) int {
|
func statsFieldGetter(sortBy model.SortByAggregate) func(*model.JobsStatistics) int {
|
||||||
|
|||||||
@@ -280,11 +280,11 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
|
|||||||
|
|
||||||
// buildIntCondition creates clauses for integer range filters, using BETWEEN only if required.
|
// buildIntCondition creates clauses for integer range filters, using BETWEEN only if required.
|
||||||
func buildIntCondition(field string, cond *config.IntRange, query sq.SelectBuilder) sq.SelectBuilder {
|
func buildIntCondition(field string, cond *config.IntRange, query sq.SelectBuilder) sq.SelectBuilder {
|
||||||
if cond.From != 1 && cond.To != 0 {
|
if cond.From > 0 && cond.To > 0 {
|
||||||
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
|
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
|
||||||
} else if cond.From != 1 && cond.To == 0 {
|
} else if cond.From > 0 && cond.To == 0 {
|
||||||
return query.Where(field+" >= ?", cond.From)
|
return query.Where(field+" >= ?", cond.From)
|
||||||
} else if cond.From == 1 && cond.To != 0 {
|
} else if cond.From == 0 && cond.To > 0 {
|
||||||
return query.Where(field+" <= ?", cond.To)
|
return query.Where(field+" <= ?", cond.To)
|
||||||
} else {
|
} else {
|
||||||
return query
|
return query
|
||||||
@@ -293,11 +293,11 @@ func buildIntCondition(field string, cond *config.IntRange, query sq.SelectBuild
|
|||||||
|
|
||||||
// buildFloatCondition creates a clauses for float range filters, using BETWEEN only if required.
|
// buildFloatCondition creates a clauses for float range filters, using BETWEEN only if required.
|
||||||
func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
|
func buildFloatCondition(field string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
|
||||||
if cond.From != 1.0 && cond.To != 0.0 {
|
if cond.From > 0.0 && cond.To > 0.0 {
|
||||||
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
|
return query.Where(field+" BETWEEN ? AND ?", cond.From, cond.To)
|
||||||
} else if cond.From != 1.0 && cond.To == 0.0 {
|
} else if cond.From > 0.0 && cond.To == 0.0 {
|
||||||
return query.Where(field+" >= ?", cond.From)
|
return query.Where(field+" >= ?", cond.From)
|
||||||
} else if cond.From == 1.0 && cond.To != 0.0 {
|
} else if cond.From == 0.0 && cond.To > 0.0 {
|
||||||
return query.Where(field+" <= ?", cond.To)
|
return query.Where(field+" <= ?", cond.To)
|
||||||
} else {
|
} else {
|
||||||
return query
|
return query
|
||||||
@@ -339,11 +339,11 @@ func buildTimeCondition(field string, cond *config.TimeRange, query sq.SelectBui
|
|||||||
// buildFloatJSONCondition creates a filter on a numeric field within the footprint JSON column, using BETWEEN only if required.
|
// buildFloatJSONCondition creates a filter on a numeric field within the footprint JSON column, using BETWEEN only if required.
|
||||||
func buildFloatJSONCondition(jsonField string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
|
func buildFloatJSONCondition(jsonField string, cond *model.FloatRange, query sq.SelectBuilder) sq.SelectBuilder {
|
||||||
query = query.Where("JSON_VALID(footprint)")
|
query = query.Where("JSON_VALID(footprint)")
|
||||||
if cond.From != 1.0 && cond.To != 0.0 {
|
if cond.From > 0.0 && cond.To > 0.0 {
|
||||||
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") BETWEEN ? AND ?", cond.From, cond.To)
|
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") BETWEEN ? AND ?", cond.From, cond.To)
|
||||||
} else if cond.From != 1.0 && cond.To == 0.0 {
|
} else if cond.From > 0.0 && cond.To == 0.0 {
|
||||||
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") >= ?", cond.From)
|
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") >= ?", cond.From)
|
||||||
} else if cond.From == 1.0 && cond.To != 0.0 {
|
} else if cond.From == 0.0 && cond.To > 0.0 {
|
||||||
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") <= ?", cond.To)
|
return query.Where("JSON_EXTRACT(footprint, \"$."+jsonField+"\") <= ?", cond.To)
|
||||||
} else {
|
} else {
|
||||||
return query
|
return query
|
||||||
|
|||||||
@@ -308,7 +308,7 @@ func buildFilterPresets(query url.Values) map[string]any {
|
|||||||
if parts[0] == "lessthan" {
|
if parts[0] == "lessthan" {
|
||||||
lt, lte := strconv.Atoi(parts[1])
|
lt, lte := strconv.Atoi(parts[1])
|
||||||
if lte == nil {
|
if lte == nil {
|
||||||
filterPresets["numNodes"] = map[string]int{"from": 1, "to": lt}
|
filterPresets["numNodes"] = map[string]int{"from": 0, "to": lt}
|
||||||
}
|
}
|
||||||
} else if parts[0] == "morethan" {
|
} else if parts[0] == "morethan" {
|
||||||
mt, mte := strconv.Atoi(parts[1])
|
mt, mte := strconv.Atoi(parts[1])
|
||||||
@@ -330,7 +330,7 @@ func buildFilterPresets(query url.Values) map[string]any {
|
|||||||
if parts[0] == "lessthan" {
|
if parts[0] == "lessthan" {
|
||||||
lt, lte := strconv.Atoi(parts[1])
|
lt, lte := strconv.Atoi(parts[1])
|
||||||
if lte == nil {
|
if lte == nil {
|
||||||
filterPresets["numHWThreads"] = map[string]int{"from": 1, "to": lt}
|
filterPresets["numHWThreads"] = map[string]int{"from": 0, "to": lt}
|
||||||
}
|
}
|
||||||
} else if parts[0] == "morethan" {
|
} else if parts[0] == "morethan" {
|
||||||
mt, mte := strconv.Atoi(parts[1])
|
mt, mte := strconv.Atoi(parts[1])
|
||||||
@@ -352,7 +352,7 @@ func buildFilterPresets(query url.Values) map[string]any {
|
|||||||
if parts[0] == "lessthan" {
|
if parts[0] == "lessthan" {
|
||||||
lt, lte := strconv.Atoi(parts[1])
|
lt, lte := strconv.Atoi(parts[1])
|
||||||
if lte == nil {
|
if lte == nil {
|
||||||
filterPresets["numAccelerators"] = map[string]int{"from": 1, "to": lt}
|
filterPresets["numAccelerators"] = map[string]int{"from": 0, "to": lt}
|
||||||
}
|
}
|
||||||
} else if parts[0] == "morethan" {
|
} else if parts[0] == "morethan" {
|
||||||
mt, mte := strconv.Atoi(parts[1])
|
mt, mte := strconv.Atoi(parts[1])
|
||||||
@@ -408,7 +408,7 @@ func buildFilterPresets(query url.Values) map[string]any {
|
|||||||
if parts[0] == "lessthan" {
|
if parts[0] == "lessthan" {
|
||||||
lt, lte := strconv.Atoi(parts[1])
|
lt, lte := strconv.Atoi(parts[1])
|
||||||
if lte == nil {
|
if lte == nil {
|
||||||
filterPresets["energy"] = map[string]int{"from": 1, "to": lt}
|
filterPresets["energy"] = map[string]int{"from": 0, "to": lt}
|
||||||
}
|
}
|
||||||
} else if parts[0] == "morethan" {
|
} else if parts[0] == "morethan" {
|
||||||
mt, mte := strconv.Atoi(parts[1])
|
mt, mte := strconv.Atoi(parts[1])
|
||||||
@@ -434,7 +434,7 @@ func buildFilterPresets(query url.Values) map[string]any {
|
|||||||
if lte == nil {
|
if lte == nil {
|
||||||
statEntry := map[string]any{
|
statEntry := map[string]any{
|
||||||
"field": parts[0],
|
"field": parts[0],
|
||||||
"from": 1,
|
"from": 0,
|
||||||
"to": lt,
|
"to": lt,
|
||||||
}
|
}
|
||||||
statList = append(statList, statEntry)
|
statList = append(statList, statEntry)
|
||||||
|
|||||||
@@ -363,7 +363,7 @@ func (t *JobClassTagger) Match(job *schema.Job) {
|
|||||||
for _, m := range ri.metrics {
|
for _, m := range ri.metrics {
|
||||||
stats, ok := jobStats[m]
|
stats, ok := jobStats[m]
|
||||||
if !ok {
|
if !ok {
|
||||||
cclog.Errorf("job classification: missing metric '%s' for rule %s on job %d", m, tag, job.JobID)
|
cclog.Debugf("job classification: missing metric '%s' for rule %s on job %d", m, tag, job.JobID)
|
||||||
skipRule = true
|
skipRule = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -388,7 +388,7 @@ func (t *JobClassTagger) Match(job *schema.Job) {
|
|||||||
for _, r := range ri.requirements {
|
for _, r := range ri.requirements {
|
||||||
ok, err := expr.Run(r, env)
|
ok, err := expr.Run(r, env)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("error running requirement for rule %s: %#v", tag, err)
|
cclog.Debugf("error running requirement for rule %s: %#v", tag, err)
|
||||||
requirementsMet = false
|
requirementsMet = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -407,7 +407,7 @@ func (t *JobClassTagger) Match(job *schema.Job) {
|
|||||||
for _, v := range ri.variables {
|
for _, v := range ri.variables {
|
||||||
value, err := expr.Run(v.expr, env)
|
value, err := expr.Run(v.expr, env)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("error evaluating variable %s for rule %s: %#v", v.name, tag, err)
|
cclog.Debugf("error evaluating variable %s for rule %s: %#v", v.name, tag, err)
|
||||||
varError = true
|
varError = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -198,36 +198,19 @@ func GetSubCluster(cluster, subcluster string) (*schema.SubCluster, error) {
|
|||||||
func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Metric {
|
func GetMetricConfigSubCluster(cluster, subcluster string) map[string]*schema.Metric {
|
||||||
metrics := make(map[string]*schema.Metric)
|
metrics := make(map[string]*schema.Metric)
|
||||||
|
|
||||||
for _, c := range Clusters {
|
sc, err := GetSubCluster(cluster, subcluster)
|
||||||
if c.Name == cluster {
|
if err != nil {
|
||||||
for _, m := range c.MetricConfig {
|
return metrics
|
||||||
for _, s := range m.SubClusters {
|
}
|
||||||
if s.Name == subcluster {
|
|
||||||
metrics[m.Name] = &schema.Metric{
|
|
||||||
Name: m.Name,
|
|
||||||
Unit: s.Unit,
|
|
||||||
Peak: s.Peak,
|
|
||||||
Normal: s.Normal,
|
|
||||||
Caution: s.Caution,
|
|
||||||
Alert: s.Alert,
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_, ok := metrics[m.Name]
|
for _, m := range sc.MetricConfig {
|
||||||
if !ok {
|
metrics[m.Name] = &schema.Metric{
|
||||||
metrics[m.Name] = &schema.Metric{
|
Name: m.Name,
|
||||||
Name: m.Name,
|
Unit: m.Unit,
|
||||||
Unit: m.Unit,
|
Peak: m.Peak,
|
||||||
Peak: m.Peak,
|
Normal: m.Normal,
|
||||||
Normal: m.Normal,
|
Caution: m.Caution,
|
||||||
Caution: m.Caution,
|
Alert: m.Alert,
|
||||||
Alert: m.Alert,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -37,3 +37,27 @@ func TestClusterConfig(t *testing.T) {
|
|||||||
// spew.Dump(archive.GlobalMetricList)
|
// spew.Dump(archive.GlobalMetricList)
|
||||||
// t.Fail()
|
// t.Fail()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetMetricConfigSubClusterRespectsRemovedMetrics(t *testing.T) {
|
||||||
|
if err := archive.Init(json.RawMessage(`{"kind": "file","path": "testdata/archive"}`)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sc, err := archive.GetSubCluster("fritz", "spr2tb")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics := archive.GetMetricConfigSubCluster("fritz", "spr2tb")
|
||||||
|
if len(metrics) != len(sc.MetricConfig) {
|
||||||
|
t.Fatalf("GetMetricConfigSubCluster() returned %d metrics, want %d", len(metrics), len(sc.MetricConfig))
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := metrics["flops_any"]; ok {
|
||||||
|
t.Fatalf("GetMetricConfigSubCluster() returned removed metric flops_any for subcluster spr2tb")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := metrics["cpu_power"]; !ok {
|
||||||
|
t.Fatalf("GetMetricConfigSubCluster() missing active metric cpu_power for subcluster spr2tb")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@@ -22,6 +23,7 @@ import (
|
|||||||
|
|
||||||
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
|
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
if Keys.Cleanup.Mode == "archive" {
|
if Keys.Cleanup.Mode == "archive" {
|
||||||
|
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
|
||||||
// Run as Archiver
|
// Run as Archiver
|
||||||
cleanUpWorker(wg, ctx,
|
cleanUpWorker(wg, ctx,
|
||||||
Keys.RetentionInMemory,
|
Keys.RetentionInMemory,
|
||||||
@@ -43,7 +45,6 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
// cleanUpWorker takes simple values to configure what it does
|
// cleanUpWorker takes simple values to configure what it does
|
||||||
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
|
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
|
||||||
wg.Go(func() {
|
wg.Go(func() {
|
||||||
|
|
||||||
d, err := time.ParseDuration(interval)
|
d, err := time.ParseDuration(interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err)
|
cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err)
|
||||||
@@ -52,6 +53,16 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Account for checkpoint span: files named {from}.bin contain data up to
|
||||||
|
// from+checkpointInterval. Subtract the checkpoint interval so we don't
|
||||||
|
// delete files whose data still falls within the retention window.
|
||||||
|
checkpointSpan := 12 * time.Hour
|
||||||
|
if Keys.CheckpointInterval != "" {
|
||||||
|
if parsed, err := time.ParseDuration(Keys.CheckpointInterval); err == nil {
|
||||||
|
checkpointSpan = parsed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ticker := time.NewTicker(d)
|
ticker := time.NewTicker(d)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
@@ -60,7 +71,7 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
t := time.Now().Add(-d)
|
t := time.Now().Add(-d).Add(-checkpointSpan)
|
||||||
cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339))
|
cclog.Infof("[METRICSTORE]> start %s checkpoints (older than %s)...", mode, t.Format(time.RFC3339))
|
||||||
|
|
||||||
n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete)
|
n, err := CleanupCheckpoints(Keys.Checkpoints.RootDir, cleanupDir, t.Unix(), delete)
|
||||||
@@ -99,8 +110,8 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type workItem struct {
|
type workItem struct {
|
||||||
dir string
|
dir string
|
||||||
cluster, host string
|
cluster, host string
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@@ -181,6 +192,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
totalFiles := 0
|
totalFiles := 0
|
||||||
|
var clusterErrors []string
|
||||||
|
|
||||||
for _, clusterEntry := range clusterEntries {
|
for _, clusterEntry := range clusterEntries {
|
||||||
if !clusterEntry.IsDir() {
|
if !clusterEntry.IsDir() {
|
||||||
@@ -190,7 +202,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
cluster := clusterEntry.Name()
|
cluster := clusterEntry.Name()
|
||||||
hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster))
|
hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return totalFiles, err
|
cclog.Errorf("[METRICSTORE]> error reading host entries for cluster %s: %s", cluster, err.Error())
|
||||||
|
clusterErrors = append(clusterErrors, cluster)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Workers load checkpoint files from disk; main thread writes to parquet.
|
// Workers load checkpoint files from disk; main thread writes to parquet.
|
||||||
@@ -255,7 +269,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
// Drain results channel to unblock workers
|
// Drain results channel to unblock workers
|
||||||
for range results {
|
for range results {
|
||||||
}
|
}
|
||||||
return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err)
|
cclog.Errorf("[METRICSTORE]> error creating parquet writer for cluster %s: %s", cluster, err.Error())
|
||||||
|
clusterErrors = append(clusterErrors, cluster)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
type deleteItem struct {
|
type deleteItem struct {
|
||||||
@@ -275,6 +291,12 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Flush once per host to keep row group count within parquet limits.
|
||||||
|
if writeErr == nil {
|
||||||
|
if err := writer.FlushRowGroup(); err != nil {
|
||||||
|
writeErr = err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Always track files for deletion (even if write failed, we still drain)
|
// Always track files for deletion (even if write failed, we still drain)
|
||||||
toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files})
|
toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files})
|
||||||
@@ -285,7 +307,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
if errs > 0 {
|
if errs > 0 {
|
||||||
return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster)
|
cclog.Errorf("[METRICSTORE]> %d errors reading checkpoints for cluster %s", errs, cluster)
|
||||||
|
clusterErrors = append(clusterErrors, cluster)
|
||||||
|
os.Remove(parquetFile)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if writer.count == 0 {
|
if writer.count == 0 {
|
||||||
@@ -296,7 +321,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
|
|
||||||
if writeErr != nil {
|
if writeErr != nil {
|
||||||
os.Remove(parquetFile)
|
os.Remove(parquetFile)
|
||||||
return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr)
|
cclog.Errorf("[METRICSTORE]> error writing parquet archive for cluster %s: %s", cluster, writeErr.Error())
|
||||||
|
clusterErrors = append(clusterErrors, cluster)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete archived checkpoint files
|
// Delete archived checkpoint files
|
||||||
@@ -316,5 +343,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles)
|
cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles)
|
||||||
|
|
||||||
|
if len(clusterErrors) > 0 {
|
||||||
|
return totalFiles, fmt.Errorf("archiving failed for clusters: %s", strings.Join(clusterErrors, ", "))
|
||||||
|
}
|
||||||
|
|
||||||
return totalFiles, nil
|
return totalFiles, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -146,7 +146,9 @@ var (
|
|||||||
|
|
||||||
// ErrDataDoesNotAlign indicates that aggregated data from child scopes
|
// ErrDataDoesNotAlign indicates that aggregated data from child scopes
|
||||||
// does not align with the parent scope's expected timestamps/intervals.
|
// does not align with the parent scope's expected timestamps/intervals.
|
||||||
ErrDataDoesNotAlign error = errors.New("[METRICSTORE]> data from lower granularities does not align")
|
ErrDataDoesNotAlignMissingFront error = errors.New("[METRICSTORE]> data from lower granularities does not align (missing data prior to start of the buffers)")
|
||||||
|
ErrDataDoesNotAlignMissingBack error = errors.New("[METRICSTORE]> data from lower granularities does not align (missing data after the end of the buffers)")
|
||||||
|
ErrDataDoesNotAlignDataLenMismatch error = errors.New("[METRICSTORE]> data from lower granularities does not align (collected data length is different than expected data length)")
|
||||||
)
|
)
|
||||||
|
|
||||||
// buffer stores time-series data for a single metric at a specific hierarchical level.
|
// buffer stores time-series data for a single metric at a specific hierarchical level.
|
||||||
|
|||||||
@@ -86,14 +86,16 @@ var (
|
|||||||
|
|
||||||
// Checkpointing starts a background worker that periodically saves metric data to disk.
|
// Checkpointing starts a background worker that periodically saves metric data to disk.
|
||||||
//
|
//
|
||||||
// Checkpoints are written every 12 hours (hardcoded).
|
// restoreFrom is the earliest timestamp of data loaded from checkpoint files at startup.
|
||||||
|
// The first periodic checkpoint after restart will cover [restoreFrom, now], ensuring that
|
||||||
|
// loaded data is re-persisted before old checkpoint files are cleaned up.
|
||||||
//
|
//
|
||||||
// Format behaviour:
|
// Format behaviour:
|
||||||
// - "json": Periodic checkpointing every checkpointInterval
|
// - "json": Periodic checkpointing every checkpointInterval
|
||||||
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
|
// - "wal": Periodic binary snapshots + WAL rotation every checkpointInterval
|
||||||
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
func Checkpointing(wg *sync.WaitGroup, ctx context.Context, restoreFrom time.Time) {
|
||||||
lastCheckpointMu.Lock()
|
lastCheckpointMu.Lock()
|
||||||
lastCheckpoint = time.Now()
|
lastCheckpoint = restoreFrom
|
||||||
lastCheckpointMu.Unlock()
|
lastCheckpointMu.Unlock()
|
||||||
|
|
||||||
ms := GetMemoryStore()
|
ms := GetMemoryStore()
|
||||||
@@ -337,25 +339,35 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
|
|||||||
return ErrNoNewArchiveData
|
return ErrNoNewArchiveData
|
||||||
}
|
}
|
||||||
|
|
||||||
filepath := path.Join(dir, fmt.Sprintf("%d.json", from))
|
finalPath := path.Join(dir, fmt.Sprintf("%d.json", from))
|
||||||
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
tmpPath := finalPath + ".tmp"
|
||||||
|
|
||||||
|
f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
||||||
if err != nil && os.IsNotExist(err) {
|
if err != nil && os.IsNotExist(err) {
|
||||||
err = os.MkdirAll(dir, CheckpointDirPerms)
|
err = os.MkdirAll(dir, CheckpointDirPerms)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
f, err = os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
bw := bufio.NewWriter(f)
|
bw := bufio.NewWriter(f)
|
||||||
if err = json.NewEncoder(bw).Encode(cf); err != nil {
|
if err = json.NewEncoder(bw).Encode(cf); err != nil {
|
||||||
|
f.Close()
|
||||||
|
os.Remove(tmpPath)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return bw.Flush()
|
if err = bw.Flush(); err != nil {
|
||||||
|
f.Close()
|
||||||
|
os.Remove(tmpPath)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
f.Close()
|
||||||
|
|
||||||
|
return os.Rename(tmpPath, finalPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs.
|
// enqueueCheckpointHosts traverses checkpoint directory and enqueues cluster/host pairs.
|
||||||
@@ -470,7 +482,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
|
|||||||
data: metric.Data[0:n:n],
|
data: metric.Data[0:n:n],
|
||||||
prev: nil,
|
prev: nil,
|
||||||
next: nil,
|
next: nil,
|
||||||
archived: true,
|
archived: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
minfo, ok := m.Metrics[name]
|
minfo, ok := m.Metrics[name]
|
||||||
|
|||||||
@@ -170,7 +170,7 @@ func Init(rawConfig json.RawMessage, metrics map[string]MetricConfig, wg *sync.W
|
|||||||
ctx, shutdown := context.WithCancel(context.Background())
|
ctx, shutdown := context.WithCancel(context.Background())
|
||||||
|
|
||||||
Retention(wg, ctx)
|
Retention(wg, ctx)
|
||||||
Checkpointing(wg, ctx)
|
Checkpointing(wg, ctx, restoreFrom)
|
||||||
CleanUp(wg, ctx)
|
CleanUp(wg, ctx)
|
||||||
WALStaging(wg, ctx)
|
WALStaging(wg, ctx)
|
||||||
MemoryUsageTracker(wg, ctx)
|
MemoryUsageTracker(wg, ctx)
|
||||||
@@ -271,19 +271,32 @@ func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) {
|
|||||||
//
|
//
|
||||||
// Note: This function blocks until the final checkpoint is written.
|
// Note: This function blocks until the final checkpoint is written.
|
||||||
func Shutdown() {
|
func Shutdown() {
|
||||||
|
totalStart := time.Now()
|
||||||
|
|
||||||
shutdownFuncMu.Lock()
|
shutdownFuncMu.Lock()
|
||||||
defer shutdownFuncMu.Unlock()
|
defer shutdownFuncMu.Unlock()
|
||||||
if shutdownFunc != nil {
|
if shutdownFunc != nil {
|
||||||
shutdownFunc()
|
shutdownFunc()
|
||||||
}
|
}
|
||||||
|
cclog.Infof("[METRICSTORE]> Background workers cancelled (%v)", time.Since(totalStart))
|
||||||
|
|
||||||
if Keys.Checkpoints.FileFormat == "wal" {
|
if Keys.Checkpoints.FileFormat == "wal" {
|
||||||
|
// Signal producers to stop sending before closing channels,
|
||||||
|
// preventing send-on-closed-channel panics from in-flight NATS workers.
|
||||||
|
walShuttingDown.Store(true)
|
||||||
|
// Brief grace period for in-flight DecodeLine calls to complete.
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
for _, ch := range walShardChs {
|
for _, ch := range walShardChs {
|
||||||
close(ch)
|
close(ch)
|
||||||
}
|
}
|
||||||
|
drainStart := time.Now()
|
||||||
|
WaitForWALStagingDrain()
|
||||||
|
cclog.Infof("[METRICSTORE]> WAL staging goroutines exited (%v)", time.Since(drainStart))
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)
|
cclog.Infof("[METRICSTORE]> Writing checkpoint to '%s'...", Keys.Checkpoints.RootDir)
|
||||||
|
checkpointStart := time.Now()
|
||||||
var files int
|
var files int
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@@ -294,19 +307,16 @@ func Shutdown() {
|
|||||||
lastCheckpointMu.Unlock()
|
lastCheckpointMu.Unlock()
|
||||||
|
|
||||||
if Keys.Checkpoints.FileFormat == "wal" {
|
if Keys.Checkpoints.FileFormat == "wal" {
|
||||||
var hostDirs []string
|
// WAL files are deleted per-host inside ToCheckpointWAL workers.
|
||||||
files, hostDirs, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
|
files, _, err = ms.ToCheckpointWAL(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
|
||||||
if err == nil {
|
|
||||||
RotateWALFilesAfterShutdown(hostDirs)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
|
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, from.Unix(), time.Now().Unix())
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error())
|
cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s", err.Error())
|
||||||
}
|
}
|
||||||
cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files)
|
cclog.Infof("[METRICSTORE]> Done! (%d files written in %v, total shutdown: %v)", files, time.Since(checkpointStart), time.Since(totalStart))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retention starts a background goroutine that periodically frees old metric data.
|
// Retention starts a background goroutine that periodically frees old metric data.
|
||||||
@@ -702,16 +712,16 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
|
|||||||
} else if from != cfrom || to != cto || len(data) != len(cdata) {
|
} else if from != cfrom || to != cto || len(data) != len(cdata) {
|
||||||
missingfront, missingback := int((from-cfrom)/minfo.Frequency), int((to-cto)/minfo.Frequency)
|
missingfront, missingback := int((from-cfrom)/minfo.Frequency), int((to-cto)/minfo.Frequency)
|
||||||
if missingfront != 0 {
|
if missingfront != 0 {
|
||||||
return ErrDataDoesNotAlign
|
return ErrDataDoesNotAlignMissingFront
|
||||||
}
|
}
|
||||||
|
|
||||||
newlen := len(cdata) - missingback
|
newlen := len(cdata) - missingback
|
||||||
if newlen < 1 {
|
if newlen < 1 {
|
||||||
return ErrDataDoesNotAlign
|
return ErrDataDoesNotAlignMissingBack
|
||||||
}
|
}
|
||||||
cdata = cdata[0:newlen]
|
cdata = cdata[0:newlen]
|
||||||
if len(cdata) != len(data) {
|
if len(cdata) != len(data) {
|
||||||
return ErrDataDoesNotAlign
|
return ErrDataDoesNotAlignDataLenMismatch
|
||||||
}
|
}
|
||||||
|
|
||||||
from, to = cfrom, cto
|
from, to = cfrom, cto
|
||||||
|
|||||||
@@ -99,7 +99,7 @@ func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) {
|
|||||||
|
|
||||||
// WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows,
|
// WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows,
|
||||||
// writing metrics in sorted order without materializing all rows in memory.
|
// writing metrics in sorted order without materializing all rows in memory.
|
||||||
// Produces one row group per call (typically one host's data).
|
// Call FlushRowGroup() after writing all checkpoint files for a host.
|
||||||
func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error {
|
func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error {
|
||||||
w.writeLevel(cf, cluster, hostname, scope, scopeID)
|
w.writeLevel(cf, cluster, hostname, scope, scopeID)
|
||||||
|
|
||||||
@@ -112,10 +112,15 @@ func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster,
|
|||||||
w.batch = w.batch[:0]
|
w.batch = w.batch[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FlushRowGroup flushes the current row group to the Parquet file.
|
||||||
|
// Should be called once per host after all checkpoint files for that host are written.
|
||||||
|
func (w *parquetArchiveWriter) FlushRowGroup() error {
|
||||||
if err := w.writer.Flush(); err != nil {
|
if err := w.writer.Flush(); err != nil {
|
||||||
return fmt.Errorf("flushing parquet row group: %w", err)
|
return fmt.Errorf("flushing parquet row group: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -91,8 +91,10 @@ func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int6
|
|||||||
|
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
from, to = cfrom, cto
|
from, to = cfrom, cto
|
||||||
} else if from != cfrom || to != cto {
|
} else if from != cfrom {
|
||||||
return ErrDataDoesNotAlign
|
return ErrDataDoesNotAlignMissingFront
|
||||||
|
} else if to != cto {
|
||||||
|
return ErrDataDoesNotAlignMissingBack
|
||||||
}
|
}
|
||||||
|
|
||||||
samples += stats.Samples
|
samples += stats.Samples
|
||||||
|
|||||||
@@ -69,6 +69,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||||
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
||||||
@@ -91,6 +92,13 @@ var walShardRotateChs []chan walRotateReq
|
|||||||
// walNumShards stores the number of shards (set during WALStaging init).
|
// walNumShards stores the number of shards (set during WALStaging init).
|
||||||
var walNumShards int
|
var walNumShards int
|
||||||
|
|
||||||
|
// walStagingWg tracks WALStaging goroutine exits for shutdown synchronization.
|
||||||
|
var walStagingWg sync.WaitGroup
|
||||||
|
|
||||||
|
// walShuttingDown is set before closing shard channels to prevent
|
||||||
|
// SendWALMessage from sending on a closed channel (which panics in Go).
|
||||||
|
var walShuttingDown atomic.Bool
|
||||||
|
|
||||||
// WALMessage represents a single metric write to be appended to the WAL.
|
// WALMessage represents a single metric write to be appended to the WAL.
|
||||||
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
// Cluster and Node are NOT stored in the WAL record (inferred from file path).
|
||||||
type WALMessage struct {
|
type WALMessage struct {
|
||||||
@@ -111,10 +119,16 @@ type walRotateReq struct {
|
|||||||
|
|
||||||
// walFileState holds an open WAL file handle and buffered writer for one host directory.
|
// walFileState holds an open WAL file handle and buffered writer for one host directory.
|
||||||
type walFileState struct {
|
type walFileState struct {
|
||||||
f *os.File
|
f *os.File
|
||||||
w *bufio.Writer
|
w *bufio.Writer
|
||||||
|
dirty bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// walFlushInterval controls how often dirty WAL files are flushed to disk.
|
||||||
|
// Decoupling flushes from message processing lets the consumer run at memory
|
||||||
|
// speed, amortizing syscall overhead across many writes.
|
||||||
|
const walFlushInterval = 5 * time.Second
|
||||||
|
|
||||||
// walShardIndex computes which shard a message belongs to based on cluster+node.
|
// walShardIndex computes which shard a message belongs to based on cluster+node.
|
||||||
// Uses FNV-1a hash for fast, well-distributed mapping.
|
// Uses FNV-1a hash for fast, well-distributed mapping.
|
||||||
func walShardIndex(cluster, node string) int {
|
func walShardIndex(cluster, node string) int {
|
||||||
@@ -126,9 +140,9 @@ func walShardIndex(cluster, node string) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SendWALMessage routes a WAL message to the appropriate shard channel.
|
// SendWALMessage routes a WAL message to the appropriate shard channel.
|
||||||
// Returns false if the channel is full (message dropped).
|
// Returns false if the channel is full or shutdown is in progress.
|
||||||
func SendWALMessage(msg *WALMessage) bool {
|
func SendWALMessage(msg *WALMessage) bool {
|
||||||
if walShardChs == nil {
|
if walShardChs == nil || walShuttingDown.Load() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
shard := walShardIndex(msg.Cluster, msg.Node)
|
shard := walShardIndex(msg.Cluster, msg.Node)
|
||||||
@@ -164,7 +178,9 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
msgCh := walShardChs[i]
|
msgCh := walShardChs[i]
|
||||||
rotateCh := walShardRotateChs[i]
|
rotateCh := walShardRotateChs[i]
|
||||||
|
|
||||||
|
walStagingWg.Add(1)
|
||||||
wg.Go(func() {
|
wg.Go(func() {
|
||||||
|
defer walStagingWg.Done()
|
||||||
hostFiles := make(map[string]*walFileState)
|
hostFiles := make(map[string]*walFileState)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -222,6 +238,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
if err := writeWALRecordDirect(ws.w, msg); err != nil {
|
if err := writeWALRecordDirect(ws.w, msg); err != nil {
|
||||||
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
|
cclog.Errorf("[METRICSTORE]> WAL: write record: %v", err)
|
||||||
}
|
}
|
||||||
|
ws.dirty = true
|
||||||
}
|
}
|
||||||
|
|
||||||
processRotate := func(req walRotateReq) {
|
processRotate := func(req walRotateReq) {
|
||||||
@@ -238,58 +255,57 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
close(req.done)
|
close(req.done)
|
||||||
}
|
}
|
||||||
|
|
||||||
flushAll := func() {
|
flushDirty := func() {
|
||||||
for _, ws := range hostFiles {
|
for _, ws := range hostFiles {
|
||||||
if ws.f != nil {
|
if ws.dirty {
|
||||||
ws.w.Flush()
|
ws.w.Flush()
|
||||||
|
ws.dirty = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
drain := func() {
|
ticker := time.NewTicker(walFlushInterval)
|
||||||
for {
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
// drainBatch processes up to 4096 pending messages without blocking.
|
||||||
|
// Returns false if the channel was closed.
|
||||||
|
drainBatch := func() bool {
|
||||||
|
for range 4096 {
|
||||||
select {
|
select {
|
||||||
case msg, ok := <-msgCh:
|
case msg, ok := <-msgCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
flushDirty()
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
processMsg(msg)
|
processMsg(msg)
|
||||||
case req := <-rotateCh:
|
case req := <-rotateCh:
|
||||||
processRotate(req)
|
processRotate(req)
|
||||||
default:
|
default:
|
||||||
flushAll()
|
return true
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
drain()
|
// On shutdown, skip draining buffered messages — a full binary
|
||||||
|
// checkpoint will be written from in-memory state, making
|
||||||
|
// buffered WAL records redundant.
|
||||||
|
flushDirty()
|
||||||
return
|
return
|
||||||
case msg, ok := <-msgCh:
|
case msg, ok := <-msgCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
processMsg(msg)
|
processMsg(msg)
|
||||||
|
if !drainBatch() {
|
||||||
// Drain up to 256 more messages without blocking to batch writes.
|
return
|
||||||
for range 256 {
|
|
||||||
select {
|
|
||||||
case msg, ok := <-msgCh:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
processMsg(msg)
|
|
||||||
case req := <-rotateCh:
|
|
||||||
processRotate(req)
|
|
||||||
default:
|
|
||||||
goto flushed
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
flushed:
|
// No flush here — timer handles periodic flushing.
|
||||||
flushAll()
|
case <-ticker.C:
|
||||||
|
flushDirty()
|
||||||
case req := <-rotateCh:
|
case req := <-rotateCh:
|
||||||
processRotate(req)
|
processRotate(req)
|
||||||
}
|
}
|
||||||
@@ -298,23 +314,42 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitForWALStagingDrain blocks until all WALStaging goroutines have exited.
|
||||||
|
// Must be called after closing walShardChs to ensure all file handles are
|
||||||
|
// flushed and closed before checkpoint writes begin.
|
||||||
|
func WaitForWALStagingDrain() {
|
||||||
|
walStagingWg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// RotateWALFiles sends rotation requests for the given host directories
|
// RotateWALFiles sends rotation requests for the given host directories
|
||||||
// and blocks until all rotations complete. Each request is routed to the
|
// and blocks until all rotations complete. Each request is routed to the
|
||||||
// shard that owns the host directory.
|
// shard that owns the host directory.
|
||||||
|
//
|
||||||
|
// If shutdown is in progress (WAL staging goroutines may have exited),
|
||||||
|
// rotation is skipped to avoid deadlocking on abandoned channels.
|
||||||
func RotateWALFiles(hostDirs []string) {
|
func RotateWALFiles(hostDirs []string) {
|
||||||
if walShardRotateChs == nil {
|
if walShardRotateChs == nil || walShuttingDown.Load() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
dones := make([]chan struct{}, len(hostDirs))
|
dones := make([]chan struct{}, 0, len(hostDirs))
|
||||||
for i, dir := range hostDirs {
|
for _, dir := range hostDirs {
|
||||||
dones[i] = make(chan struct{})
|
done := make(chan struct{})
|
||||||
// Extract cluster/node from hostDir to find the right shard.
|
|
||||||
// hostDir = rootDir/cluster/node
|
|
||||||
shard := walShardIndexFromDir(dir)
|
shard := walShardIndexFromDir(dir)
|
||||||
walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: dones[i]}
|
select {
|
||||||
|
case walShardRotateChs[shard] <- walRotateReq{hostDir: dir, done: done}:
|
||||||
|
dones = append(dones, done)
|
||||||
|
default:
|
||||||
|
// Channel full or goroutine not consuming — skip this host.
|
||||||
|
cclog.Warnf("[METRICSTORE]> WAL rotation skipped for %s (channel full)", dir)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for _, done := range dones {
|
for _, done := range dones {
|
||||||
<-done
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(30 * time.Second):
|
||||||
|
cclog.Warn("[METRICSTORE]> WAL rotation timed out, continuing")
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -338,141 +373,64 @@ func RotateWALFilesAfterShutdown(hostDirs []string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeWALRecordDirect encodes a WAL record directly into the bufio.Writer,
|
// writeWALRecordDirect encodes a WAL record into a contiguous buffer first,
|
||||||
// avoiding heap allocations by using a stack-allocated scratch buffer for
|
// then writes it to the bufio.Writer in a single call. This prevents partial
|
||||||
// the fixed-size header/trailer and computing CRC inline.
|
// records in the write buffer if a write error occurs mid-record (e.g. disk full).
|
||||||
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
|
func writeWALRecordDirect(w *bufio.Writer, msg *WALMessage) error {
|
||||||
// Compute payload size.
|
// Compute payload and total record size.
|
||||||
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
|
payloadSize := 8 + 2 + len(msg.MetricName) + 1 + 4
|
||||||
for _, s := range msg.Selector {
|
for _, s := range msg.Selector {
|
||||||
payloadSize += 1 + len(s)
|
payloadSize += 1 + len(s)
|
||||||
}
|
}
|
||||||
|
// Total: 8 (header) + payload + 4 (CRC).
|
||||||
|
totalSize := 8 + payloadSize + 4
|
||||||
|
|
||||||
// Write magic + payload length (8 bytes header).
|
// Use stack buffer for typical small records, heap-allocate only for large ones.
|
||||||
var hdr [8]byte
|
var stackBuf [256]byte
|
||||||
binary.LittleEndian.PutUint32(hdr[0:4], walRecordMagic)
|
var buf []byte
|
||||||
binary.LittleEndian.PutUint32(hdr[4:8], uint32(payloadSize))
|
if totalSize <= len(stackBuf) {
|
||||||
if _, err := w.Write(hdr[:]); err != nil {
|
buf = stackBuf[:totalSize]
|
||||||
return err
|
} else {
|
||||||
|
buf = make([]byte, totalSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We need to compute CRC over the payload as we write it.
|
// Header: magic + payload length.
|
||||||
crc := crc32.NewIEEE()
|
binary.LittleEndian.PutUint32(buf[0:4], walRecordMagic)
|
||||||
|
binary.LittleEndian.PutUint32(buf[4:8], uint32(payloadSize))
|
||||||
|
|
||||||
|
// Payload starts at offset 8.
|
||||||
|
p := 8
|
||||||
|
|
||||||
// Timestamp (8 bytes).
|
// Timestamp (8 bytes).
|
||||||
var scratch [8]byte
|
binary.LittleEndian.PutUint64(buf[p:p+8], uint64(msg.Timestamp))
|
||||||
binary.LittleEndian.PutUint64(scratch[:8], uint64(msg.Timestamp))
|
p += 8
|
||||||
crc.Write(scratch[:8])
|
|
||||||
if _, err := w.Write(scratch[:8]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Metric name length (2 bytes) + metric name.
|
// Metric name length (2 bytes) + metric name.
|
||||||
binary.LittleEndian.PutUint16(scratch[:2], uint16(len(msg.MetricName)))
|
binary.LittleEndian.PutUint16(buf[p:p+2], uint16(len(msg.MetricName)))
|
||||||
crc.Write(scratch[:2])
|
p += 2
|
||||||
if _, err := w.Write(scratch[:2]); err != nil {
|
p += copy(buf[p:], msg.MetricName)
|
||||||
return err
|
|
||||||
}
|
|
||||||
nameBytes := []byte(msg.MetricName)
|
|
||||||
crc.Write(nameBytes)
|
|
||||||
if _, err := w.Write(nameBytes); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Selector count (1 byte).
|
// Selector count (1 byte).
|
||||||
scratch[0] = byte(len(msg.Selector))
|
buf[p] = byte(len(msg.Selector))
|
||||||
crc.Write(scratch[:1])
|
p++
|
||||||
if _, err := w.Write(scratch[:1]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Selectors (1-byte length + bytes each).
|
// Selectors (1-byte length + bytes each).
|
||||||
for _, sel := range msg.Selector {
|
for _, sel := range msg.Selector {
|
||||||
scratch[0] = byte(len(sel))
|
buf[p] = byte(len(sel))
|
||||||
crc.Write(scratch[:1])
|
p++
|
||||||
if _, err := w.Write(scratch[:1]); err != nil {
|
p += copy(buf[p:], sel)
|
||||||
return err
|
|
||||||
}
|
|
||||||
selBytes := []byte(sel)
|
|
||||||
crc.Write(selBytes)
|
|
||||||
if _, err := w.Write(selBytes); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Value (4 bytes, float32 bits).
|
// Value (4 bytes, float32 bits).
|
||||||
binary.LittleEndian.PutUint32(scratch[:4], math.Float32bits(float32(msg.Value)))
|
binary.LittleEndian.PutUint32(buf[p:p+4], math.Float32bits(float32(msg.Value)))
|
||||||
crc.Write(scratch[:4])
|
p += 4
|
||||||
if _, err := w.Write(scratch[:4]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// CRC32 (4 bytes).
|
// CRC32 over payload (bytes 8..8+payloadSize).
|
||||||
binary.LittleEndian.PutUint32(scratch[:4], crc.Sum32())
|
crc := crc32.ChecksumIEEE(buf[8 : 8+payloadSize])
|
||||||
_, err := w.Write(scratch[:4])
|
binary.LittleEndian.PutUint32(buf[p:p+4], crc)
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// buildWALPayload encodes a WALMessage into a binary payload (without magic/length/CRC).
|
// Single atomic write to the buffered writer.
|
||||||
func buildWALPayload(msg *WALMessage) []byte {
|
_, err := w.Write(buf)
|
||||||
size := 8 + 2 + len(msg.MetricName) + 1 + 4
|
|
||||||
for _, s := range msg.Selector {
|
|
||||||
size += 1 + len(s)
|
|
||||||
}
|
|
||||||
|
|
||||||
buf := make([]byte, 0, size)
|
|
||||||
|
|
||||||
// Timestamp (8 bytes, little-endian int64)
|
|
||||||
var ts [8]byte
|
|
||||||
binary.LittleEndian.PutUint64(ts[:], uint64(msg.Timestamp))
|
|
||||||
buf = append(buf, ts[:]...)
|
|
||||||
|
|
||||||
// Metric name (2-byte length prefix + bytes)
|
|
||||||
var mLen [2]byte
|
|
||||||
binary.LittleEndian.PutUint16(mLen[:], uint16(len(msg.MetricName)))
|
|
||||||
buf = append(buf, mLen[:]...)
|
|
||||||
buf = append(buf, msg.MetricName...)
|
|
||||||
|
|
||||||
// Selector count (1 byte)
|
|
||||||
buf = append(buf, byte(len(msg.Selector)))
|
|
||||||
|
|
||||||
// Selectors (1-byte length prefix + bytes each)
|
|
||||||
for _, sel := range msg.Selector {
|
|
||||||
buf = append(buf, byte(len(sel)))
|
|
||||||
buf = append(buf, sel...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Value (4 bytes, float32 bit representation)
|
|
||||||
var val [4]byte
|
|
||||||
binary.LittleEndian.PutUint32(val[:], math.Float32bits(float32(msg.Value)))
|
|
||||||
buf = append(buf, val[:]...)
|
|
||||||
|
|
||||||
return buf
|
|
||||||
}
|
|
||||||
|
|
||||||
// writeWALRecord appends a binary WAL record to the writer.
|
|
||||||
// Format: [4B magic][4B payload_len][payload][4B CRC32]
|
|
||||||
func writeWALRecord(w io.Writer, msg *WALMessage) error {
|
|
||||||
payload := buildWALPayload(msg)
|
|
||||||
crc := crc32.ChecksumIEEE(payload)
|
|
||||||
|
|
||||||
record := make([]byte, 0, 4+4+len(payload)+4)
|
|
||||||
|
|
||||||
var magic [4]byte
|
|
||||||
binary.LittleEndian.PutUint32(magic[:], walRecordMagic)
|
|
||||||
record = append(record, magic[:]...)
|
|
||||||
|
|
||||||
var pLen [4]byte
|
|
||||||
binary.LittleEndian.PutUint32(pLen[:], uint32(len(payload)))
|
|
||||||
record = append(record, pLen[:]...)
|
|
||||||
|
|
||||||
record = append(record, payload...)
|
|
||||||
|
|
||||||
var crcBytes [4]byte
|
|
||||||
binary.LittleEndian.PutUint32(crcBytes[:], crc)
|
|
||||||
record = append(record, crcBytes[:]...)
|
|
||||||
|
|
||||||
_, err := w.Write(record)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -697,7 +655,10 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
|||||||
selector []string
|
selector []string
|
||||||
}
|
}
|
||||||
|
|
||||||
n, errs := int32(0), int32(0)
|
totalWork := len(levels)
|
||||||
|
cclog.Infof("[METRICSTORE]> Starting binary checkpoint for %d hosts with %d workers", totalWork, Keys.NumWorkers)
|
||||||
|
|
||||||
|
n, errs, completed := int32(0), int32(0), int32(0)
|
||||||
var successDirs []string
|
var successDirs []string
|
||||||
var successMu sync.Mutex
|
var successMu sync.Mutex
|
||||||
|
|
||||||
@@ -705,6 +666,22 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
|||||||
wg.Add(Keys.NumWorkers)
|
wg.Add(Keys.NumWorkers)
|
||||||
work := make(chan workItem, Keys.NumWorkers*2)
|
work := make(chan workItem, Keys.NumWorkers*2)
|
||||||
|
|
||||||
|
// Progress logging goroutine.
|
||||||
|
stopProgress := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(10 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
cclog.Infof("[METRICSTORE]> Checkpoint progress: %d/%d hosts (%d written, %d errors)",
|
||||||
|
atomic.LoadInt32(&completed), totalWork, atomic.LoadInt32(&n), atomic.LoadInt32(&errs))
|
||||||
|
case <-stopProgress:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for range Keys.NumWorkers {
|
for range Keys.NumWorkers {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
@@ -712,16 +689,23 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
|||||||
err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m)
|
err := wi.level.toCheckpointBinary(wi.hostDir, from, to, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == ErrNoNewArchiveData {
|
if err == ErrNoNewArchiveData {
|
||||||
|
atomic.AddInt32(&completed, 1)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err)
|
cclog.Errorf("[METRICSTORE]> binary checkpoint error for %s: %v", wi.hostDir, err)
|
||||||
atomic.AddInt32(&errs, 1)
|
atomic.AddInt32(&errs, 1)
|
||||||
} else {
|
} else {
|
||||||
atomic.AddInt32(&n, 1)
|
atomic.AddInt32(&n, 1)
|
||||||
|
// Delete WAL immediately after successful snapshot.
|
||||||
|
walPath := path.Join(wi.hostDir, "current.wal")
|
||||||
|
if err := os.Remove(walPath); err != nil && !os.IsNotExist(err) {
|
||||||
|
cclog.Errorf("[METRICSTORE]> WAL remove %s: %v", walPath, err)
|
||||||
|
}
|
||||||
successMu.Lock()
|
successMu.Lock()
|
||||||
successDirs = append(successDirs, wi.hostDir)
|
successDirs = append(successDirs, wi.hostDir)
|
||||||
successMu.Unlock()
|
successMu.Unlock()
|
||||||
}
|
}
|
||||||
|
atomic.AddInt32(&completed, 1)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@@ -736,6 +720,7 @@ func (m *MemoryStore) ToCheckpointWAL(dir string, from, to int64) (int, []string
|
|||||||
}
|
}
|
||||||
close(work)
|
close(work)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
close(stopProgress)
|
||||||
|
|
||||||
if errs > 0 {
|
if errs > 0 {
|
||||||
return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n)
|
return int(n), successDirs, fmt.Errorf("[METRICSTORE]> %d errors during binary checkpoint (%d successes)", errs, n)
|
||||||
|
|||||||
381
tools/binaryCheckpointReader/binaryCheckpointReader.go
Normal file
381
tools/binaryCheckpointReader/binaryCheckpointReader.go
Normal file
@@ -0,0 +1,381 @@
|
|||||||
|
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved. This file is part of cc-backend.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// binaryCheckpointReader reads .wal or .bin checkpoint files produced by the
|
||||||
|
// metricstore WAL/snapshot system and dumps their contents to a human-readable
|
||||||
|
// .txt file (same name as input, with .txt extension).
|
||||||
|
//
|
||||||
|
// Usage:
|
||||||
|
//
|
||||||
|
// go run ./tools/binaryCheckpointReader <file.wal|file.bin>
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"hash/crc32"
|
||||||
|
"io"
|
||||||
|
"math"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Magic numbers matching metricstore/walCheckpoint.go.
|
||||||
|
const (
|
||||||
|
walFileMagic = uint32(0xCC1DA701)
|
||||||
|
walRecordMagic = uint32(0xCC1DA7A1)
|
||||||
|
snapFileMagic = uint32(0xCC5B0001)
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
if len(os.Args) != 2 {
|
||||||
|
fmt.Fprintf(os.Stderr, "Usage: %s <file.wal|file.bin>\n", os.Args[0])
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
inputPath := os.Args[1]
|
||||||
|
ext := strings.ToLower(filepath.Ext(inputPath))
|
||||||
|
|
||||||
|
if ext != ".wal" && ext != ".bin" {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error: file must have .wal or .bin extension, got %q\n", ext)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.Open(inputPath)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error opening %s: %v\n", inputPath, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
// Output file: replace extension with .txt
|
||||||
|
outputPath := strings.TrimSuffix(inputPath, filepath.Ext(inputPath)) + ".txt"
|
||||||
|
out, err := os.Create(outputPath)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error creating output %s: %v\n", outputPath, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
defer out.Close()
|
||||||
|
|
||||||
|
w := bufio.NewWriter(out)
|
||||||
|
defer w.Flush()
|
||||||
|
|
||||||
|
switch ext {
|
||||||
|
case ".wal":
|
||||||
|
err = dumpWAL(f, w)
|
||||||
|
case ".bin":
|
||||||
|
err = dumpBinarySnapshot(f, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error reading %s: %v\n", inputPath, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Flush()
|
||||||
|
fmt.Printf("Output written to %s\n", outputPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------- WAL reader ----------
|
||||||
|
|
||||||
|
func dumpWAL(f *os.File, w *bufio.Writer) error {
|
||||||
|
br := bufio.NewReader(f)
|
||||||
|
|
||||||
|
// Read and verify file header magic.
|
||||||
|
var fileMagic uint32
|
||||||
|
if err := binary.Read(br, binary.LittleEndian, &fileMagic); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
fmt.Fprintln(w, "WAL file is empty (0 bytes).")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("read file header: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if fileMagic != walFileMagic {
|
||||||
|
return fmt.Errorf("invalid WAL file magic 0x%08X (expected 0x%08X)", fileMagic, walFileMagic)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "=== WAL File Dump ===\n")
|
||||||
|
fmt.Fprintf(w, "File: %s\n", f.Name())
|
||||||
|
fmt.Fprintf(w, "File Magic: 0x%08X (valid)\n\n", fileMagic)
|
||||||
|
|
||||||
|
recordNum := 0
|
||||||
|
for {
|
||||||
|
msg, err := readWALRecord(br)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(w, "--- Record #%d: ERROR ---\n", recordNum+1)
|
||||||
|
fmt.Fprintf(w, " Error: %v\n", err)
|
||||||
|
fmt.Fprintf(w, " (stopping replay — likely truncated trailing record)\n\n")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if msg == nil {
|
||||||
|
break // Clean EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
recordNum++
|
||||||
|
ts := time.Unix(msg.Timestamp, 0).UTC()
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "--- Record #%d ---\n", recordNum)
|
||||||
|
fmt.Fprintf(w, " Timestamp: %d (%s)\n", msg.Timestamp, ts.Format(time.RFC3339))
|
||||||
|
fmt.Fprintf(w, " Metric: %s\n", msg.MetricName)
|
||||||
|
if len(msg.Selector) > 0 {
|
||||||
|
fmt.Fprintf(w, " Selectors: [%s]\n", strings.Join(msg.Selector, ", "))
|
||||||
|
} else {
|
||||||
|
fmt.Fprintf(w, " Selectors: (none)\n")
|
||||||
|
}
|
||||||
|
fmt.Fprintf(w, " Value: %g\n\n", msg.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "=== Total valid records: %d ===\n", recordNum)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type walMessage struct {
|
||||||
|
MetricName string
|
||||||
|
Selector []string
|
||||||
|
Value float32
|
||||||
|
Timestamp int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func readWALRecord(r io.Reader) (*walMessage, error) {
|
||||||
|
var magic uint32
|
||||||
|
if err := binary.Read(r, binary.LittleEndian, &magic); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("read record magic: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if magic != walRecordMagic {
|
||||||
|
return nil, fmt.Errorf("invalid record magic 0x%08X (expected 0x%08X)", magic, walRecordMagic)
|
||||||
|
}
|
||||||
|
|
||||||
|
var payloadLen uint32
|
||||||
|
if err := binary.Read(r, binary.LittleEndian, &payloadLen); err != nil {
|
||||||
|
return nil, fmt.Errorf("read payload length: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if payloadLen > 1<<20 {
|
||||||
|
return nil, fmt.Errorf("record payload too large: %d bytes", payloadLen)
|
||||||
|
}
|
||||||
|
|
||||||
|
payload := make([]byte, payloadLen)
|
||||||
|
if _, err := io.ReadFull(r, payload); err != nil {
|
||||||
|
return nil, fmt.Errorf("read payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var storedCRC uint32
|
||||||
|
if err := binary.Read(r, binary.LittleEndian, &storedCRC); err != nil {
|
||||||
|
return nil, fmt.Errorf("read CRC: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if crc32.ChecksumIEEE(payload) != storedCRC {
|
||||||
|
return nil, fmt.Errorf("CRC mismatch (truncated write or corruption)")
|
||||||
|
}
|
||||||
|
|
||||||
|
return parseWALPayload(payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseWALPayload(payload []byte) (*walMessage, error) {
|
||||||
|
if len(payload) < 8+2+1+4 {
|
||||||
|
return nil, fmt.Errorf("payload too short: %d bytes", len(payload))
|
||||||
|
}
|
||||||
|
|
||||||
|
offset := 0
|
||||||
|
|
||||||
|
// Timestamp (8 bytes).
|
||||||
|
ts := int64(binary.LittleEndian.Uint64(payload[offset : offset+8]))
|
||||||
|
offset += 8
|
||||||
|
|
||||||
|
// Metric name (2-byte length + bytes).
|
||||||
|
if offset+2 > len(payload) {
|
||||||
|
return nil, fmt.Errorf("metric name length overflows payload")
|
||||||
|
}
|
||||||
|
mLen := int(binary.LittleEndian.Uint16(payload[offset : offset+2]))
|
||||||
|
offset += 2
|
||||||
|
|
||||||
|
if offset+mLen > len(payload) {
|
||||||
|
return nil, fmt.Errorf("metric name overflows payload")
|
||||||
|
}
|
||||||
|
metricName := string(payload[offset : offset+mLen])
|
||||||
|
offset += mLen
|
||||||
|
|
||||||
|
// Selector count (1 byte).
|
||||||
|
if offset >= len(payload) {
|
||||||
|
return nil, fmt.Errorf("selector count overflows payload")
|
||||||
|
}
|
||||||
|
selCount := int(payload[offset])
|
||||||
|
offset++
|
||||||
|
|
||||||
|
selectors := make([]string, selCount)
|
||||||
|
for i := range selCount {
|
||||||
|
if offset >= len(payload) {
|
||||||
|
return nil, fmt.Errorf("selector[%d] length overflows payload", i)
|
||||||
|
}
|
||||||
|
sLen := int(payload[offset])
|
||||||
|
offset++
|
||||||
|
|
||||||
|
if offset+sLen > len(payload) {
|
||||||
|
return nil, fmt.Errorf("selector[%d] data overflows payload", i)
|
||||||
|
}
|
||||||
|
selectors[i] = string(payload[offset : offset+sLen])
|
||||||
|
offset += sLen
|
||||||
|
}
|
||||||
|
|
||||||
|
// Value (4 bytes, float32 bits).
|
||||||
|
if offset+4 > len(payload) {
|
||||||
|
return nil, fmt.Errorf("value overflows payload")
|
||||||
|
}
|
||||||
|
bits := binary.LittleEndian.Uint32(payload[offset : offset+4])
|
||||||
|
value := math.Float32frombits(bits)
|
||||||
|
|
||||||
|
return &walMessage{
|
||||||
|
MetricName: metricName,
|
||||||
|
Timestamp: ts,
|
||||||
|
Selector: selectors,
|
||||||
|
Value: value,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------- Binary snapshot reader ----------
|
||||||
|
|
||||||
|
func dumpBinarySnapshot(f *os.File, w *bufio.Writer) error {
|
||||||
|
br := bufio.NewReader(f)
|
||||||
|
|
||||||
|
var magic uint32
|
||||||
|
if err := binary.Read(br, binary.LittleEndian, &magic); err != nil {
|
||||||
|
return fmt.Errorf("read magic: %w", err)
|
||||||
|
}
|
||||||
|
if magic != snapFileMagic {
|
||||||
|
return fmt.Errorf("invalid snapshot magic 0x%08X (expected 0x%08X)", magic, snapFileMagic)
|
||||||
|
}
|
||||||
|
|
||||||
|
var from, to int64
|
||||||
|
if err := binary.Read(br, binary.LittleEndian, &from); err != nil {
|
||||||
|
return fmt.Errorf("read from: %w", err)
|
||||||
|
}
|
||||||
|
if err := binary.Read(br, binary.LittleEndian, &to); err != nil {
|
||||||
|
return fmt.Errorf("read to: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fromTime := time.Unix(from, 0).UTC()
|
||||||
|
toTime := time.Unix(to, 0).UTC()
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "=== Binary Snapshot Dump ===\n")
|
||||||
|
fmt.Fprintf(w, "File: %s\n", f.Name())
|
||||||
|
fmt.Fprintf(w, "Magic: 0x%08X (valid)\n", magic)
|
||||||
|
fmt.Fprintf(w, "From: %d (%s)\n", from, fromTime.Format(time.RFC3339))
|
||||||
|
fmt.Fprintf(w, "To: %d (%s)\n\n", to, toTime.Format(time.RFC3339))
|
||||||
|
|
||||||
|
return dumpBinaryLevel(br, w, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func dumpBinaryLevel(r io.Reader, w *bufio.Writer, depth int) error {
|
||||||
|
indent := strings.Repeat(" ", depth)
|
||||||
|
|
||||||
|
var numMetrics uint32
|
||||||
|
if err := binary.Read(r, binary.LittleEndian, &numMetrics); err != nil {
|
||||||
|
return fmt.Errorf("read num_metrics: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if numMetrics > 0 {
|
||||||
|
fmt.Fprintf(w, "%sMetrics (%d):\n", indent, numMetrics)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range numMetrics {
|
||||||
|
name, err := readString16(r)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("read metric name [%d]: %w", i, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var freq, start int64
|
||||||
|
if err := binary.Read(r, binary.LittleEndian, &freq); err != nil {
|
||||||
|
return fmt.Errorf("read frequency for %s: %w", name, err)
|
||||||
|
}
|
||||||
|
if err := binary.Read(r, binary.LittleEndian, &start); err != nil {
|
||||||
|
return fmt.Errorf("read start for %s: %w", name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var numValues uint32
|
||||||
|
if err := binary.Read(r, binary.LittleEndian, &numValues); err != nil {
|
||||||
|
return fmt.Errorf("read num_values for %s: %w", name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
startTime := time.Unix(start, 0).UTC()
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "%s [%s]\n", indent, name)
|
||||||
|
fmt.Fprintf(w, "%s Frequency: %d s\n", indent, freq)
|
||||||
|
fmt.Fprintf(w, "%s Start: %d (%s)\n", indent, start, startTime.Format(time.RFC3339))
|
||||||
|
fmt.Fprintf(w, "%s Values (%d):", indent, numValues)
|
||||||
|
|
||||||
|
if numValues == 0 {
|
||||||
|
fmt.Fprintln(w, " (none)")
|
||||||
|
} else {
|
||||||
|
fmt.Fprintln(w)
|
||||||
|
// Print values in rows of 10 for readability.
|
||||||
|
for j := range numValues {
|
||||||
|
var bits uint32
|
||||||
|
if err := binary.Read(r, binary.LittleEndian, &bits); err != nil {
|
||||||
|
return fmt.Errorf("read value[%d] for %s: %w", j, name, err)
|
||||||
|
}
|
||||||
|
val := math.Float32frombits(bits)
|
||||||
|
|
||||||
|
if j%10 == 0 {
|
||||||
|
if j > 0 {
|
||||||
|
fmt.Fprintln(w)
|
||||||
|
}
|
||||||
|
// Print the timestamp for this row's first value.
|
||||||
|
rowTS := start + int64(j)*freq
|
||||||
|
fmt.Fprintf(w, "%s [%s] ", indent, time.Unix(rowTS, 0).UTC().Format("15:04:05"))
|
||||||
|
}
|
||||||
|
|
||||||
|
if math.IsNaN(float64(val)) {
|
||||||
|
fmt.Fprintf(w, "NaN ")
|
||||||
|
} else {
|
||||||
|
fmt.Fprintf(w, "%g ", val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fmt.Fprintln(w)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var numChildren uint32
|
||||||
|
if err := binary.Read(r, binary.LittleEndian, &numChildren); err != nil {
|
||||||
|
return fmt.Errorf("read num_children: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if numChildren > 0 {
|
||||||
|
fmt.Fprintf(w, "%sChildren (%d):\n", indent, numChildren)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range numChildren {
|
||||||
|
childName, err := readString16(r)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("read child name [%d]: %w", i, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(w, "%s [%s]\n", indent, childName)
|
||||||
|
if err := dumpBinaryLevel(r, w, depth+2); err != nil {
|
||||||
|
return fmt.Errorf("read child %s: %w", childName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readString16(r io.Reader) (string, error) {
|
||||||
|
var sLen uint16
|
||||||
|
if err := binary.Read(r, binary.LittleEndian, &sLen); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
buf := make([]byte, sLen)
|
||||||
|
if _, err := io.ReadFull(r, buf); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return string(buf), nil
|
||||||
|
}
|
||||||
4
web/frontend/package-lock.json
generated
4
web/frontend/package-lock.json
generated
@@ -1,12 +1,12 @@
|
|||||||
{
|
{
|
||||||
"name": "cc-frontend",
|
"name": "cc-frontend",
|
||||||
"version": "1.5.2",
|
"version": "1.5.3",
|
||||||
"lockfileVersion": 4,
|
"lockfileVersion": 4,
|
||||||
"requires": true,
|
"requires": true,
|
||||||
"packages": {
|
"packages": {
|
||||||
"": {
|
"": {
|
||||||
"name": "cc-frontend",
|
"name": "cc-frontend",
|
||||||
"version": "1.5.2",
|
"version": "1.5.3",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@rollup/plugin-replace": "^6.0.3",
|
"@rollup/plugin-replace": "^6.0.3",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "cc-frontend",
|
"name": "cc-frontend",
|
||||||
"version": "1.5.2",
|
"version": "1.5.3",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"build": "rollup -c",
|
"build": "rollup -c",
|
||||||
|
|||||||
@@ -54,11 +54,16 @@
|
|||||||
const paging = { itemsPerPage: 50, page: 1 };
|
const paging = { itemsPerPage: 50, page: 1 };
|
||||||
const sorting = { field: "startTime", type: "col", order: "DESC" };
|
const sorting = { field: "startTime", type: "col", order: "DESC" };
|
||||||
const nodeMetricsQuery = gql`
|
const nodeMetricsQuery = gql`
|
||||||
query ($cluster: String!, $nodes: [String!], $from: Time!, $to: Time!) {
|
query (
|
||||||
|
$cluster: String!,
|
||||||
|
$nodes: [String!],
|
||||||
|
$from: Time!,
|
||||||
|
$to: Time!,
|
||||||
|
$nodeFilter: [NodeFilter!]!,
|
||||||
|
$sorting: OrderByInput!
|
||||||
|
) {
|
||||||
nodeMetrics(cluster: $cluster, nodes: $nodes, from: $from, to: $to) {
|
nodeMetrics(cluster: $cluster, nodes: $nodes, from: $from, to: $to) {
|
||||||
host
|
host
|
||||||
nodeState
|
|
||||||
metricHealth
|
|
||||||
subCluster
|
subCluster
|
||||||
metrics {
|
metrics {
|
||||||
name
|
name
|
||||||
@@ -79,7 +84,14 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
nodeStatus: nodes(filter: $nodeFilter, order: $sorting) {
|
||||||
|
count
|
||||||
|
items {
|
||||||
|
schedulerState
|
||||||
|
healthState
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
`;
|
`;
|
||||||
const nodeJobsQuery = gql`
|
const nodeJobsQuery = gql`
|
||||||
@@ -146,6 +158,8 @@
|
|||||||
nodes: [hostname],
|
nodes: [hostname],
|
||||||
from: from?.toISOString(),
|
from: from?.toISOString(),
|
||||||
to: to?.toISOString(),
|
to: to?.toISOString(),
|
||||||
|
nodeFilter: { hostname: { eq: hostname }},
|
||||||
|
sorting // $sorting unused in backend: Use placeholder
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
@@ -157,8 +171,8 @@
|
|||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
|
||||||
const thisNodeState = $derived($nodeMetricsData?.data?.nodeMetrics[0]?.nodeState || 'notindb');
|
const thisNodeState = $derived($nodeMetricsData?.data?.nodeStatus?.items[0]?.schedulerState || 'notindb');
|
||||||
const thisMetricHealth = $derived($nodeMetricsData?.data?.nodeMetrics[0]?.metricHealth || 'unknown');
|
const thisMetricHealth = $derived($nodeMetricsData?.data?.nodeStatus?.items[0]?.healthState || 'unknown');
|
||||||
</script>
|
</script>
|
||||||
|
|
||||||
<Row cols={{ xs: 2, lg: 3}}>
|
<Row cols={{ xs: 2, lg: 3}}>
|
||||||
|
|||||||
@@ -166,12 +166,12 @@
|
|||||||
items.push({ project: { [filters.projectMatch]: filters.project } });
|
items.push({ project: { [filters.projectMatch]: filters.project } });
|
||||||
if (filters.user)
|
if (filters.user)
|
||||||
items.push({ user: { [filters.userMatch]: filters.user } });
|
items.push({ user: { [filters.userMatch]: filters.user } });
|
||||||
if (filters.numNodes.from != null || filters.numNodes.to != null) {
|
if (filters.numNodes.from != null && filters.numNodes.to != null) {
|
||||||
items.push({
|
items.push({
|
||||||
numNodes: { from: filters.numNodes.from, to: filters.numNodes.to },
|
numNodes: { from: filters.numNodes.from, to: filters.numNodes.to },
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
if (filters.numAccelerators.from != null || filters.numAccelerators.to != null) {
|
if (filters.numAccelerators.from != null && filters.numAccelerators.to != null) {
|
||||||
items.push({
|
items.push({
|
||||||
numAccelerators: {
|
numAccelerators: {
|
||||||
from: filters.numAccelerators.from,
|
from: filters.numAccelerators.from,
|
||||||
@@ -179,7 +179,7 @@
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
if (filters.numHWThreads.from != null || filters.numHWThreads.to != null) {
|
if (filters.numHWThreads.from != null && filters.numHWThreads.to != null) {
|
||||||
items.push({
|
items.push({
|
||||||
numHWThreads: {
|
numHWThreads: {
|
||||||
from: filters.numHWThreads.from,
|
from: filters.numHWThreads.from,
|
||||||
@@ -206,14 +206,21 @@
|
|||||||
items.push({ duration: { to: filters.duration.lessThan, from: 0 } });
|
items.push({ duration: { to: filters.duration.lessThan, from: 0 } });
|
||||||
if (filters.duration.moreThan)
|
if (filters.duration.moreThan)
|
||||||
items.push({ duration: { to: 0, from: filters.duration.moreThan } });
|
items.push({ duration: { to: 0, from: filters.duration.moreThan } });
|
||||||
if (filters.energy.from != null || filters.energy.to != null)
|
if (filters.energy.from != null && filters.energy.to != null)
|
||||||
items.push({
|
items.push({
|
||||||
energy: { from: filters.energy.from, to: filters.energy.to },
|
energy: { from: filters.energy.from, to: filters.energy.to },
|
||||||
});
|
});
|
||||||
if (filters.jobId)
|
if (filters.jobId)
|
||||||
items.push({ jobId: { [filters.jobIdMatch]: filters.jobId } });
|
items.push({ jobId: { [filters.jobIdMatch]: filters.jobId } });
|
||||||
if (filters.stats.length != 0)
|
if (filters.stats.length != 0) {
|
||||||
items.push({ metricStats: filters.stats.map((st) => { return { metricName: st.field, range: { from: st.from, to: st.to }} }) });
|
const metricStats = [];
|
||||||
|
filters.stats.forEach((st) => {
|
||||||
|
if (st.from != null && st.to != null)
|
||||||
|
metricStats.push({ metricName: st.field, range: { from: st.from, to: st.to }});
|
||||||
|
});
|
||||||
|
if (metricStats.length != 0)
|
||||||
|
items.push({metricStats})
|
||||||
|
};
|
||||||
if (filters.node) items.push({ node: { [filters.nodeMatch]: filters.node } });
|
if (filters.node) items.push({ node: { [filters.nodeMatch]: filters.node } });
|
||||||
if (filters.jobName) items.push({ jobName: { contains: filters.jobName } });
|
if (filters.jobName) items.push({ jobName: { contains: filters.jobName } });
|
||||||
if (filters.schedule) items.push({ schedule: filters.schedule });
|
if (filters.schedule) items.push({ schedule: filters.schedule });
|
||||||
@@ -280,40 +287,40 @@
|
|||||||
opts.push(`duration=morethan-${filters.duration.moreThan}`);
|
opts.push(`duration=morethan-${filters.duration.moreThan}`);
|
||||||
if (filters.tags.length != 0)
|
if (filters.tags.length != 0)
|
||||||
for (let tag of filters.tags) opts.push(`tag=${tag}`);
|
for (let tag of filters.tags) opts.push(`tag=${tag}`);
|
||||||
if (filters.numNodes.from > 1 && filters.numNodes.to > 0)
|
if (filters.numNodes.from > 0 && filters.numNodes.to > 0)
|
||||||
opts.push(`numNodes=${filters.numNodes.from}-${filters.numNodes.to}`);
|
opts.push(`numNodes=${filters.numNodes.from}-${filters.numNodes.to}`);
|
||||||
else if (filters.numNodes.from > 1 && filters.numNodes.to == 0)
|
else if (filters.numNodes.from > 0 && filters.numNodes.to == 0)
|
||||||
opts.push(`numNodes=morethan-${filters.numNodes.from}`);
|
opts.push(`numNodes=morethan-${filters.numNodes.from}`);
|
||||||
else if (filters.numNodes.from == 1 && filters.numNodes.to > 0)
|
else if (filters.numNodes.from == 0 && filters.numNodes.to > 0)
|
||||||
opts.push(`numNodes=lessthan-${filters.numNodes.to}`);
|
opts.push(`numNodes=lessthan-${filters.numNodes.to}`);
|
||||||
if (filters.numHWThreads.from > 1 && filters.numHWThreads.to > 0)
|
if (filters.numHWThreads.from > 0 && filters.numHWThreads.to > 0)
|
||||||
opts.push(`numHWThreads=${filters.numHWThreads.from}-${filters.numHWThreads.to}`);
|
opts.push(`numHWThreads=${filters.numHWThreads.from}-${filters.numHWThreads.to}`);
|
||||||
else if (filters.numHWThreads.from > 1 && filters.numHWThreads.to == 0)
|
else if (filters.numHWThreads.from > 0 && filters.numHWThreads.to == 0)
|
||||||
opts.push(`numHWThreads=morethan-${filters.numHWThreads.from}`);
|
opts.push(`numHWThreads=morethan-${filters.numHWThreads.from}`);
|
||||||
else if (filters.numHWThreads.from == 1 && filters.numHWThreads.to > 0)
|
else if (filters.numHWThreads.from == 0 && filters.numHWThreads.to > 0)
|
||||||
opts.push(`numHWThreads=lessthan-${filters.numHWThreads.to}`);
|
opts.push(`numHWThreads=lessthan-${filters.numHWThreads.to}`);
|
||||||
if (filters.numAccelerators.from && filters.numAccelerators.to)
|
if (filters.numAccelerators.from > 0 && filters.numAccelerators.to > 0)
|
||||||
opts.push(`numAccelerators=${filters.numAccelerators.from}-${filters.numAccelerators.to}`);
|
opts.push(`numAccelerators=${filters.numAccelerators.from}-${filters.numAccelerators.to}`);
|
||||||
else if (filters.numAccelerators.from > 1 && filters.numAccelerators.to == 0)
|
else if (filters.numAccelerators.from > 0 && filters.numAccelerators.to == 0)
|
||||||
opts.push(`numAccelerators=morethan-${filters.numAccelerators.from}`);
|
opts.push(`numAccelerators=morethan-${filters.numAccelerators.from}`);
|
||||||
else if (filters.numAccelerators.from == 1 && filters.numAccelerators.to > 0)
|
else if (filters.numAccelerators.from == 0 && filters.numAccelerators.to > 0)
|
||||||
opts.push(`numAccelerators=lessthan-${filters.numAccelerators.to}`);
|
opts.push(`numAccelerators=lessthan-${filters.numAccelerators.to}`);
|
||||||
if (filters.node) opts.push(`node=${filters.node}`);
|
if (filters.node) opts.push(`node=${filters.node}`);
|
||||||
if (filters.node && filters.nodeMatch != "eq") // "eq" is default-case
|
if (filters.node && filters.nodeMatch != "eq") // "eq" is default-case
|
||||||
opts.push(`nodeMatch=${filters.nodeMatch}`);
|
opts.push(`nodeMatch=${filters.nodeMatch}`);
|
||||||
if (filters.energy.from > 1 && filters.energy.to > 0)
|
if (filters.energy.from > 0 && filters.energy.to > 0)
|
||||||
opts.push(`energy=${filters.energy.from}-${filters.energy.to}`);
|
opts.push(`energy=${filters.energy.from}-${filters.energy.to}`);
|
||||||
else if (filters.energy.from > 1 && filters.energy.to == 0)
|
else if (filters.energy.from > 0 && filters.energy.to == 0)
|
||||||
opts.push(`energy=morethan-${filters.energy.from}`);
|
opts.push(`energy=morethan-${filters.energy.from}`);
|
||||||
else if (filters.energy.from == 1 && filters.energy.to > 0)
|
else if (filters.energy.from == 0 && filters.energy.to > 0)
|
||||||
opts.push(`energy=lessthan-${filters.energy.to}`);
|
opts.push(`energy=lessthan-${filters.energy.to}`);
|
||||||
if (filters.stats.length > 0)
|
if (filters.stats.length > 0)
|
||||||
for (let stat of filters.stats) {
|
for (let stat of filters.stats) {
|
||||||
if (stat.from > 1 && stat.to > 0)
|
if (stat.from > 0 && stat.to > 0)
|
||||||
opts.push(`stat=${stat.field}-${stat.from}-${stat.to}`);
|
opts.push(`stat=${stat.field}-${stat.from}-${stat.to}`);
|
||||||
else if (stat.from > 1 && stat.to == 0)
|
else if (stat.from > 0 && stat.to == 0)
|
||||||
opts.push(`stat=${stat.field}-morethan-${stat.from}`);
|
opts.push(`stat=${stat.field}-morethan-${stat.from}`);
|
||||||
else if (stat.from == 1 && stat.to > 0)
|
else if (stat.from == 0 && stat.to > 0)
|
||||||
opts.push(`stat=${stat.field}-lessthan-${stat.to}`);
|
opts.push(`stat=${stat.field}-lessthan-${stat.to}`);
|
||||||
}
|
}
|
||||||
// Build && Return
|
// Build && Return
|
||||||
@@ -511,43 +518,43 @@
|
|||||||
</Info>
|
</Info>
|
||||||
{/if}
|
{/if}
|
||||||
|
|
||||||
{#if filters.numNodes.from > 1 && filters.numNodes.to > 0}
|
{#if filters.numNodes.from > 0 && filters.numNodes.to > 0}
|
||||||
<Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}>
|
<Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}>
|
||||||
Nodes: {filters.numNodes.from} - {filters.numNodes.to}
|
Nodes: {filters.numNodes.from} - {filters.numNodes.to}
|
||||||
</Info>
|
</Info>
|
||||||
{:else if filters.numNodes.from > 1 && filters.numNodes.to == 0}
|
{:else if filters.numNodes.from > 0 && filters.numNodes.to == 0}
|
||||||
<Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}>
|
<Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}>
|
||||||
≥ {filters.numNodes.from} Node(s)
|
≥ {filters.numNodes.from} Node(s)
|
||||||
</Info>
|
</Info>
|
||||||
{:else if filters.numNodes.from == 1 && filters.numNodes.to > 0}
|
{:else if filters.numNodes.from == 0 && filters.numNodes.to > 0}
|
||||||
<Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}>
|
<Info icon="hdd-stack" onclick={() => (isResourcesOpen = true)}>
|
||||||
≤ {filters.numNodes.to} Node(s)
|
≤ {filters.numNodes.to} Node(s)
|
||||||
</Info>
|
</Info>
|
||||||
{/if}
|
{/if}
|
||||||
|
|
||||||
{#if filters.numHWThreads.from > 1 && filters.numHWThreads.to > 0}
|
{#if filters.numHWThreads.from > 0 && filters.numHWThreads.to > 0}
|
||||||
<Info icon="cpu" onclick={() => (isResourcesOpen = true)}>
|
<Info icon="cpu" onclick={() => (isResourcesOpen = true)}>
|
||||||
HWThreads: {filters.numHWThreads.from} - {filters.numHWThreads.to}
|
HWThreads: {filters.numHWThreads.from} - {filters.numHWThreads.to}
|
||||||
</Info>
|
</Info>
|
||||||
{:else if filters.numHWThreads.from > 1 && filters.numHWThreads.to == 0}
|
{:else if filters.numHWThreads.from > 0 && filters.numHWThreads.to == 0}
|
||||||
<Info icon="cpu" onclick={() => (isResourcesOpen = true)}>
|
<Info icon="cpu" onclick={() => (isResourcesOpen = true)}>
|
||||||
≥ {filters.numHWThreads.from} HWThread(s)
|
≥ {filters.numHWThreads.from} HWThread(s)
|
||||||
</Info>
|
</Info>
|
||||||
{:else if filters.numHWThreads.from == 1 && filters.numHWThreads.to > 0}
|
{:else if filters.numHWThreads.from == 0 && filters.numHWThreads.to > 0}
|
||||||
<Info icon="cpu" onclick={() => (isResourcesOpen = true)}>
|
<Info icon="cpu" onclick={() => (isResourcesOpen = true)}>
|
||||||
≤ {filters.numHWThreads.to} HWThread(s)
|
≤ {filters.numHWThreads.to} HWThread(s)
|
||||||
</Info>
|
</Info>
|
||||||
{/if}
|
{/if}
|
||||||
|
|
||||||
{#if filters.numAccelerators.from > 1 && filters.numAccelerators.to > 0}
|
{#if filters.numAccelerators.from > 0 && filters.numAccelerators.to > 0}
|
||||||
<Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}>
|
<Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}>
|
||||||
Accelerators: {filters.numAccelerators.from} - {filters.numAccelerators.to}
|
Accelerators: {filters.numAccelerators.from} - {filters.numAccelerators.to}
|
||||||
</Info>
|
</Info>
|
||||||
{:else if filters.numAccelerators.from > 1 && filters.numAccelerators.to == 0}
|
{:else if filters.numAccelerators.from > 0 && filters.numAccelerators.to == 0}
|
||||||
<Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}>
|
<Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}>
|
||||||
≥ {filters.numAccelerators.from} Acc(s)
|
≥ {filters.numAccelerators.from} Acc(s)
|
||||||
</Info>
|
</Info>
|
||||||
{:else if filters.numAccelerators.from == 1 && filters.numAccelerators.to > 0}
|
{:else if filters.numAccelerators.from == 0 && filters.numAccelerators.to > 0}
|
||||||
<Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}>
|
<Info icon="gpu-card" onclick={() => (isResourcesOpen = true)}>
|
||||||
≤ {filters.numAccelerators.to} Acc(s)
|
≤ {filters.numAccelerators.to} Acc(s)
|
||||||
</Info>
|
</Info>
|
||||||
@@ -559,15 +566,15 @@
|
|||||||
</Info>
|
</Info>
|
||||||
{/if}
|
{/if}
|
||||||
|
|
||||||
{#if filters.energy.from > 1 && filters.energy.to > 0}
|
{#if filters.energy.from > 0 && filters.energy.to > 0}
|
||||||
<Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}>
|
<Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}>
|
||||||
Total Energy: {filters.energy.from} - {filters.energy.to} kWh
|
Total Energy: {filters.energy.from} - {filters.energy.to} kWh
|
||||||
</Info>
|
</Info>
|
||||||
{:else if filters.energy.from > 1 && filters.energy.to == 0}
|
{:else if filters.energy.from > 0 && filters.energy.to == 0}
|
||||||
<Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}>
|
<Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}>
|
||||||
Total Energy ≥ {filters.energy.from} kWh
|
Total Energy ≥ {filters.energy.from} kWh
|
||||||
</Info>
|
</Info>
|
||||||
{:else if filters.energy.from == 1 && filters.energy.to > 0}
|
{:else if filters.energy.from == 0 && filters.energy.to > 0}
|
||||||
<Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}>
|
<Info icon="lightning-charge-fill" onclick={() => (isEnergyOpen = true)}>
|
||||||
Total Energy ≤ {filters.energy.to} kWh
|
Total Energy ≤ {filters.energy.to} kWh
|
||||||
</Info>
|
</Info>
|
||||||
@@ -575,15 +582,15 @@
|
|||||||
|
|
||||||
{#if filters.stats.length > 0}
|
{#if filters.stats.length > 0}
|
||||||
{#each filters.stats as stat}
|
{#each filters.stats as stat}
|
||||||
{#if stat.from > 1 && stat.to > 0}
|
{#if stat.from > 0 && stat.to > 0}
|
||||||
<Info icon="bar-chart" onclick={() => (isStatsOpen = true)}>
|
<Info icon="bar-chart" onclick={() => (isStatsOpen = true)}>
|
||||||
{stat.field}: {stat.from} - {stat.to} {stat.unit}
|
{stat.field}: {stat.from} - {stat.to} {stat.unit}
|
||||||
</Info> 
|
</Info> 
|
||||||
{:else if stat.from > 1 && stat.to == 0}
|
{:else if stat.from > 0 && stat.to == 0}
|
||||||
<Info icon="bar-chart" onclick={() => (isStatsOpen = true)}>
|
<Info icon="bar-chart" onclick={() => (isStatsOpen = true)}>
|
||||||
{stat.field} ≥ {stat.from} {stat.unit}
|
{stat.field} ≥ {stat.from} {stat.unit}
|
||||||
</Info> 
|
</Info> 
|
||||||
{:else if stat.from == 1 && stat.to > 0}
|
{:else if stat.from == 0 && stat.to > 0}
|
||||||
<Info icon="bar-chart" onclick={() => (isStatsOpen = true)}>
|
<Info icon="bar-chart" onclick={() => (isStatsOpen = true)}>
|
||||||
{stat.field} ≤ {stat.to} {stat.unit}
|
{stat.field} ≤ {stat.to} {stat.unit}
|
||||||
</Info> 
|
</Info> 
|
||||||
|
|||||||
@@ -28,31 +28,29 @@
|
|||||||
} = $props();
|
} = $props();
|
||||||
|
|
||||||
/* Const */
|
/* Const */
|
||||||
const minEnergyPreset = 1;
|
const minEnergyPreset = 0;
|
||||||
const maxEnergyPreset = 100;
|
const maxEnergyPreset = 100;
|
||||||
|
|
||||||
/* Derived */
|
/* Derived */
|
||||||
// Pending
|
// Pending
|
||||||
let pendingEnergyState = $derived({
|
let pendingEnergyState = $derived({
|
||||||
from: presetEnergy?.from ? presetEnergy.from : minEnergyPreset,
|
from: presetEnergy?.from || minEnergyPreset,
|
||||||
to: !(presetEnergy.to == null || presetEnergy.to == 0) ? presetEnergy.to : maxEnergyPreset,
|
to: (presetEnergy.to == 0) ? null : presetEnergy.to,
|
||||||
});
|
});
|
||||||
// Changable
|
// Changable
|
||||||
let energyState = $derived({
|
let energyState = $derived({
|
||||||
from: presetEnergy?.from ? presetEnergy.from : minEnergyPreset,
|
from: presetEnergy?.from || minEnergyPreset,
|
||||||
to: !(presetEnergy.to == null || presetEnergy.to == 0) ? presetEnergy.to : maxEnergyPreset,
|
to: (presetEnergy.to == 0) ? null : presetEnergy.to,
|
||||||
});
|
});
|
||||||
|
|
||||||
const energyActive = $derived(!(JSON.stringify(energyState) === JSON.stringify({ from: minEnergyPreset, to: maxEnergyPreset })));
|
const energyActive = $derived(!(JSON.stringify(energyState) === JSON.stringify({ from: minEnergyPreset, to: null })));
|
||||||
// Block Apply if null
|
|
||||||
const disableApply = $derived(energyState.from === null || energyState.to === null);
|
|
||||||
|
|
||||||
/* Function */
|
/* Function */
|
||||||
function setEnergy() {
|
function setEnergy() {
|
||||||
if (energyActive) {
|
if (energyActive) {
|
||||||
pendingEnergyState = {
|
pendingEnergyState = {
|
||||||
from: energyState.from,
|
from: (!energyState?.from) ? 0 : energyState.from,
|
||||||
to: (energyState.to == maxEnergyPreset) ? 0 : energyState.to
|
to: (energyState.to === null) ? 0 : energyState.to
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
pendingEnergyState = { from: null, to: null};
|
pendingEnergyState = { from: null, to: null};
|
||||||
@@ -86,7 +84,6 @@
|
|||||||
<ModalFooter>
|
<ModalFooter>
|
||||||
<Button
|
<Button
|
||||||
color="primary"
|
color="primary"
|
||||||
disabled={disableApply}
|
|
||||||
onclick={() => {
|
onclick={() => {
|
||||||
isOpen = false;
|
isOpen = false;
|
||||||
setEnergy();
|
setEnergy();
|
||||||
|
|||||||
@@ -98,44 +98,38 @@
|
|||||||
// Pending
|
// Pending
|
||||||
let pendingNumNodes = $derived({
|
let pendingNumNodes = $derived({
|
||||||
from: presetNumNodes.from,
|
from: presetNumNodes.from,
|
||||||
to: (presetNumNodes.to == 0) ? maxNumNodes : presetNumNodes.to
|
to: (presetNumNodes.to == 0) ? null : presetNumNodes.to
|
||||||
});
|
});
|
||||||
let pendingNumHWThreads = $derived({
|
let pendingNumHWThreads = $derived({
|
||||||
from: presetNumHWThreads.from,
|
from: presetNumHWThreads.from,
|
||||||
to: (presetNumHWThreads.to == 0) ? maxNumHWThreads : presetNumHWThreads.to
|
to: (presetNumHWThreads.to == 0) ? null : presetNumHWThreads.to
|
||||||
});
|
});
|
||||||
let pendingNumAccelerators = $derived({
|
let pendingNumAccelerators = $derived({
|
||||||
from: presetNumAccelerators.from,
|
from: presetNumAccelerators.from,
|
||||||
to: (presetNumAccelerators.to == 0) ? maxNumAccelerators : presetNumAccelerators.to
|
to: (presetNumAccelerators.to == 0) ? null : presetNumAccelerators.to
|
||||||
});
|
});
|
||||||
let pendingNamedNode = $derived(presetNamedNode);
|
let pendingNamedNode = $derived(presetNamedNode);
|
||||||
let pendingNodeMatch = $derived(presetNodeMatch);
|
let pendingNodeMatch = $derived(presetNodeMatch);
|
||||||
// Changable States
|
// Changable States
|
||||||
let nodesState = $derived({
|
let nodesState = $derived({
|
||||||
from: presetNumNodes.from,
|
from: presetNumNodes?.from || 0,
|
||||||
to: (presetNumNodes.to == 0) ? maxNumNodes : presetNumNodes.to
|
to: (presetNumNodes.to == 0) ? null : presetNumNodes.to
|
||||||
});
|
});
|
||||||
let threadState = $derived({
|
let threadState = $derived({
|
||||||
from: presetNumHWThreads.from,
|
from: presetNumHWThreads?.from || 0,
|
||||||
to: (presetNumHWThreads.to == 0) ? maxNumHWThreads : presetNumHWThreads.to
|
to: (presetNumHWThreads.to == 0) ? null : presetNumHWThreads.to
|
||||||
});
|
});
|
||||||
let accState = $derived({
|
let accState = $derived({
|
||||||
from: presetNumAccelerators.from,
|
from: presetNumAccelerators?.from || 0,
|
||||||
to: (presetNumAccelerators.to == 0) ? maxNumAccelerators : presetNumAccelerators.to
|
to: (presetNumAccelerators.to == 0) ? null : presetNumAccelerators.to
|
||||||
});
|
});
|
||||||
|
|
||||||
const initialized = $derived(getContext("initialized") || false);
|
const initialized = $derived(getContext("initialized") || false);
|
||||||
const clusterInfos = $derived($initialized ? getContext("clusters") : null);
|
const clusterInfos = $derived($initialized ? getContext("clusters") : null);
|
||||||
// Is Selection Active
|
// Is Selection Active
|
||||||
const nodesActive = $derived(!(JSON.stringify(nodesState) === JSON.stringify({ from: 1, to: maxNumNodes })));
|
const nodesActive = $derived(!(JSON.stringify(nodesState) === JSON.stringify({ from: 0, to: null })));
|
||||||
const threadActive = $derived(!(JSON.stringify(threadState) === JSON.stringify({ from: 1, to: maxNumHWThreads })));
|
const threadActive = $derived(!(JSON.stringify(threadState) === JSON.stringify({ from: 0, to: null })));
|
||||||
const accActive = $derived(!(JSON.stringify(accState) === JSON.stringify({ from: 1, to: maxNumAccelerators })));
|
const accActive = $derived(!(JSON.stringify(accState) === JSON.stringify({ from: 0, to: null })));
|
||||||
// Block Apply if null
|
|
||||||
const disableApply = $derived(
|
|
||||||
nodesState.from === null || nodesState.to === null ||
|
|
||||||
threadState.from === null || threadState.to === null ||
|
|
||||||
accState.from === null || accState.to === null
|
|
||||||
);
|
|
||||||
|
|
||||||
/* Reactive Effects | Svelte 5 onMount */
|
/* Reactive Effects | Svelte 5 onMount */
|
||||||
$effect(() => {
|
$effect(() => {
|
||||||
@@ -153,58 +147,28 @@
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
$effect(() => {
|
|
||||||
if (
|
|
||||||
$initialized &&
|
|
||||||
pendingNumNodes.from == null &&
|
|
||||||
pendingNumNodes.to == null
|
|
||||||
) {
|
|
||||||
nodesState = { from: 1, to: maxNumNodes };
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
$effect(() => {
|
|
||||||
if (
|
|
||||||
$initialized &&
|
|
||||||
pendingNumHWThreads.from == null &&
|
|
||||||
pendingNumHWThreads.to == null
|
|
||||||
) {
|
|
||||||
threadState = { from: 1, to: maxNumHWThreads };
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
$effect(() => {
|
|
||||||
if (
|
|
||||||
$initialized &&
|
|
||||||
pendingNumAccelerators.from == null &&
|
|
||||||
pendingNumAccelerators.to == null
|
|
||||||
) {
|
|
||||||
accState = { from: 1, to: maxNumAccelerators };
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
/* Functions */
|
/* Functions */
|
||||||
function setResources() {
|
function setResources() {
|
||||||
if (nodesActive) {
|
if (nodesActive) {
|
||||||
pendingNumNodes = {
|
pendingNumNodes = {
|
||||||
from: nodesState.from,
|
from: (!nodesState?.from) ? 0 : nodesState.from,
|
||||||
to: (nodesState.to == maxNumNodes) ? 0 : nodesState.to
|
to: (nodesState.to === null) ? 0 : nodesState.to
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
pendingNumNodes = { from: null, to: null};
|
pendingNumNodes = { from: null, to: null};
|
||||||
};
|
};
|
||||||
if (threadActive) {
|
if (threadActive) {
|
||||||
pendingNumHWThreads = {
|
pendingNumHWThreads = {
|
||||||
from: threadState.from,
|
from: (!threadState?.from) ? 0 : threadState.from,
|
||||||
to: (threadState.to == maxNumHWThreads) ? 0 : threadState.to
|
to: (threadState.to === null) ? 0 : threadState.to
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
pendingNumHWThreads = { from: null, to: null};
|
pendingNumHWThreads = { from: null, to: null};
|
||||||
};
|
};
|
||||||
if (accActive) {
|
if (accActive) {
|
||||||
pendingNumAccelerators = {
|
pendingNumAccelerators = {
|
||||||
from: accState.from,
|
from: (!accState?.from) ? 0 : accState.from,
|
||||||
to: (accState.to == maxNumAccelerators) ? 0 : accState.to
|
to: (accState.to === null) ? 0 : accState.to
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
pendingNumAccelerators = { from: null, to: null};
|
pendingNumAccelerators = { from: null, to: null};
|
||||||
@@ -249,7 +213,7 @@
|
|||||||
nodesState.from = detail[0];
|
nodesState.from = detail[0];
|
||||||
nodesState.to = detail[1];
|
nodesState.to = detail[1];
|
||||||
}}
|
}}
|
||||||
sliderMin={1}
|
sliderMin={0}
|
||||||
sliderMax={maxNumNodes}
|
sliderMax={maxNumNodes}
|
||||||
fromPreset={nodesState.from}
|
fromPreset={nodesState.from}
|
||||||
toPreset={nodesState.to}
|
toPreset={nodesState.to}
|
||||||
@@ -269,7 +233,7 @@
|
|||||||
threadState.from = detail[0];
|
threadState.from = detail[0];
|
||||||
threadState.to = detail[1];
|
threadState.to = detail[1];
|
||||||
}}
|
}}
|
||||||
sliderMin={1}
|
sliderMin={0}
|
||||||
sliderMax={maxNumHWThreads}
|
sliderMax={maxNumHWThreads}
|
||||||
fromPreset={threadState.from}
|
fromPreset={threadState.from}
|
||||||
toPreset={threadState.to}
|
toPreset={threadState.to}
|
||||||
@@ -289,7 +253,7 @@
|
|||||||
accState.from = detail[0];
|
accState.from = detail[0];
|
||||||
accState.to = detail[1];
|
accState.to = detail[1];
|
||||||
}}
|
}}
|
||||||
sliderMin={1}
|
sliderMin={0}
|
||||||
sliderMax={maxNumAccelerators}
|
sliderMax={maxNumAccelerators}
|
||||||
fromPreset={accState.from}
|
fromPreset={accState.from}
|
||||||
toPreset={accState.to}
|
toPreset={accState.to}
|
||||||
@@ -300,7 +264,6 @@
|
|||||||
<ModalFooter>
|
<ModalFooter>
|
||||||
<Button
|
<Button
|
||||||
color="primary"
|
color="primary"
|
||||||
disabled={disableApply}
|
|
||||||
onclick={() => {
|
onclick={() => {
|
||||||
isOpen = false;
|
isOpen = false;
|
||||||
setResources();
|
setResources();
|
||||||
|
|||||||
@@ -34,7 +34,8 @@
|
|||||||
function setRanges() {
|
function setRanges() {
|
||||||
for (let as of availableStats) {
|
for (let as of availableStats) {
|
||||||
if (as.enabled) {
|
if (as.enabled) {
|
||||||
as.to = (as.to == as.peak) ? 0 : as.to
|
as.from = (!as?.from) ? 0 : as.from,
|
||||||
|
as.to = (as.to == null) ? 0 : as.to
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -42,8 +43,8 @@
|
|||||||
function resetRanges() {
|
function resetRanges() {
|
||||||
for (let as of availableStats) {
|
for (let as of availableStats) {
|
||||||
as.enabled = false
|
as.enabled = false
|
||||||
as.from = 1
|
as.from = null
|
||||||
as.to = as.peak
|
as.to = null
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
</script>
|
</script>
|
||||||
@@ -66,13 +67,13 @@
|
|||||||
changeRange={(detail) => {
|
changeRange={(detail) => {
|
||||||
aStat.from = detail[0];
|
aStat.from = detail[0];
|
||||||
aStat.to = detail[1];
|
aStat.to = detail[1];
|
||||||
if (aStat.from == 1 && aStat.to == aStat.peak) {
|
if (aStat.from == 0 && aStat.to === null) {
|
||||||
aStat.enabled = false;
|
aStat.enabled = false;
|
||||||
} else {
|
} else {
|
||||||
aStat.enabled = true;
|
aStat.enabled = true;
|
||||||
}
|
}
|
||||||
}}
|
}}
|
||||||
sliderMin={1}
|
sliderMin={0}
|
||||||
sliderMax={aStat.peak}
|
sliderMax={aStat.peak}
|
||||||
fromPreset={aStat.from}
|
fromPreset={aStat.from}
|
||||||
toPreset={aStat.to}
|
toPreset={aStat.to}
|
||||||
|
|||||||
@@ -21,7 +21,7 @@
|
|||||||
let {
|
let {
|
||||||
sliderMin,
|
sliderMin,
|
||||||
sliderMax,
|
sliderMax,
|
||||||
fromPreset = 1,
|
fromPreset = 0,
|
||||||
toPreset = 100,
|
toPreset = 100,
|
||||||
changeRange
|
changeRange
|
||||||
} = $props();
|
} = $props();
|
||||||
@@ -33,9 +33,9 @@
|
|||||||
/* Derived */
|
/* Derived */
|
||||||
let pendingValues = $derived([fromPreset, toPreset]);
|
let pendingValues = $derived([fromPreset, toPreset]);
|
||||||
let sliderFrom = $derived(Math.max(((fromPreset == null ? sliderMin : fromPreset) - sliderMin) / (sliderMax - sliderMin), 0.));
|
let sliderFrom = $derived(Math.max(((fromPreset == null ? sliderMin : fromPreset) - sliderMin) / (sliderMax - sliderMin), 0.));
|
||||||
let sliderTo = $derived(Math.min(((toPreset == null ? sliderMin : toPreset) - sliderMin) / (sliderMax - sliderMin), 1.));
|
let sliderTo = $derived(Math.min(((toPreset == null ? sliderMax : toPreset) - sliderMin) / (sliderMax - sliderMin), 1.));
|
||||||
let inputFieldFrom = $derived(fromPreset ? fromPreset.toString() : null);
|
let inputFieldFrom = $derived(fromPreset != null ? fromPreset.toString() : null);
|
||||||
let inputFieldTo = $derived(toPreset ? toPreset.toString() : null);
|
let inputFieldTo = $derived(toPreset != null ? toPreset.toString() : null);
|
||||||
|
|
||||||
/* Var Init */
|
/* Var Init */
|
||||||
let timeoutId = null;
|
let timeoutId = null;
|
||||||
@@ -79,17 +79,22 @@
|
|||||||
evt.preventDefault()
|
evt.preventDefault()
|
||||||
evt.stopPropagation()
|
evt.stopPropagation()
|
||||||
const newV = Number.parseInt(evt.target.value);
|
const newV = Number.parseInt(evt.target.value);
|
||||||
const newP = clamp((newV - sliderMin) / (sliderMax - sliderMin), 0., 1.)
|
const newP = clamp((newV - sliderMin) / (sliderMax - sliderMin), 0., 1., target)
|
||||||
updateStates(newV, newP, target);
|
updateStates(newV, newP, target);
|
||||||
}
|
}
|
||||||
|
|
||||||
function clamp(x, testMin, testMax) {
|
function clamp(x, testMin, testMax, target) {
|
||||||
return x < testMin
|
if (isNaN(x)) {
|
||||||
? testMin
|
if (target == 'from') return testMin
|
||||||
: (x > testMax
|
else if (target == 'to') return testMax
|
||||||
? testMax
|
} else {
|
||||||
: x
|
return x < testMin
|
||||||
);
|
? testMin
|
||||||
|
: (x > testMax
|
||||||
|
? testMax
|
||||||
|
: x
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function draggable(node) {
|
function draggable(node) {
|
||||||
@@ -159,23 +164,23 @@
|
|||||||
|
|
||||||
<div class="double-range-container">
|
<div class="double-range-container">
|
||||||
<div class="header">
|
<div class="header">
|
||||||
<input class="form-control" type="text" placeholder="from..." value={inputFieldFrom}
|
<input class="form-control" type="text" placeholder={`${sliderMin} ...`} value={inputFieldFrom}
|
||||||
oninput={(e) => {
|
oninput={(e) => {
|
||||||
inputChanged(e, 'from');
|
inputChanged(e, 'from');
|
||||||
}}
|
}}
|
||||||
/>
|
/>
|
||||||
|
|
||||||
{#if inputFieldFrom != sliderMin?.toString() && inputFieldTo != sliderMax?.toString() }
|
{#if (inputFieldFrom && inputFieldFrom != sliderMin?.toString()) && inputFieldTo != null }
|
||||||
<span>Selected: Range <b> {inputFieldFrom} </b> - <b> {inputFieldTo} </b></span>
|
<span>Selected: Range <b> {inputFieldFrom} </b> - <b> {inputFieldTo} </b></span>
|
||||||
{:else if inputFieldFrom != sliderMin?.toString() && inputFieldTo == sliderMax?.toString() }
|
{:else if (inputFieldFrom && inputFieldFrom != sliderMin?.toString()) && inputFieldTo == null }
|
||||||
<span>Selected: More than <b> {inputFieldFrom} </b> </span>
|
<span>Selected: More Than Equal <b> {inputFieldFrom} </b> </span>
|
||||||
{:else if inputFieldFrom == sliderMin?.toString() && inputFieldTo != sliderMax?.toString() }
|
{:else if (!inputFieldFrom || inputFieldFrom == sliderMin?.toString()) && inputFieldTo != null }
|
||||||
<span>Selected: Less than <b> {inputFieldTo} </b></span>
|
<span>Selected: Less Than Equal <b> {inputFieldTo} </b></span>
|
||||||
{:else}
|
{:else}
|
||||||
<span><i>No Selection</i></span>
|
<span><i>No Selection</i></span>
|
||||||
{/if}
|
{/if}
|
||||||
|
|
||||||
<input class="form-control" type="text" placeholder="to..." value={inputFieldTo}
|
<input class="form-control" type="text" placeholder={`... ${sliderMax} ...`} value={inputFieldTo}
|
||||||
oninput={(e) => {
|
oninput={(e) => {
|
||||||
inputChanged(e, 'to');
|
inputChanged(e, 'to');
|
||||||
}}
|
}}
|
||||||
|
|||||||
@@ -347,8 +347,8 @@ export function getStatsItems(presetStats = []) {
|
|||||||
field: presetEntry.field,
|
field: presetEntry.field,
|
||||||
text: `${gm.name} (${gm.footprint})`,
|
text: `${gm.name} (${gm.footprint})`,
|
||||||
metric: gm.name,
|
metric: gm.name,
|
||||||
from: presetEntry.from,
|
from: presetEntry?.from || 0,
|
||||||
to: (presetEntry.to == 0) ? mc.peak : presetEntry.to,
|
to: (presetEntry.to == 0) ? null : presetEntry.to,
|
||||||
peak: mc.peak,
|
peak: mc.peak,
|
||||||
enabled: true,
|
enabled: true,
|
||||||
unit: `${gm?.unit?.prefix ? gm.unit.prefix : ''}${gm.unit.base}`
|
unit: `${gm?.unit?.prefix ? gm.unit.prefix : ''}${gm.unit.base}`
|
||||||
@@ -358,8 +358,8 @@ export function getStatsItems(presetStats = []) {
|
|||||||
field: `${gm.name}_${gm.footprint}`,
|
field: `${gm.name}_${gm.footprint}`,
|
||||||
text: `${gm.name} (${gm.footprint})`,
|
text: `${gm.name} (${gm.footprint})`,
|
||||||
metric: gm.name,
|
metric: gm.name,
|
||||||
from: 1,
|
from: 0,
|
||||||
to: mc.peak,
|
to: null,
|
||||||
peak: mc.peak,
|
peak: mc.peak,
|
||||||
enabled: false,
|
enabled: false,
|
||||||
unit: `${gm?.unit?.prefix ? gm.unit.prefix : ''}${gm.unit.base}`
|
unit: `${gm?.unit?.prefix ? gm.unit.prefix : ''}${gm.unit.base}`
|
||||||
|
|||||||
Reference in New Issue
Block a user