6 Commits

Author SHA1 Message Date
09d0ba71d2 Provide identical nodestate functionality in NATS API
Entire-Checkpoint: 3a40b75edd68
2026-03-16 12:13:14 +01:00
df93dbed63 Add busyTimeout config setting
Entire-Checkpoint: 81097a6c52a2
2026-03-16 11:30:21 +01:00
e4f3fa9ba0 Wrap SyncJobs in transaction
Entire-Checkpoint: d4f6c79a8dc1
2026-03-16 11:25:49 +01:00
51517f8031 Reduce insert pressure in db. Increase sqlite timeout value
Entire-Checkpoint: a1e2931d4deb
2026-03-16 11:17:47 +01:00
0aad8f01c8 Upgrade cc-lib
Fixes panic in AddNodeScope

Entire-Checkpoint: afef27e07ec9
2026-03-16 08:55:56 +01:00
973ca87bd1 Extend known issues in ReleaseNotes
2026-03-15 07:02:54 +01:00
17 changed files with 300 additions and 39 deletions

View File

@@ -151,9 +151,11 @@ applied automatically on startup. Version tracking in `version` table.
 ## Configuration
 - **config.json**: Main configuration (clusters, metric repositories, archive settings)
-  - `main.apiSubjects`: NATS subject configuration (optional)
-    - `subjectJobEvent`: Subject for job start/stop events (e.g., "cc.job.event")
-    - `subjectNodeState`: Subject for node state updates (e.g., "cc.node.state")
+  - `main.api-subjects`: NATS subject configuration (optional)
+    - `subject-job-event`: Subject for job start/stop events (e.g., "cc.job.event")
+    - `subject-node-state`: Subject for node state updates (e.g., "cc.node.state")
+    - `job-concurrency`: Worker goroutines for job events (default: 8)
+    - `node-concurrency`: Worker goroutines for node state events (default: 2)
  - `nats`: NATS client connection configuration (optional)
    - `address`: NATS server address (e.g., "nats://localhost:4222")
    - `username`: Authentication username (optional)
@@ -241,13 +243,19 @@ The backend supports a NATS-based API as an alternative to the REST API for job
 ```json
 {
   "main": {
-    "apiSubjects": {
-      "subjectJobEvent": "cc.job.event",
-      "subjectNodeState": "cc.node.state"
+    "api-subjects": {
+      "subject-job-event": "cc.job.event",
+      "subject-node-state": "cc.node.state",
+      "job-concurrency": 8,
+      "node-concurrency": 2
     }
   }
 }
 ```
+
+- `subject-job-event` (required): NATS subject for job start/stop events
+- `subject-node-state` (required): NATS subject for node state updates
+- `job-concurrency` (optional, default: 8): Number of concurrent worker goroutines for job events
+- `node-concurrency` (optional, default: 2): Number of concurrent worker goroutines for node state events
 
 ### Message Format
@@ -292,9 +300,10 @@ job,function=stop_job event="{\"jobId\":123,\"cluster\":\"test\",\"startTime\":1
 ### Implementation Notes
 - NATS API mirrors REST API functionality but uses messaging
-- Job start/stop events are processed asynchronously
+- Job start/stop events are processed asynchronously via configurable worker pools
 - Duplicate job detection is handled (same as REST API)
 - All validation rules from REST API apply
+- Node state updates include health checks against the metric store (identical to REST handler): nodes are grouped by subcluster, metric configurations are fetched, and `HealthCheck()` is called per subcluster. Nodes default to `MonitoringStateFailed` if no health data is available.
 - Messages are logged; no responses are sent back to publishers
 - If NATS client is unavailable, API subscriptions are skipped (logged as warning)
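The line-protocol message format described above can be illustrated with a small standalone encoder. This is only a sketch of what a publisher might send, not cc-backend's actual publisher code: the `jobEvent` struct, its reduced field set, and the `encodeJobEvent` helper are illustrative assumptions, and the NATS publish call itself is omitted.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// jobEvent carries the fields from the stop_job example above;
// the real payload contains more fields.
type jobEvent struct {
	JobID     int64  `json:"jobId"`
	Cluster   string `json:"cluster"`
	StartTime int64  `json:"startTime"`
}

// encodeJobEvent renders a job event in the documented shape:
//
//	job,function=<fn> event="<JSON payload with escaped quotes>"
func encodeJobEvent(fn string, ev jobEvent) (string, error) {
	payload, err := json.Marshal(ev)
	if err != nil {
		return "", err
	}
	// Line protocol requires double quotes inside string field
	// values to be backslash-escaped.
	escaped := strings.ReplaceAll(string(payload), `"`, `\"`)
	return fmt.Sprintf(`job,function=%s event="%s"`, fn, escaped), nil
}

func main() {
	msg, _ := encodeJobEvent("stop_job",
		jobEvent{JobID: 123, Cluster: "test", StartTime: 1700000000})
	fmt.Println(msg)
}
```

Publishing would then be a single publish call on the configured subject (e.g. `cc.job.event`) using whatever NATS client is in use.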

View File

@@ -148,7 +148,8 @@ usage can be tuned via the optional `db-config` section in config.json under
"soft-heap-limit-mb": 16384, "soft-heap-limit-mb": 16384,
"max-open-connections": 4, "max-open-connections": 4,
"max-idle-connections": 4, "max-idle-connections": 4,
"max-idle-time-minutes": 10 "max-idle-time-minutes": 10,
"busy-timeout-ms": 60000
} }
} }
} }
@@ -166,6 +167,7 @@ are used.
 | `max-open-connections`  | 4     | Maximum number of open database connections. |
 | `max-idle-connections`  | 4     | Maximum number of idle database connections kept in the pool. |
 | `max-idle-time-minutes` | 10    | Maximum time in minutes a connection can sit idle before being closed. |
+| `busy-timeout-ms`       | 60000 | SQLite busy timeout in milliseconds. When a write is blocked by another writer, SQLite retries internally with backoff for up to this duration before returning `SQLITE_BUSY`. |
 
 ### Sizing Guidelines

View File

@@ -11,6 +11,23 @@ recommended to apply the new `optimize-db` flag, which runs the sqlite `ANALYZE`
 and `VACUUM` commands. Depending on your database size (more than 40GB) the
 `VACUUM` may take up to 2h.
 
+## Known issues
+- The new dynamic memory management is not bulletproof yet across restarts.
+  Buffers that are kept outside the retention period may be lost across a
+  restart. We will fix that in a subsequent patch release.
+- To use the new log viewer (which only works when cc-backend is started via
+  systemd) in the admin interface, the user under which the cc-backend process
+  is running has to be allowed to execute the journalctl command.
+- The user configuration keys for the UI have changed. Therefore old user
+  configuration persisted in the database is not used anymore. It is recommended
+  to configure the metrics shown in the ui-config section and remove all records
+  in the table after the update.
+- Currently energy footprint metrics of type energy are ignored for calculating
+  total energy.
+- With energy footprint metrics of type power the unit is ignored and it is
+  assumed the metric has the unit Watt.
+
 ## Changes in 1.5.1
 
 ### Database
@@ -309,12 +326,3 @@ _The sections below document all features and changes introduced in the 1.5.0 ma
 - If using the archive retention feature, configure the `target-format` option
   to choose between `json` (default) and `parquet` output formats
 - Consider enabling nodestate retention if you track node states over time
-
-## Known issues
-
-- The new dynamic memory management is not bullet proof yet across restarts. We
-  will fix that in a subsequent patch release
-- Currently energy footprint metrics of type energy are ignored for calculating
-  total energy.
-- With energy footprint metrics of type power the unit is ignored and it is
-  assumed the metric has the unit Watt.

View File

@@ -126,6 +126,9 @@ func initDatabase() error {
 	if dc.ConnectionMaxIdleTimeMins > 0 {
 		cfg.ConnectionMaxIdleTime = time.Duration(dc.ConnectionMaxIdleTimeMins) * time.Minute
 	}
+	if dc.BusyTimeoutMs > 0 {
+		cfg.BusyTimeoutMs = dc.BusyTimeoutMs
+	}
 		repository.SetConfig(cfg)
 	}
 	repository.Connect(config.Keys.DB)

go.mod
View File

@@ -9,7 +9,7 @@ tool (
 require (
 	github.com/99designs/gqlgen v0.17.88
-	github.com/ClusterCockpit/cc-lib/v2 v2.8.2
+	github.com/ClusterCockpit/cc-lib/v2 v2.9.0
 	github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0
 	github.com/Masterminds/squirrel v1.5.4
 	github.com/aws/aws-sdk-go-v2 v1.41.3

go.sum
View File

@@ -6,6 +6,8 @@ github.com/Azure/go-ntlmssp v0.1.0 h1:DjFo6YtWzNqNvQdrwEyr/e4nhU3vRiwenz5QX7sFz+
 github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk=
 github.com/ClusterCockpit/cc-lib/v2 v2.8.2 h1:rCLZk8wz8yq8xBnBEdVKigvA2ngR8dPmHbEFwxxb3jw=
 github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
+github.com/ClusterCockpit/cc-lib/v2 v2.9.0 h1:mzUYakcjwb+UP5II4jOvr36rSYct90gXBbtUg+nvm9c=
+github.com/ClusterCockpit/cc-lib/v2 v2.9.0/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
 github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q=
 github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY=
 github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=

View File

@@ -8,6 +8,7 @@ package api
 import (
 	"database/sql"
 	"encoding/json"
+	"maps"
 	"strings"
 	"sync"
 	"time"
@@ -15,7 +16,10 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/archiver" "github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/importer" "github.com/ClusterCockpit/cc-backend/internal/importer"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
"github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
"github.com/ClusterCockpit/cc-lib/v2/nats" "github.com/ClusterCockpit/cc-lib/v2/nats"
@@ -24,6 +28,12 @@ import (
influx "github.com/ClusterCockpit/cc-line-protocol/v2/lineprotocol" influx "github.com/ClusterCockpit/cc-line-protocol/v2/lineprotocol"
) )
// natsMessage wraps a raw NATS message with its subject for channel-based processing.
type natsMessage struct {
subject string
data []byte
}
// NatsAPI provides NATS subscription-based handlers for Job and Node operations. // NatsAPI provides NATS subscription-based handlers for Job and Node operations.
// It mirrors the functionality of the REST API but uses NATS messaging with // It mirrors the functionality of the REST API but uses NATS messaging with
// InfluxDB line protocol as the message format. // InfluxDB line protocol as the message format.
@@ -64,16 +74,60 @@ type NatsAPI struct {
 	// RepositoryMutex protects job creation operations from race conditions
 	// when checking for duplicate jobs during startJob calls.
 	RepositoryMutex sync.Mutex
+
+	// jobCh receives job event messages for processing by worker goroutines.
+	jobCh chan natsMessage
+	// nodeCh receives node state messages for processing by worker goroutines.
+	nodeCh chan natsMessage
 }
 
-// NewNatsAPI creates a new NatsAPI instance with default dependencies.
+// NewNatsAPI creates a new NatsAPI instance with channel-based worker pools.
+// Concurrency is configured via NATSConfig (defaults: JobConcurrency=8, NodeConcurrency=2).
 func NewNatsAPI() *NatsAPI {
-	return &NatsAPI{
+	jobConc := 8
+	nodeConc := 2
+	if s := config.Keys.APISubjects; s != nil {
+		if s.JobConcurrency > 0 {
+			jobConc = s.JobConcurrency
+		}
+		if s.NodeConcurrency > 0 {
+			nodeConc = s.NodeConcurrency
+		}
+	}
+	api := &NatsAPI{
 		JobRepository: repository.GetJobRepository(),
+		jobCh:         make(chan natsMessage, jobConc),
+		nodeCh:        make(chan natsMessage, nodeConc),
+	}
+
+	// Start worker goroutines
+	for range jobConc {
+		go api.jobWorker()
+	}
+	for range nodeConc {
+		go api.nodeWorker()
+	}
+	return api
+}
+
+// jobWorker processes job event messages from the job channel.
+func (api *NatsAPI) jobWorker() {
+	for msg := range api.jobCh {
+		api.handleJobEvent(msg.subject, msg.data)
+	}
+}
+
+// nodeWorker processes node state messages from the node channel.
+func (api *NatsAPI) nodeWorker() {
+	for msg := range api.nodeCh {
+		api.handleNodeState(msg.subject, msg.data)
 	}
 }
 
 // StartSubscriptions registers all NATS subscriptions for Job and Node APIs.
+// Messages are delivered to buffered channels and processed by worker goroutines.
 // Returns an error if the NATS client is not available or subscription fails.
 func (api *NatsAPI) StartSubscriptions() error {
 	client := nats.GetClient()
@@ -83,14 +137,17 @@ func (api *NatsAPI) StartSubscriptions() error {
 	}
 
 	if config.Keys.APISubjects != nil {
 		s := config.Keys.APISubjects
-		if err := client.Subscribe(s.SubjectJobEvent, api.handleJobEvent); err != nil {
+		if err := client.Subscribe(s.SubjectJobEvent, func(subject string, data []byte) {
+			api.jobCh <- natsMessage{subject: subject, data: data}
+		}); err != nil {
 			return err
 		}
-		if err := client.Subscribe(s.SubjectNodeState, api.handleNodeState); err != nil {
+		if err := client.Subscribe(s.SubjectNodeState, func(subject string, data []byte) {
+			api.nodeCh <- natsMessage{subject: subject, data: data}
+		}); err != nil {
 			return err
 		}
@@ -345,22 +402,66 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) {
 	repo := repository.GetNodeRepository()
 	requestReceived := time.Now().Unix()
 
+	// Build nodeList per subcluster for health check
+	m := make(map[string][]string)
+	metricNames := make(map[string][]string)
+	healthResults := make(map[string]metricstore.HealthCheckResult)
+	for _, node := range req.Nodes {
+		if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil {
+			m[sc] = append(m[sc], node.Hostname)
+		}
+	}
+	for sc := range m {
+		if sc != "" {
+			metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
+			metricNames[sc] = metricListToNames(metricList)
+		}
+	}
+
+	// Perform health check against metric store
+	healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster)
+	if err != nil {
+		cclog.Warnf("NATS nodestate: no metric store for cluster %s, skipping health check: %v", req.Cluster, err)
+	} else {
+		for sc, nl := range m {
+			if sc != "" {
+				if results, err := healthRepo.HealthCheck(req.Cluster, nl, metricNames[sc]); err == nil {
+					maps.Copy(healthResults, results)
+				}
+			}
+		}
+	}
+
+	updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes))
 	for _, node := range req.Nodes {
 		state := determineState(node.States)
+		healthState := schema.MonitoringStateFailed
+		var healthMetrics string
+		if result, ok := healthResults[node.Hostname]; ok {
+			healthState = result.State
+			healthMetrics = result.HealthMetrics
+		}
+
 		nodeState := schema.NodeStateDB{
 			TimeStamp:       requestReceived,
 			NodeState:       state,
 			CpusAllocated:   node.CpusAllocated,
 			MemoryAllocated: node.MemoryAllocated,
 			GpusAllocated:   node.GpusAllocated,
-			HealthState:     schema.MonitoringStateFull,
+			HealthState:     healthState,
+			HealthMetrics:   healthMetrics,
 			JobsRunning:     node.JobsRunning,
 		}
+		updates = append(updates, repository.NodeStateUpdate{
+			Hostname:  node.Hostname,
+			Cluster:   req.Cluster,
+			NodeState: &nodeState,
+		})
+	}
 
-		if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil {
-			cclog.Errorf("NATS nodestate: updating node state for %s on %s failed: %v",
-				node.Hostname, req.Cluster, err)
-		}
+	if err := repo.BatchUpdateNodeStates(updates); err != nil {
+		cclog.Errorf("NATS nodestate: batch update for cluster %s failed: %v", req.Cluster, err)
 	}
 
 	cclog.Debugf("NATS nodestate: updated %d node states for cluster %s", len(req.Nodes), req.Cluster)

View File

@@ -116,6 +116,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs)) cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
startDB := time.Now() startDB := time.Now()
updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes))
for _, node := range req.Nodes { for _, node := range req.Nodes {
state := determineState(node.States) state := determineState(node.States)
healthState := schema.MonitoringStateFailed healthState := schema.MonitoringStateFailed
@@ -134,11 +135,15 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
 			HealthMetrics: healthMetrics,
 			JobsRunning:   node.JobsRunning,
 		}
+		updates = append(updates, repository.NodeStateUpdate{
+			Hostname:  node.Hostname,
+			Cluster:   req.Cluster,
+			NodeState: &nodeState,
+		})
+	}
 
-		if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil {
-			cclog.Errorf("updateNodeStates: updating node state for %s on %s failed: %v",
-				node.Hostname, req.Cluster, err)
-		}
+	if err := repo.BatchUpdateNodeStates(updates); err != nil {
+		cclog.Errorf("updateNodeStates: batch update for cluster %s failed: %v", req.Cluster, err)
 	}
 
 	cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB))

View File

@@ -210,7 +210,7 @@ func (la *LdapAuthenticator) Sync() error {
 		}
 
 		cclog.Debugf("sync: add %v (name: %v, roles: [user], ldap: true)", username, name)
-		if err := ur.AddUser(user); err != nil {
+		if err := ur.AddUserIfNotExists(user); err != nil {
 			cclog.Errorf("User '%s' LDAP: Insert into DB failed", username)
 			return err
 		}

View File

@@ -88,6 +88,7 @@ type DbConfig struct {
 	MaxOpenConnections        int `json:"max-open-connections"`
 	MaxIdleConnections        int `json:"max-idle-connections"`
 	ConnectionMaxIdleTimeMins int `json:"max-idle-time-minutes"`
+	BusyTimeoutMs             int `json:"busy-timeout-ms"`
 }
 
 type NodeStateRetention struct {
@@ -116,6 +117,8 @@ type ResampleConfig struct {
 type NATSConfig struct {
 	SubjectJobEvent  string `json:"subject-job-event"`
 	SubjectNodeState string `json:"subject-node-state"`
+	JobConcurrency   int    `json:"job-concurrency"`
+	NodeConcurrency  int    `json:"node-concurrency"`
 }
 
 type IntRange struct {

View File

@@ -121,6 +121,14 @@ var configSchema = `
"subject-node-state": { "subject-node-state": {
"description": "NATS subject for node state updates", "description": "NATS subject for node state updates",
"type": "string" "type": "string"
},
"job-concurrency": {
"description": "Number of concurrent worker goroutines for processing job events (default: 8).",
"type": "integer"
},
"node-concurrency": {
"description": "Number of concurrent worker goroutines for processing node state events (default: 2).",
"type": "integer"
} }
}, },
"required": ["subject-job-event", "subject-node-state"] "required": ["subject-job-event", "subject-node-state"]
@@ -201,6 +209,10 @@ var configSchema = `
"max-idle-time-minutes": { "max-idle-time-minutes": {
"description": "Maximum idle time for a connection in minutes (default: 10).", "description": "Maximum idle time for a connection in minutes (default: 10).",
"type": "integer" "type": "integer"
},
"busy-timeout-ms": {
"description": "SQLite busy timeout in milliseconds. When a write is blocked, SQLite retries with backoff for up to this duration before returning SQLITE_BUSY (default: 60000).",
"type": "integer"
} }
} }
} }

View File

@@ -46,6 +46,12 @@ type RepositoryConfig struct {
 	// It's a soft limit — queries won't fail, but cache eviction becomes more aggressive.
 	// Default: 16384 (16GB)
 	DbSoftHeapLimitMB int
+
+	// BusyTimeoutMs is the SQLite busy_timeout in milliseconds.
+	// When a write is blocked by another writer, SQLite retries internally
+	// using a backoff mechanism for up to this duration before returning SQLITE_BUSY.
+	// Default: 60000 (60 seconds)
+	BusyTimeoutMs int
 }
 
 // DefaultConfig returns the default repository configuration.
@@ -60,6 +66,7 @@ func DefaultConfig() *RepositoryConfig {
 		MinRunningJobDuration: 600,   // 10 minutes
 		DbCacheSizeMB:         2048,  // 2GB per connection
 		DbSoftHeapLimitMB:     16384, // 16GB process-wide
+		BusyTimeoutMs:         60000, // 60 seconds
 	}
 }

View File

@@ -70,7 +70,7 @@ func Connect(db string) {
 	connectionURLParams := make(url.Values)
 	connectionURLParams.Add("_txlock", "immediate")
 	connectionURLParams.Add("_journal_mode", "WAL")
-	connectionURLParams.Add("_busy_timeout", "5000")
+	connectionURLParams.Add("_busy_timeout", fmt.Sprintf("%d", repoConfig.BusyTimeoutMs))
 	connectionURLParams.Add("_synchronous", "NORMAL")
 	cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB
 	connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB))

View File

@@ -92,20 +92,33 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
 		jobs = append(jobs, job)
 	}
 
+	// Transfer cached jobs to main table and clear cache in a single transaction.
+	tx, err := r.DB.Beginx()
+	if err != nil {
+		cclog.Errorf("SyncJobs: begin transaction: %v", err)
+		return nil, err
+	}
+	defer tx.Rollback()
+
 	// Use INSERT OR IGNORE to skip jobs already transferred by the stop path
-	_, err = r.DB.Exec(
+	_, err = tx.Exec(
 		"INSERT OR IGNORE INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
 	if err != nil {
 		cclog.Errorf("Error while Job sync: %v", err)
 		return nil, err
 	}
 
-	_, err = r.DB.Exec("DELETE FROM job_cache")
+	_, err = tx.Exec("DELETE FROM job_cache")
 	if err != nil {
 		cclog.Errorf("Error while Job cache clean: %v", err)
 		return nil, err
 	}
 
+	if err := tx.Commit(); err != nil {
+		cclog.Errorf("SyncJobs: commit transaction: %v", err)
+		return nil, err
+	}
+
 	// Resolve correct job.id from the job table. The IDs read from job_cache
 	// are from a different auto-increment sequence and must not be used to
 	// query the job table.
@@ -128,7 +141,13 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
 // TransferCachedJobToMain moves a job from job_cache to the job table.
 // Caller must hold r.Mutex. Returns the new job table ID.
 func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) {
-	res, err := r.DB.Exec(
+	tx, err := r.DB.Beginx()
+	if err != nil {
+		return 0, fmt.Errorf("TransferCachedJobToMain: begin transaction: %w", err)
+	}
+	defer tx.Rollback()
+
+	res, err := tx.Exec(
 		"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache WHERE id = ?",
 		cacheID)
 	if err != nil {
@@ -140,11 +159,15 @@ func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) {
return 0, fmt.Errorf("getting new job ID after transfer failed: %w", err) return 0, fmt.Errorf("getting new job ID after transfer failed: %w", err)
} }
_, err = r.DB.Exec("DELETE FROM job_cache WHERE id = ?", cacheID) _, err = tx.Exec("DELETE FROM job_cache WHERE id = ?", cacheID)
if err != nil { if err != nil {
return 0, fmt.Errorf("deleting cached job %d after transfer failed: %w", cacheID, err) return 0, fmt.Errorf("deleting cached job %d after transfer failed: %w", cacheID, err)
} }
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("TransferCachedJobToMain: commit: %w", err)
}
return newID, nil return newID, nil
} }

View File

@@ -244,6 +244,77 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
 	return nil
 }
 
+// NodeStateUpdate holds the data needed to update one node's state in a batch operation.
+type NodeStateUpdate struct {
+	Hostname  string
+	Cluster   string
+	NodeState *schema.NodeStateDB
+}
+
+// BatchUpdateNodeStates inserts node state rows for multiple nodes in a single transaction.
+// For each node, it looks up (or creates) the node row, then inserts the state row.
+// This reduces lock acquisitions from 2*N to 1 for N nodes.
+func (r *NodeRepository) BatchUpdateNodeStates(updates []NodeStateUpdate) error {
+	tx, err := r.DB.Beginx()
+	if err != nil {
+		return fmt.Errorf("BatchUpdateNodeStates: begin transaction: %w", err)
+	}
+	defer tx.Rollback()
+
+	stmtLookup, err := tx.Preparex("SELECT id FROM node WHERE hostname = ? AND cluster = ?")
+	if err != nil {
+		return fmt.Errorf("BatchUpdateNodeStates: prepare lookup: %w", err)
+	}
+	defer stmtLookup.Close()
+
+	stmtInsertNode, err := tx.PrepareNamed(NamedNodeInsert)
+	if err != nil {
+		return fmt.Errorf("BatchUpdateNodeStates: prepare node insert: %w", err)
+	}
+	defer stmtInsertNode.Close()
+
+	stmtInsertState, err := tx.PrepareNamed(NamedNodeStateInsert)
+	if err != nil {
+		return fmt.Errorf("BatchUpdateNodeStates: prepare state insert: %w", err)
+	}
+	defer stmtInsertState.Close()
+
+	for _, u := range updates {
+		var id int64
+		if err := stmtLookup.QueryRow(u.Hostname, u.Cluster).Scan(&id); err != nil {
+			if err == sql.ErrNoRows {
+				subcluster, scErr := archive.GetSubClusterByNode(u.Cluster, u.Hostname)
+				if scErr != nil {
+					cclog.Errorf("BatchUpdateNodeStates: subcluster lookup for '%s' in '%s': %v", u.Hostname, u.Cluster, scErr)
+					continue
+				}
+				node := schema.NodeDB{
+					Hostname: u.Hostname, Cluster: u.Cluster, SubCluster: subcluster,
+				}
+				res, insertErr := stmtInsertNode.Exec(&node)
+				if insertErr != nil {
+					cclog.Errorf("BatchUpdateNodeStates: insert node '%s': %v", u.Hostname, insertErr)
+					continue
+				}
+				id, _ = res.LastInsertId()
+			} else {
+				cclog.Errorf("BatchUpdateNodeStates: lookup node '%s': %v", u.Hostname, err)
+				continue
+			}
+		}
+		u.NodeState.NodeID = id
+		if _, err := stmtInsertState.Exec(u.NodeState); err != nil {
+			cclog.Errorf("BatchUpdateNodeStates: insert state for '%s': %v", u.Hostname, err)
+		}
+	}
+
+	if err := tx.Commit(); err != nil {
+		return fmt.Errorf("BatchUpdateNodeStates: commit: %w", err)
+	}
+	return nil
+}
+
 // func (r *NodeRepository) UpdateHealthState(hostname string, healthState *schema.MonitoringState) error {
 // 	if _, err := sq.Update("node").Set("health_state", healthState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil {
 // 		cclog.Errorf("error while updating node '%d'", id)

View File

@@ -188,6 +188,21 @@ func (r *UserRepository) AddUser(user *schema.User) error {
 	return nil
 }
 
+// AddUserIfNotExists inserts a user only if the username does not already exist.
+// Uses INSERT OR IGNORE to avoid UNIQUE constraint errors when a user is
+// concurrently created (e.g., by a login while LDAP sync is running).
+// Unlike AddUser, this intentionally skips the deprecated default metrics config insertion.
+func (r *UserRepository) AddUserIfNotExists(user *schema.User) error {
+	rolesJson, _ := json.Marshal(user.Roles)
+	projectsJson, _ := json.Marshal(user.Projects)
+
+	cols := "username, name, roles, projects, ldap"
+	_, err := r.DB.Exec(
+		`INSERT OR IGNORE INTO hpc_user (`+cols+`) VALUES (?, ?, ?, ?, ?)`,
+		user.Username, user.Name, string(rolesJson), string(projectsJson), int(user.AuthSource))
+	return err
+}
+
 func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error {
 	// user contains updated info -> Apply to dbUser
 	// --- Simple Name Update ---

View File

@@ -102,7 +102,7 @@ func (sa *SqliteArchive) Init(rawConfig json.RawMessage) (uint64, error) {
"PRAGMA journal_mode=WAL", "PRAGMA journal_mode=WAL",
"PRAGMA synchronous=NORMAL", "PRAGMA synchronous=NORMAL",
"PRAGMA cache_size=-64000", // 64MB cache "PRAGMA cache_size=-64000", // 64MB cache
"PRAGMA busy_timeout=5000", "PRAGMA busy_timeout=60000",
} }
for _, pragma := range pragmas { for _, pragma := range pragmas {
if _, err := sa.db.Exec(pragma); err != nil { if _, err := sa.db.Exec(pragma); err != nil {