mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-03-16 12:57:30 +01:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
128c098865 |
23
CLAUDE.md
23
CLAUDE.md
@@ -151,11 +151,9 @@ applied automatically on startup. Version tracking in `version` table.
|
|||||||
## Configuration
|
## Configuration
|
||||||
|
|
||||||
- **config.json**: Main configuration (clusters, metric repositories, archive settings)
|
- **config.json**: Main configuration (clusters, metric repositories, archive settings)
|
||||||
- `main.api-subjects`: NATS subject configuration (optional)
|
- `main.apiSubjects`: NATS subject configuration (optional)
|
||||||
- `subject-job-event`: Subject for job start/stop events (e.g., "cc.job.event")
|
- `subjectJobEvent`: Subject for job start/stop events (e.g., "cc.job.event")
|
||||||
- `subject-node-state`: Subject for node state updates (e.g., "cc.node.state")
|
- `subjectNodeState`: Subject for node state updates (e.g., "cc.node.state")
|
||||||
- `job-concurrency`: Worker goroutines for job events (default: 8)
|
|
||||||
- `node-concurrency`: Worker goroutines for node state events (default: 2)
|
|
||||||
- `nats`: NATS client connection configuration (optional)
|
- `nats`: NATS client connection configuration (optional)
|
||||||
- `address`: NATS server address (e.g., "nats://localhost:4222")
|
- `address`: NATS server address (e.g., "nats://localhost:4222")
|
||||||
- `username`: Authentication username (optional)
|
- `username`: Authentication username (optional)
|
||||||
@@ -243,19 +241,13 @@ The backend supports a NATS-based API as an alternative to the REST API for job
|
|||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"main": {
|
"main": {
|
||||||
"api-subjects": {
|
"apiSubjects": {
|
||||||
"subject-job-event": "cc.job.event",
|
"subjectJobEvent": "cc.job.event",
|
||||||
"subject-node-state": "cc.node.state",
|
"subjectNodeState": "cc.node.state"
|
||||||
"job-concurrency": 8,
|
|
||||||
"node-concurrency": 2
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
- `subject-job-event` (required): NATS subject for job start/stop events
|
|
||||||
- `subject-node-state` (required): NATS subject for node state updates
|
|
||||||
- `job-concurrency` (optional, default: 8): Number of concurrent worker goroutines for job events
|
|
||||||
- `node-concurrency` (optional, default: 2): Number of concurrent worker goroutines for node state events
|
|
||||||
|
|
||||||
### Message Format
|
### Message Format
|
||||||
|
|
||||||
@@ -300,10 +292,9 @@ job,function=stop_job event="{\"jobId\":123,\"cluster\":\"test\",\"startTime\":1
|
|||||||
### Implementation Notes
|
### Implementation Notes
|
||||||
|
|
||||||
- NATS API mirrors REST API functionality but uses messaging
|
- NATS API mirrors REST API functionality but uses messaging
|
||||||
- Job start/stop events are processed asynchronously via configurable worker pools
|
- Job start/stop events are processed asynchronously
|
||||||
- Duplicate job detection is handled (same as REST API)
|
- Duplicate job detection is handled (same as REST API)
|
||||||
- All validation rules from REST API apply
|
- All validation rules from REST API apply
|
||||||
- Node state updates include health checks against the metric store (identical to REST handler): nodes are grouped by subcluster, metric configurations are fetched, and `HealthCheck()` is called per subcluster. Nodes default to `MonitoringStateFailed` if no health data is available.
|
|
||||||
- Messages are logged; no responses are sent back to publishers
|
- Messages are logged; no responses are sent back to publishers
|
||||||
- If NATS client is unavailable, API subscriptions are skipped (logged as warning)
|
- If NATS client is unavailable, API subscriptions are skipped (logged as warning)
|
||||||
|
|
||||||
|
|||||||
@@ -148,8 +148,7 @@ usage can be tuned via the optional `db-config` section in config.json under
|
|||||||
"soft-heap-limit-mb": 16384,
|
"soft-heap-limit-mb": 16384,
|
||||||
"max-open-connections": 4,
|
"max-open-connections": 4,
|
||||||
"max-idle-connections": 4,
|
"max-idle-connections": 4,
|
||||||
"max-idle-time-minutes": 10,
|
"max-idle-time-minutes": 10
|
||||||
"busy-timeout-ms": 60000
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -167,7 +166,6 @@ are used.
|
|||||||
| `max-open-connections` | 4 | Maximum number of open database connections. |
|
| `max-open-connections` | 4 | Maximum number of open database connections. |
|
||||||
| `max-idle-connections` | 4 | Maximum number of idle database connections kept in the pool. |
|
| `max-idle-connections` | 4 | Maximum number of idle database connections kept in the pool. |
|
||||||
| `max-idle-time-minutes` | 10 | Maximum time in minutes a connection can sit idle before being closed. |
|
| `max-idle-time-minutes` | 10 | Maximum time in minutes a connection can sit idle before being closed. |
|
||||||
| `busy-timeout-ms` | 60000 | SQLite busy timeout in milliseconds. When a write is blocked by another writer, SQLite retries internally with backoff for up to this duration before returning `SQLITE_BUSY`. |
|
|
||||||
|
|
||||||
### Sizing Guidelines
|
### Sizing Guidelines
|
||||||
|
|
||||||
|
|||||||
@@ -11,23 +11,6 @@ recommended to apply the new `optimize-db` flag, which runs the sqlite `ANALYZE`
|
|||||||
and `VACUUM` commands. Depending on your database size (more then 40GB) the
|
and `VACUUM` commands. Depending on your database size (more then 40GB) the
|
||||||
`VACUUM` may take up to 2h.
|
`VACUUM` may take up to 2h.
|
||||||
|
|
||||||
## Known issues
|
|
||||||
|
|
||||||
- The new dynamic memory management is not bullet proof yet across restarts.
|
|
||||||
Buffers that are kept outside the retention period may be lost across a
|
|
||||||
restart. We will fix that in a subsequent patch release.
|
|
||||||
- To use the new log viewer (which is only working when starting cc-backend with
|
|
||||||
systemd) in the admin interface the user under which the cc-backend process is
|
|
||||||
running has to be allowed to execute the journalctl command.
|
|
||||||
- The user configuration keys for the ui have changed. Therefore old user
|
|
||||||
configuration persisted in the database is not used anymore. It is recommended
|
|
||||||
to configure the metrics shown in the ui-config sestion and remove all records
|
|
||||||
in the table after the update.
|
|
||||||
- Currently energy footprint metrics of type energy are ignored for calculating
|
|
||||||
total energy.
|
|
||||||
- With energy footprint metrics of type power the unit is ignored and it is
|
|
||||||
assumed the metric has the unit Watt.
|
|
||||||
|
|
||||||
## Changes in 1.5.1
|
## Changes in 1.5.1
|
||||||
|
|
||||||
### Database
|
### Database
|
||||||
@@ -326,3 +309,12 @@ _The sections below document all features and changes introduced in the 1.5.0 ma
|
|||||||
- If using the archive retention feature, configure the `target-format` option
|
- If using the archive retention feature, configure the `target-format` option
|
||||||
to choose between `json` (default) and `parquet` output formats
|
to choose between `json` (default) and `parquet` output formats
|
||||||
- Consider enabling nodestate retention if you track node states over time
|
- Consider enabling nodestate retention if you track node states over time
|
||||||
|
|
||||||
|
## Known issues
|
||||||
|
|
||||||
|
- The new dynamic memory management is not bullet proof yet across restarts. We
|
||||||
|
will fix that in a subsequent patch release
|
||||||
|
- Currently energy footprint metrics of type energy are ignored for calculating
|
||||||
|
total energy.
|
||||||
|
- With energy footprint metrics of type power the unit is ignored and it is
|
||||||
|
assumed the metric has the unit Watt.
|
||||||
|
|||||||
@@ -126,9 +126,6 @@ func initDatabase() error {
|
|||||||
if dc.ConnectionMaxIdleTimeMins > 0 {
|
if dc.ConnectionMaxIdleTimeMins > 0 {
|
||||||
cfg.ConnectionMaxIdleTime = time.Duration(dc.ConnectionMaxIdleTimeMins) * time.Minute
|
cfg.ConnectionMaxIdleTime = time.Duration(dc.ConnectionMaxIdleTimeMins) * time.Minute
|
||||||
}
|
}
|
||||||
if dc.BusyTimeoutMs > 0 {
|
|
||||||
cfg.BusyTimeoutMs = dc.BusyTimeoutMs
|
|
||||||
}
|
|
||||||
repository.SetConfig(cfg)
|
repository.SetConfig(cfg)
|
||||||
}
|
}
|
||||||
repository.Connect(config.Keys.DB)
|
repository.Connect(config.Keys.DB)
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -9,7 +9,7 @@ tool (
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/99designs/gqlgen v0.17.88
|
github.com/99designs/gqlgen v0.17.88
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.9.0
|
github.com/ClusterCockpit/cc-lib/v2 v2.8.2
|
||||||
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0
|
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0
|
||||||
github.com/Masterminds/squirrel v1.5.4
|
github.com/Masterminds/squirrel v1.5.4
|
||||||
github.com/aws/aws-sdk-go-v2 v1.41.3
|
github.com/aws/aws-sdk-go-v2 v1.41.3
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -6,8 +6,6 @@ github.com/Azure/go-ntlmssp v0.1.0 h1:DjFo6YtWzNqNvQdrwEyr/e4nhU3vRiwenz5QX7sFz+
|
|||||||
github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk=
|
github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk=
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.8.2 h1:rCLZk8wz8yq8xBnBEdVKigvA2ngR8dPmHbEFwxxb3jw=
|
github.com/ClusterCockpit/cc-lib/v2 v2.8.2 h1:rCLZk8wz8yq8xBnBEdVKigvA2ngR8dPmHbEFwxxb3jw=
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
|
github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.9.0 h1:mzUYakcjwb+UP5II4jOvr36rSYct90gXBbtUg+nvm9c=
|
|
||||||
github.com/ClusterCockpit/cc-lib/v2 v2.9.0/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
|
|
||||||
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q=
|
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q=
|
||||||
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY=
|
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY=
|
||||||
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
|
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ package api
|
|||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"maps"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -16,10 +15,7 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-backend/internal/archiver"
|
"github.com/ClusterCockpit/cc-backend/internal/archiver"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
|
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
|
|
||||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||||
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
|
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
|
||||||
"github.com/ClusterCockpit/cc-lib/v2/nats"
|
"github.com/ClusterCockpit/cc-lib/v2/nats"
|
||||||
@@ -28,12 +24,6 @@ import (
|
|||||||
influx "github.com/ClusterCockpit/cc-line-protocol/v2/lineprotocol"
|
influx "github.com/ClusterCockpit/cc-line-protocol/v2/lineprotocol"
|
||||||
)
|
)
|
||||||
|
|
||||||
// natsMessage wraps a raw NATS message with its subject for channel-based processing.
|
|
||||||
type natsMessage struct {
|
|
||||||
subject string
|
|
||||||
data []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// NatsAPI provides NATS subscription-based handlers for Job and Node operations.
|
// NatsAPI provides NATS subscription-based handlers for Job and Node operations.
|
||||||
// It mirrors the functionality of the REST API but uses NATS messaging with
|
// It mirrors the functionality of the REST API but uses NATS messaging with
|
||||||
// InfluxDB line protocol as the message format.
|
// InfluxDB line protocol as the message format.
|
||||||
@@ -74,60 +64,16 @@ type NatsAPI struct {
|
|||||||
// RepositoryMutex protects job creation operations from race conditions
|
// RepositoryMutex protects job creation operations from race conditions
|
||||||
// when checking for duplicate jobs during startJob calls.
|
// when checking for duplicate jobs during startJob calls.
|
||||||
RepositoryMutex sync.Mutex
|
RepositoryMutex sync.Mutex
|
||||||
// jobCh receives job event messages for processing by worker goroutines.
|
|
||||||
jobCh chan natsMessage
|
|
||||||
// nodeCh receives node state messages for processing by worker goroutines.
|
|
||||||
nodeCh chan natsMessage
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNatsAPI creates a new NatsAPI instance with channel-based worker pools.
|
// NewNatsAPI creates a new NatsAPI instance with default dependencies.
|
||||||
// Concurrency is configured via NATSConfig (defaults: JobConcurrency=8, NodeConcurrency=2).
|
|
||||||
func NewNatsAPI() *NatsAPI {
|
func NewNatsAPI() *NatsAPI {
|
||||||
jobConc := 8
|
return &NatsAPI{
|
||||||
nodeConc := 2
|
|
||||||
|
|
||||||
if s := config.Keys.APISubjects; s != nil {
|
|
||||||
if s.JobConcurrency > 0 {
|
|
||||||
jobConc = s.JobConcurrency
|
|
||||||
}
|
|
||||||
if s.NodeConcurrency > 0 {
|
|
||||||
nodeConc = s.NodeConcurrency
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
api := &NatsAPI{
|
|
||||||
JobRepository: repository.GetJobRepository(),
|
JobRepository: repository.GetJobRepository(),
|
||||||
jobCh: make(chan natsMessage, jobConc),
|
|
||||||
nodeCh: make(chan natsMessage, nodeConc),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start worker goroutines
|
|
||||||
for range jobConc {
|
|
||||||
go api.jobWorker()
|
|
||||||
}
|
|
||||||
for range nodeConc {
|
|
||||||
go api.nodeWorker()
|
|
||||||
}
|
|
||||||
|
|
||||||
return api
|
|
||||||
}
|
|
||||||
|
|
||||||
// jobWorker processes job event messages from the job channel.
|
|
||||||
func (api *NatsAPI) jobWorker() {
|
|
||||||
for msg := range api.jobCh {
|
|
||||||
api.handleJobEvent(msg.subject, msg.data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// nodeWorker processes node state messages from the node channel.
|
|
||||||
func (api *NatsAPI) nodeWorker() {
|
|
||||||
for msg := range api.nodeCh {
|
|
||||||
api.handleNodeState(msg.subject, msg.data)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartSubscriptions registers all NATS subscriptions for Job and Node APIs.
|
// StartSubscriptions registers all NATS subscriptions for Job and Node APIs.
|
||||||
// Messages are delivered to buffered channels and processed by worker goroutines.
|
|
||||||
// Returns an error if the NATS client is not available or subscription fails.
|
// Returns an error if the NATS client is not available or subscription fails.
|
||||||
func (api *NatsAPI) StartSubscriptions() error {
|
func (api *NatsAPI) StartSubscriptions() error {
|
||||||
client := nats.GetClient()
|
client := nats.GetClient()
|
||||||
@@ -137,17 +83,14 @@ func (api *NatsAPI) StartSubscriptions() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if config.Keys.APISubjects != nil {
|
if config.Keys.APISubjects != nil {
|
||||||
|
|
||||||
s := config.Keys.APISubjects
|
s := config.Keys.APISubjects
|
||||||
|
|
||||||
if err := client.Subscribe(s.SubjectJobEvent, func(subject string, data []byte) {
|
if err := client.Subscribe(s.SubjectJobEvent, api.handleJobEvent); err != nil {
|
||||||
api.jobCh <- natsMessage{subject: subject, data: data}
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := client.Subscribe(s.SubjectNodeState, func(subject string, data []byte) {
|
if err := client.Subscribe(s.SubjectNodeState, api.handleNodeState); err != nil {
|
||||||
api.nodeCh <- natsMessage{subject: subject, data: data}
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -402,66 +345,22 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) {
|
|||||||
repo := repository.GetNodeRepository()
|
repo := repository.GetNodeRepository()
|
||||||
requestReceived := time.Now().Unix()
|
requestReceived := time.Now().Unix()
|
||||||
|
|
||||||
// Build nodeList per subcluster for health check
|
|
||||||
m := make(map[string][]string)
|
|
||||||
metricNames := make(map[string][]string)
|
|
||||||
healthResults := make(map[string]metricstore.HealthCheckResult)
|
|
||||||
|
|
||||||
for _, node := range req.Nodes {
|
|
||||||
if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil {
|
|
||||||
m[sc] = append(m[sc], node.Hostname)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for sc := range m {
|
|
||||||
if sc != "" {
|
|
||||||
metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
|
|
||||||
metricNames[sc] = metricListToNames(metricList)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Perform health check against metric store
|
|
||||||
healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster)
|
|
||||||
if err != nil {
|
|
||||||
cclog.Warnf("NATS nodestate: no metric store for cluster %s, skipping health check: %v", req.Cluster, err)
|
|
||||||
} else {
|
|
||||||
for sc, nl := range m {
|
|
||||||
if sc != "" {
|
|
||||||
if results, err := healthRepo.HealthCheck(req.Cluster, nl, metricNames[sc]); err == nil {
|
|
||||||
maps.Copy(healthResults, results)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes))
|
|
||||||
for _, node := range req.Nodes {
|
for _, node := range req.Nodes {
|
||||||
state := determineState(node.States)
|
state := determineState(node.States)
|
||||||
healthState := schema.MonitoringStateFailed
|
|
||||||
var healthMetrics string
|
|
||||||
if result, ok := healthResults[node.Hostname]; ok {
|
|
||||||
healthState = result.State
|
|
||||||
healthMetrics = result.HealthMetrics
|
|
||||||
}
|
|
||||||
nodeState := schema.NodeStateDB{
|
nodeState := schema.NodeStateDB{
|
||||||
TimeStamp: requestReceived,
|
TimeStamp: requestReceived,
|
||||||
NodeState: state,
|
NodeState: state,
|
||||||
CpusAllocated: node.CpusAllocated,
|
CpusAllocated: node.CpusAllocated,
|
||||||
MemoryAllocated: node.MemoryAllocated,
|
MemoryAllocated: node.MemoryAllocated,
|
||||||
GpusAllocated: node.GpusAllocated,
|
GpusAllocated: node.GpusAllocated,
|
||||||
HealthState: healthState,
|
HealthState: schema.MonitoringStateFull,
|
||||||
HealthMetrics: healthMetrics,
|
|
||||||
JobsRunning: node.JobsRunning,
|
JobsRunning: node.JobsRunning,
|
||||||
}
|
}
|
||||||
updates = append(updates, repository.NodeStateUpdate{
|
|
||||||
Hostname: node.Hostname,
|
|
||||||
Cluster: req.Cluster,
|
|
||||||
NodeState: &nodeState,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := repo.BatchUpdateNodeStates(updates); err != nil {
|
if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil {
|
||||||
cclog.Errorf("NATS nodestate: batch update for cluster %s failed: %v", req.Cluster, err)
|
cclog.Errorf("NATS nodestate: updating node state for %s on %s failed: %v",
|
||||||
|
node.Hostname, req.Cluster, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Debugf("NATS nodestate: updated %d node states for cluster %s", len(req.Nodes), req.Cluster)
|
cclog.Debugf("NATS nodestate: updated %d node states for cluster %s", len(req.Nodes), req.Cluster)
|
||||||
|
|||||||
@@ -116,7 +116,6 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
|
|||||||
cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
|
cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
|
||||||
startDB := time.Now()
|
startDB := time.Now()
|
||||||
|
|
||||||
updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes))
|
|
||||||
for _, node := range req.Nodes {
|
for _, node := range req.Nodes {
|
||||||
state := determineState(node.States)
|
state := determineState(node.States)
|
||||||
healthState := schema.MonitoringStateFailed
|
healthState := schema.MonitoringStateFailed
|
||||||
@@ -135,15 +134,11 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
|
|||||||
HealthMetrics: healthMetrics,
|
HealthMetrics: healthMetrics,
|
||||||
JobsRunning: node.JobsRunning,
|
JobsRunning: node.JobsRunning,
|
||||||
}
|
}
|
||||||
updates = append(updates, repository.NodeStateUpdate{
|
|
||||||
Hostname: node.Hostname,
|
|
||||||
Cluster: req.Cluster,
|
|
||||||
NodeState: &nodeState,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := repo.BatchUpdateNodeStates(updates); err != nil {
|
if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil {
|
||||||
cclog.Errorf("updateNodeStates: batch update for cluster %s failed: %v", req.Cluster, err)
|
cclog.Errorf("updateNodeStates: updating node state for %s on %s failed: %v",
|
||||||
|
node.Hostname, req.Cluster, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB))
|
cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB))
|
||||||
|
|||||||
@@ -210,7 +210,7 @@ func (la *LdapAuthenticator) Sync() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
cclog.Debugf("sync: add %v (name: %v, roles: [user], ldap: true)", username, name)
|
cclog.Debugf("sync: add %v (name: %v, roles: [user], ldap: true)", username, name)
|
||||||
if err := ur.AddUserIfNotExists(user); err != nil {
|
if err := ur.AddUser(user); err != nil {
|
||||||
cclog.Errorf("User '%s' LDAP: Insert into DB failed", username)
|
cclog.Errorf("User '%s' LDAP: Insert into DB failed", username)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -88,7 +88,6 @@ type DbConfig struct {
|
|||||||
MaxOpenConnections int `json:"max-open-connections"`
|
MaxOpenConnections int `json:"max-open-connections"`
|
||||||
MaxIdleConnections int `json:"max-idle-connections"`
|
MaxIdleConnections int `json:"max-idle-connections"`
|
||||||
ConnectionMaxIdleTimeMins int `json:"max-idle-time-minutes"`
|
ConnectionMaxIdleTimeMins int `json:"max-idle-time-minutes"`
|
||||||
BusyTimeoutMs int `json:"busy-timeout-ms"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type NodeStateRetention struct {
|
type NodeStateRetention struct {
|
||||||
@@ -117,8 +116,6 @@ type ResampleConfig struct {
|
|||||||
type NATSConfig struct {
|
type NATSConfig struct {
|
||||||
SubjectJobEvent string `json:"subject-job-event"`
|
SubjectJobEvent string `json:"subject-job-event"`
|
||||||
SubjectNodeState string `json:"subject-node-state"`
|
SubjectNodeState string `json:"subject-node-state"`
|
||||||
JobConcurrency int `json:"job-concurrency"`
|
|
||||||
NodeConcurrency int `json:"node-concurrency"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type IntRange struct {
|
type IntRange struct {
|
||||||
|
|||||||
@@ -121,14 +121,6 @@ var configSchema = `
|
|||||||
"subject-node-state": {
|
"subject-node-state": {
|
||||||
"description": "NATS subject for node state updates",
|
"description": "NATS subject for node state updates",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
},
|
|
||||||
"job-concurrency": {
|
|
||||||
"description": "Number of concurrent worker goroutines for processing job events (default: 8).",
|
|
||||||
"type": "integer"
|
|
||||||
},
|
|
||||||
"node-concurrency": {
|
|
||||||
"description": "Number of concurrent worker goroutines for processing node state events (default: 2).",
|
|
||||||
"type": "integer"
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"required": ["subject-job-event", "subject-node-state"]
|
"required": ["subject-job-event", "subject-node-state"]
|
||||||
@@ -209,10 +201,6 @@ var configSchema = `
|
|||||||
"max-idle-time-minutes": {
|
"max-idle-time-minutes": {
|
||||||
"description": "Maximum idle time for a connection in minutes (default: 10).",
|
"description": "Maximum idle time for a connection in minutes (default: 10).",
|
||||||
"type": "integer"
|
"type": "integer"
|
||||||
},
|
|
||||||
"busy-timeout-ms": {
|
|
||||||
"description": "SQLite busy timeout in milliseconds. When a write is blocked, SQLite retries with backoff for up to this duration before returning SQLITE_BUSY (default: 60000).",
|
|
||||||
"type": "integer"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,12 +46,6 @@ type RepositoryConfig struct {
|
|||||||
// It's a soft limit — queries won't fail, but cache eviction becomes more aggressive.
|
// It's a soft limit — queries won't fail, but cache eviction becomes more aggressive.
|
||||||
// Default: 16384 (16GB)
|
// Default: 16384 (16GB)
|
||||||
DbSoftHeapLimitMB int
|
DbSoftHeapLimitMB int
|
||||||
|
|
||||||
// BusyTimeoutMs is the SQLite busy_timeout in milliseconds.
|
|
||||||
// When a write is blocked by another writer, SQLite retries internally
|
|
||||||
// using a backoff mechanism for up to this duration before returning SQLITE_BUSY.
|
|
||||||
// Default: 60000 (60 seconds)
|
|
||||||
BusyTimeoutMs int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultConfig returns the default repository configuration.
|
// DefaultConfig returns the default repository configuration.
|
||||||
@@ -66,7 +60,6 @@ func DefaultConfig() *RepositoryConfig {
|
|||||||
MinRunningJobDuration: 600, // 10 minutes
|
MinRunningJobDuration: 600, // 10 minutes
|
||||||
DbCacheSizeMB: 2048, // 2GB per connection
|
DbCacheSizeMB: 2048, // 2GB per connection
|
||||||
DbSoftHeapLimitMB: 16384, // 16GB process-wide
|
DbSoftHeapLimitMB: 16384, // 16GB process-wide
|
||||||
BusyTimeoutMs: 60000, // 60 seconds
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ func Connect(db string) {
|
|||||||
connectionURLParams := make(url.Values)
|
connectionURLParams := make(url.Values)
|
||||||
connectionURLParams.Add("_txlock", "immediate")
|
connectionURLParams.Add("_txlock", "immediate")
|
||||||
connectionURLParams.Add("_journal_mode", "WAL")
|
connectionURLParams.Add("_journal_mode", "WAL")
|
||||||
connectionURLParams.Add("_busy_timeout", fmt.Sprintf("%d", repoConfig.BusyTimeoutMs))
|
connectionURLParams.Add("_busy_timeout", "5000")
|
||||||
connectionURLParams.Add("_synchronous", "NORMAL")
|
connectionURLParams.Add("_synchronous", "NORMAL")
|
||||||
cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB
|
cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB
|
||||||
connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB))
|
connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB))
|
||||||
|
|||||||
@@ -92,33 +92,20 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
|
|||||||
jobs = append(jobs, job)
|
jobs = append(jobs, job)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transfer cached jobs to main table and clear cache in a single transaction.
|
|
||||||
tx, err := r.DB.Beginx()
|
|
||||||
if err != nil {
|
|
||||||
cclog.Errorf("SyncJobs: begin transaction: %v", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer tx.Rollback()
|
|
||||||
|
|
||||||
// Use INSERT OR IGNORE to skip jobs already transferred by the stop path
|
// Use INSERT OR IGNORE to skip jobs already transferred by the stop path
|
||||||
_, err = tx.Exec(
|
_, err = r.DB.Exec(
|
||||||
"INSERT OR IGNORE INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
|
"INSERT OR IGNORE INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("Error while Job sync: %v", err)
|
cclog.Errorf("Error while Job sync: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec("DELETE FROM job_cache")
|
_, err = r.DB.Exec("DELETE FROM job_cache")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("Error while Job cache clean: %v", err)
|
cclog.Errorf("Error while Job cache clean: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
|
||||||
cclog.Errorf("SyncJobs: commit transaction: %v", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Resolve correct job.id from the job table. The IDs read from job_cache
|
// Resolve correct job.id from the job table. The IDs read from job_cache
|
||||||
// are from a different auto-increment sequence and must not be used to
|
// are from a different auto-increment sequence and must not be used to
|
||||||
// query the job table.
|
// query the job table.
|
||||||
@@ -141,13 +128,7 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
|
|||||||
// TransferCachedJobToMain moves a job from job_cache to the job table.
|
// TransferCachedJobToMain moves a job from job_cache to the job table.
|
||||||
// Caller must hold r.Mutex. Returns the new job table ID.
|
// Caller must hold r.Mutex. Returns the new job table ID.
|
||||||
func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) {
|
func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) {
|
||||||
tx, err := r.DB.Beginx()
|
res, err := r.DB.Exec(
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("TransferCachedJobToMain: begin transaction: %w", err)
|
|
||||||
}
|
|
||||||
defer tx.Rollback()
|
|
||||||
|
|
||||||
res, err := tx.Exec(
|
|
||||||
"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache WHERE id = ?",
|
"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache WHERE id = ?",
|
||||||
cacheID)
|
cacheID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -159,15 +140,11 @@ func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) {
|
|||||||
return 0, fmt.Errorf("getting new job ID after transfer failed: %w", err)
|
return 0, fmt.Errorf("getting new job ID after transfer failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec("DELETE FROM job_cache WHERE id = ?", cacheID)
|
_, err = r.DB.Exec("DELETE FROM job_cache WHERE id = ?", cacheID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("deleting cached job %d after transfer failed: %w", cacheID, err)
|
return 0, fmt.Errorf("deleting cached job %d after transfer failed: %w", cacheID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
|
||||||
return 0, fmt.Errorf("TransferCachedJobToMain: commit: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return newID, nil
|
return newID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -244,77 +244,6 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeStateUpdate holds the data needed to update one node's state in a batch operation.
|
|
||||||
type NodeStateUpdate struct {
|
|
||||||
Hostname string
|
|
||||||
Cluster string
|
|
||||||
NodeState *schema.NodeStateDB
|
|
||||||
}
|
|
||||||
|
|
||||||
// BatchUpdateNodeStates inserts node state rows for multiple nodes in a single transaction.
|
|
||||||
// For each node, it looks up (or creates) the node row, then inserts the state row.
|
|
||||||
// This reduces lock acquisitions from 2*N to 1 for N nodes.
|
|
||||||
func (r *NodeRepository) BatchUpdateNodeStates(updates []NodeStateUpdate) error {
|
|
||||||
tx, err := r.DB.Beginx()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("BatchUpdateNodeStates: begin transaction: %w", err)
|
|
||||||
}
|
|
||||||
defer tx.Rollback()
|
|
||||||
|
|
||||||
stmtLookup, err := tx.Preparex("SELECT id FROM node WHERE hostname = ? AND cluster = ?")
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("BatchUpdateNodeStates: prepare lookup: %w", err)
|
|
||||||
}
|
|
||||||
defer stmtLookup.Close()
|
|
||||||
|
|
||||||
stmtInsertNode, err := tx.PrepareNamed(NamedNodeInsert)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("BatchUpdateNodeStates: prepare node insert: %w", err)
|
|
||||||
}
|
|
||||||
defer stmtInsertNode.Close()
|
|
||||||
|
|
||||||
stmtInsertState, err := tx.PrepareNamed(NamedNodeStateInsert)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("BatchUpdateNodeStates: prepare state insert: %w", err)
|
|
||||||
}
|
|
||||||
defer stmtInsertState.Close()
|
|
||||||
|
|
||||||
for _, u := range updates {
|
|
||||||
var id int64
|
|
||||||
if err := stmtLookup.QueryRow(u.Hostname, u.Cluster).Scan(&id); err != nil {
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
subcluster, scErr := archive.GetSubClusterByNode(u.Cluster, u.Hostname)
|
|
||||||
if scErr != nil {
|
|
||||||
cclog.Errorf("BatchUpdateNodeStates: subcluster lookup for '%s' in '%s': %v", u.Hostname, u.Cluster, scErr)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
node := schema.NodeDB{
|
|
||||||
Hostname: u.Hostname, Cluster: u.Cluster, SubCluster: subcluster,
|
|
||||||
}
|
|
||||||
res, insertErr := stmtInsertNode.Exec(&node)
|
|
||||||
if insertErr != nil {
|
|
||||||
cclog.Errorf("BatchUpdateNodeStates: insert node '%s': %v", u.Hostname, insertErr)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
id, _ = res.LastInsertId()
|
|
||||||
} else {
|
|
||||||
cclog.Errorf("BatchUpdateNodeStates: lookup node '%s': %v", u.Hostname, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
u.NodeState.NodeID = id
|
|
||||||
if _, err := stmtInsertState.Exec(u.NodeState); err != nil {
|
|
||||||
cclog.Errorf("BatchUpdateNodeStates: insert state for '%s': %v", u.Hostname, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
|
||||||
return fmt.Errorf("BatchUpdateNodeStates: commit: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// func (r *NodeRepository) UpdateHealthState(hostname string, healthState *schema.MonitoringState) error {
|
// func (r *NodeRepository) UpdateHealthState(hostname string, healthState *schema.MonitoringState) error {
|
||||||
// if _, err := sq.Update("node").Set("health_state", healthState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil {
|
// if _, err := sq.Update("node").Set("health_state", healthState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil {
|
||||||
// cclog.Errorf("error while updating node '%d'", id)
|
// cclog.Errorf("error while updating node '%d'", id)
|
||||||
|
|||||||
@@ -188,21 +188,6 @@ func (r *UserRepository) AddUser(user *schema.User) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddUserIfNotExists inserts a user only if the username does not already exist.
|
|
||||||
// Uses INSERT OR IGNORE to avoid UNIQUE constraint errors when a user is
|
|
||||||
// concurrently created (e.g., by a login while LDAP sync is running).
|
|
||||||
// Unlike AddUser, this intentionally skips the deprecated default metrics config insertion.
|
|
||||||
func (r *UserRepository) AddUserIfNotExists(user *schema.User) error {
|
|
||||||
rolesJson, _ := json.Marshal(user.Roles)
|
|
||||||
projectsJson, _ := json.Marshal(user.Projects)
|
|
||||||
|
|
||||||
cols := "username, name, roles, projects, ldap"
|
|
||||||
_, err := r.DB.Exec(
|
|
||||||
`INSERT OR IGNORE INTO hpc_user (`+cols+`) VALUES (?, ?, ?, ?, ?)`,
|
|
||||||
user.Username, user.Name, string(rolesJson), string(projectsJson), int(user.AuthSource))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error {
|
func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error {
|
||||||
// user contains updated info -> Apply to dbUser
|
// user contains updated info -> Apply to dbUser
|
||||||
// --- Simple Name Update ---
|
// --- Simple Name Update ---
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ func (sa *SqliteArchive) Init(rawConfig json.RawMessage) (uint64, error) {
|
|||||||
"PRAGMA journal_mode=WAL",
|
"PRAGMA journal_mode=WAL",
|
||||||
"PRAGMA synchronous=NORMAL",
|
"PRAGMA synchronous=NORMAL",
|
||||||
"PRAGMA cache_size=-64000", // 64MB cache
|
"PRAGMA cache_size=-64000", // 64MB cache
|
||||||
"PRAGMA busy_timeout=60000",
|
"PRAGMA busy_timeout=5000",
|
||||||
}
|
}
|
||||||
for _, pragma := range pragmas {
|
for _, pragma := range pragmas {
|
||||||
if _, err := sa.db.Exec(pragma); err != nil {
|
if _, err := sa.db.Exec(pragma); err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user