diff --git a/CLAUDE.md b/CLAUDE.md index a8d56571..ef030cd5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -151,9 +151,11 @@ applied automatically on startup. Version tracking in `version` table. ## Configuration - **config.json**: Main configuration (clusters, metric repositories, archive settings) - - `main.apiSubjects`: NATS subject configuration (optional) - - `subjectJobEvent`: Subject for job start/stop events (e.g., "cc.job.event") - - `subjectNodeState`: Subject for node state updates (e.g., "cc.node.state") + - `main.api-subjects`: NATS subject configuration (optional) + - `subject-job-event`: Subject for job start/stop events (e.g., "cc.job.event") + - `subject-node-state`: Subject for node state updates (e.g., "cc.node.state") + - `job-concurrency`: Worker goroutines for job events (default: 8) + - `node-concurrency`: Worker goroutines for node state events (default: 2) - `nats`: NATS client connection configuration (optional) - `address`: NATS server address (e.g., "nats://localhost:4222") - `username`: Authentication username (optional) @@ -241,13 +243,19 @@ The backend supports a NATS-based API as an alternative to the REST API for job ```json { "main": { - "apiSubjects": { - "subjectJobEvent": "cc.job.event", - "subjectNodeState": "cc.node.state" + "api-subjects": { + "subject-job-event": "cc.job.event", + "subject-node-state": "cc.node.state", + "job-concurrency": 8, + "node-concurrency": 2 } } } ``` + - `subject-job-event` (required): NATS subject for job start/stop events + - `subject-node-state` (required): NATS subject for node state updates + - `job-concurrency` (optional, default: 8): Number of concurrent worker goroutines for job events + - `node-concurrency` (optional, default: 2): Number of concurrent worker goroutines for node state events ### Message Format @@ -292,9 +300,10 @@ job,function=stop_job event="{\"jobId\":123,\"cluster\":\"test\",\"startTime\":1 ### Implementation Notes - NATS API mirrors REST API functionality but uses messaging 
-- Job start/stop events are processed asynchronously +- Job start/stop events are processed asynchronously via configurable worker pools - Duplicate job detection is handled (same as REST API) - All validation rules from REST API apply +- Node state updates include health checks against the metric store (identical to REST handler): nodes are grouped by subcluster, metric configurations are fetched, and `HealthCheck()` is called per subcluster. Nodes default to `MonitoringStateFailed` if no health data is available. - Messages are logged; no responses are sent back to publishers - If NATS client is unavailable, API subscriptions are skipped (logged as warning) diff --git a/README.md b/README.md index bc56db03..030c9bd3 100644 --- a/README.md +++ b/README.md @@ -148,7 +148,8 @@ usage can be tuned via the optional `db-config` section in config.json under "soft-heap-limit-mb": 16384, "max-open-connections": 4, "max-idle-connections": 4, - "max-idle-time-minutes": 10 + "max-idle-time-minutes": 10, + "busy-timeout-ms": 60000 } } } @@ -166,6 +167,7 @@ are used. | `max-open-connections` | 4 | Maximum number of open database connections. | | `max-idle-connections` | 4 | Maximum number of idle database connections kept in the pool. | | `max-idle-time-minutes` | 10 | Maximum time in minutes a connection can sit idle before being closed. | +| `busy-timeout-ms` | 60000 | SQLite busy timeout in milliseconds. When a write is blocked by another writer, SQLite retries internally with backoff for up to this duration before returning `SQLITE_BUSY`. 
| ### Sizing Guidelines diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 9e1ea8aa..12faeae5 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -126,6 +126,9 @@ func initDatabase() error { if dc.ConnectionMaxIdleTimeMins > 0 { cfg.ConnectionMaxIdleTime = time.Duration(dc.ConnectionMaxIdleTimeMins) * time.Minute } + if dc.BusyTimeoutMs > 0 { + cfg.BusyTimeoutMs = dc.BusyTimeoutMs + } repository.SetConfig(cfg) } repository.Connect(config.Keys.DB) diff --git a/go.mod b/go.mod index 6bddb253..802bfb19 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ tool ( require ( github.com/99designs/gqlgen v0.17.88 - github.com/ClusterCockpit/cc-lib/v2 v2.8.2 + github.com/ClusterCockpit/cc-lib/v2 v2.9.0 github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 github.com/Masterminds/squirrel v1.5.4 github.com/aws/aws-sdk-go-v2 v1.41.3 diff --git a/go.sum b/go.sum index db306541..ec440467 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/Azure/go-ntlmssp v0.1.0 h1:DjFo6YtWzNqNvQdrwEyr/e4nhU3vRiwenz5QX7sFz+ github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk= github.com/ClusterCockpit/cc-lib/v2 v2.8.2 h1:rCLZk8wz8yq8xBnBEdVKigvA2ngR8dPmHbEFwxxb3jw= github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o= +github.com/ClusterCockpit/cc-lib/v2 v2.9.0 h1:mzUYakcjwb+UP5II4jOvr36rSYct90gXBbtUg+nvm9c= +github.com/ClusterCockpit/cc-lib/v2 v2.9.0/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o= github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q= github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= diff --git a/internal/api/nats.go b/internal/api/nats.go index efa4ab6f..db229a04 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -8,6 +8,7 @@ package api import ( 
"database/sql" "encoding/json" + "maps" "strings" "sync" "time" @@ -15,7 +16,10 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/archiver" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/importer" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/metricstore" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" "github.com/ClusterCockpit/cc-lib/v2/nats" @@ -24,6 +28,12 @@ import ( influx "github.com/ClusterCockpit/cc-line-protocol/v2/lineprotocol" ) +// natsMessage wraps a raw NATS message with its subject for channel-based processing. +type natsMessage struct { + subject string + data []byte +} + // NatsAPI provides NATS subscription-based handlers for Job and Node operations. // It mirrors the functionality of the REST API but uses NATS messaging with // InfluxDB line protocol as the message format. @@ -64,16 +74,60 @@ type NatsAPI struct { // RepositoryMutex protects job creation operations from race conditions // when checking for duplicate jobs during startJob calls. RepositoryMutex sync.Mutex + // jobCh receives job event messages for processing by worker goroutines. + jobCh chan natsMessage + // nodeCh receives node state messages for processing by worker goroutines. + nodeCh chan natsMessage } -// NewNatsAPI creates a new NatsAPI instance with default dependencies. +// NewNatsAPI creates a new NatsAPI instance with channel-based worker pools. +// Concurrency is configured via NATSConfig (defaults: JobConcurrency=8, NodeConcurrency=2). 
func NewNatsAPI() *NatsAPI { - return &NatsAPI{ + jobConc := 8 + nodeConc := 2 + + if s := config.Keys.APISubjects; s != nil { + if s.JobConcurrency > 0 { + jobConc = s.JobConcurrency + } + if s.NodeConcurrency > 0 { + nodeConc = s.NodeConcurrency + } + } + + api := &NatsAPI{ JobRepository: repository.GetJobRepository(), + jobCh: make(chan natsMessage, jobConc), + nodeCh: make(chan natsMessage, nodeConc), + } + + // Start worker goroutines + for range jobConc { + go api.jobWorker() + } + for range nodeConc { + go api.nodeWorker() + } + + return api +} + +// jobWorker processes job event messages from the job channel. +func (api *NatsAPI) jobWorker() { + for msg := range api.jobCh { + api.handleJobEvent(msg.subject, msg.data) + } +} + +// nodeWorker processes node state messages from the node channel. +func (api *NatsAPI) nodeWorker() { + for msg := range api.nodeCh { + api.handleNodeState(msg.subject, msg.data) } } // StartSubscriptions registers all NATS subscriptions for Job and Node APIs. +// Messages are delivered to buffered channels and processed by worker goroutines. // Returns an error if the NATS client is not available or subscription fails. 
func (api *NatsAPI) StartSubscriptions() error { client := nats.GetClient() @@ -83,14 +137,18 @@ } if config.Keys.APISubjects != nil { s := config.Keys.APISubjects - if err := client.Subscribe(s.SubjectJobEvent, api.handleJobEvent); err != nil { + if err := client.Subscribe(s.SubjectJobEvent, func(subject string, data []byte) { + api.jobCh <- natsMessage{subject: subject, data: data} + }); err != nil { return err } - if err := client.Subscribe(s.SubjectNodeState, api.handleNodeState); err != nil { + if err := client.Subscribe(s.SubjectNodeState, func(subject string, data []byte) { + api.nodeCh <- natsMessage{subject: subject, data: data} + }); err != nil { return err } @@ -345,22 +402,66 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) { repo := repository.GetNodeRepository() requestReceived := time.Now().Unix() + // Build nodeList per subcluster for health check + m := make(map[string][]string) + metricNames := make(map[string][]string) + healthResults := make(map[string]metricstore.HealthCheckResult) + + for _, node := range req.Nodes { + if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil { + m[sc] = append(m[sc], node.Hostname) + } + } + + for sc := range m { + if sc != "" { + metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc) + metricNames[sc] = metricListToNames(metricList) + } + } + + // Perform health check against metric store + healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster) + if err != nil { + cclog.Warnf("NATS nodestate: no metric store for cluster %s, skipping health check: %v", req.Cluster, err) + } else { + for sc, nl := range m { + if sc != "" { + if results, err := healthRepo.HealthCheck(req.Cluster, nl, metricNames[sc]); err == nil { + maps.Copy(healthResults, results) + } + } + } + } + + updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes)) for _, node := range req.Nodes { state := determineState(node.States) + 
healthState := schema.MonitoringStateFailed + var healthMetrics string + if result, ok := healthResults[node.Hostname]; ok { + healthState = result.State + healthMetrics = result.HealthMetrics + } nodeState := schema.NodeStateDB{ TimeStamp: requestReceived, NodeState: state, CpusAllocated: node.CpusAllocated, MemoryAllocated: node.MemoryAllocated, GpusAllocated: node.GpusAllocated, - HealthState: schema.MonitoringStateFull, + HealthState: healthState, + HealthMetrics: healthMetrics, JobsRunning: node.JobsRunning, } + updates = append(updates, repository.NodeStateUpdate{ + Hostname: node.Hostname, + Cluster: req.Cluster, + NodeState: &nodeState, + }) + } - if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil { - cclog.Errorf("NATS nodestate: updating node state for %s on %s failed: %v", - node.Hostname, req.Cluster, err) - } + if err := repo.BatchUpdateNodeStates(updates); err != nil { + cclog.Errorf("NATS nodestate: batch update for cluster %s failed: %v", req.Cluster, err) } cclog.Debugf("NATS nodestate: updated %d node states for cluster %s", len(req.Nodes), req.Cluster) diff --git a/internal/api/node.go b/internal/api/node.go index 5032ed7b..06787cd1 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -116,6 +116,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs)) startDB := time.Now() + updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes)) for _, node := range req.Nodes { state := determineState(node.States) healthState := schema.MonitoringStateFailed @@ -134,11 +135,15 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { HealthMetrics: healthMetrics, JobsRunning: node.JobsRunning, } + updates = append(updates, repository.NodeStateUpdate{ + Hostname: node.Hostname, + Cluster: req.Cluster, + NodeState: &nodeState, + }) + } - if err := 
repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil { - cclog.Errorf("updateNodeStates: updating node state for %s on %s failed: %v", - node.Hostname, req.Cluster, err) - } + if err := repo.BatchUpdateNodeStates(updates); err != nil { + cclog.Errorf("updateNodeStates: batch update for cluster %s failed: %v", req.Cluster, err) } cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB)) diff --git a/internal/api/rest.go b/internal/api/rest.go index 613867a8..cb041012 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -16,6 +16,7 @@ import ( "net/http" "os" "path/filepath" + "runtime" "strings" "sync" @@ -184,7 +185,8 @@ type DefaultAPIResponse struct { // handleError writes a standardized JSON error response with the given status code. // It logs the error at WARN level and ensures proper Content-Type headers are set. func handleError(err error, statusCode int, rw http.ResponseWriter) { - cclog.Warnf("REST ERROR : %s", err.Error()) + _, file, line, _ := runtime.Caller(1) + cclog.Warnf("REST ERROR (%s:%d): %s", filepath.Base(file), line, err.Error()) rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(statusCode) if err := json.NewEncoder(rw).Encode(ErrorResponse{ diff --git a/internal/auth/ldap.go b/internal/auth/ldap.go index a174bb9d..b65e485f 100644 --- a/internal/auth/ldap.go +++ b/internal/auth/ldap.go @@ -210,7 +210,7 @@ func (la *LdapAuthenticator) Sync() error { } cclog.Debugf("sync: add %v (name: %v, roles: [user], ldap: true)", username, name) - if err := ur.AddUser(user); err != nil { + if err := ur.AddUserIfNotExists(user); err != nil { cclog.Errorf("User '%s' LDAP: Insert into DB failed", username) return err } diff --git a/internal/config/config.go b/internal/config/config.go index e04d0a72..677b24e4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -88,6 +88,7 @@ type DbConfig struct { MaxOpenConnections int `json:"max-open-connections"` MaxIdleConnections 
int `json:"max-idle-connections"` ConnectionMaxIdleTimeMins int `json:"max-idle-time-minutes"` + BusyTimeoutMs int `json:"busy-timeout-ms"` } type NodeStateRetention struct { @@ -116,6 +117,8 @@ type ResampleConfig struct { type NATSConfig struct { SubjectJobEvent string `json:"subject-job-event"` SubjectNodeState string `json:"subject-node-state"` + JobConcurrency int `json:"job-concurrency"` + NodeConcurrency int `json:"node-concurrency"` } type IntRange struct { diff --git a/internal/config/schema.go b/internal/config/schema.go index 64d9bcf8..195dfaeb 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -121,6 +121,14 @@ var configSchema = ` "subject-node-state": { "description": "NATS subject for node state updates", "type": "string" + }, + "job-concurrency": { + "description": "Number of concurrent worker goroutines for processing job events (default: 8).", + "type": "integer" + }, + "node-concurrency": { + "description": "Number of concurrent worker goroutines for processing node state events (default: 2).", + "type": "integer" } }, "required": ["subject-job-event", "subject-node-state"] @@ -201,6 +209,10 @@ var configSchema = ` "max-idle-time-minutes": { "description": "Maximum idle time for a connection in minutes (default: 10).", "type": "integer" + }, + "busy-timeout-ms": { + "description": "SQLite busy timeout in milliseconds. When a write is blocked, SQLite retries with backoff for up to this duration before returning SQLITE_BUSY (default: 60000).", + "type": "integer" } } } diff --git a/internal/repository/config.go b/internal/repository/config.go index 2bd4f749..a201bb6b 100644 --- a/internal/repository/config.go +++ b/internal/repository/config.go @@ -46,6 +46,12 @@ type RepositoryConfig struct { // It's a soft limit — queries won't fail, but cache eviction becomes more aggressive. // Default: 16384 (16GB) DbSoftHeapLimitMB int + + // BusyTimeoutMs is the SQLite busy_timeout in milliseconds. 
+ // When a write is blocked by another writer, SQLite retries internally + // using a backoff mechanism for up to this duration before returning SQLITE_BUSY. + // Default: 60000 (60 seconds) + BusyTimeoutMs int } // DefaultConfig returns the default repository configuration. @@ -60,6 +66,7 @@ func DefaultConfig() *RepositoryConfig { MinRunningJobDuration: 600, // 10 minutes DbCacheSizeMB: 2048, // 2GB per connection DbSoftHeapLimitMB: 16384, // 16GB process-wide + BusyTimeoutMs: 60000, // 60 seconds } } diff --git a/internal/repository/dbConnection.go b/internal/repository/dbConnection.go index 23a19c93..80df2869 100644 --- a/internal/repository/dbConnection.go +++ b/internal/repository/dbConnection.go @@ -70,7 +70,7 @@ func Connect(db string) { connectionURLParams := make(url.Values) connectionURLParams.Add("_txlock", "immediate") connectionURLParams.Add("_journal_mode", "WAL") - connectionURLParams.Add("_busy_timeout", "5000") + connectionURLParams.Add("_busy_timeout", fmt.Sprintf("%d", repoConfig.BusyTimeoutMs)) connectionURLParams.Add("_synchronous", "NORMAL") cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB)) diff --git a/internal/repository/job.go b/internal/repository/job.go index 3b237bce..1c8b4a8d 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -66,6 +66,8 @@ import ( "fmt" "maps" "math" + "path/filepath" + "runtime" "sort" "strconv" "sync" @@ -159,7 +161,10 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) { &job.StartTime, &job.Partition, &job.ArrayJobID, &job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Shared, &job.MonitoringStatus, &job.SMT, &job.State, &job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil { - cclog.Warnf("Error while scanning rows (Job): %v", err) + if err != sql.ErrNoRows { + _, file, line, _ := runtime.Caller(1) + cclog.Warnf("Error while scanning 
rows (Job) (%s:%d): %v", filepath.Base(file), line, err) + } return nil, err } diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go index 7047f12f..465d8d9c 100644 --- a/internal/repository/jobCreate.go +++ b/internal/repository/jobCreate.go @@ -92,20 +92,33 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) { jobs = append(jobs, job) } + // Transfer cached jobs to main table and clear cache in a single transaction. + tx, err := r.DB.Beginx() + if err != nil { + cclog.Errorf("SyncJobs: begin transaction: %v", err) + return nil, err + } + defer tx.Rollback() + // Use INSERT OR IGNORE to skip jobs already transferred by the stop path - _, err = r.DB.Exec( + _, err = tx.Exec( "INSERT OR IGNORE INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache") if err != nil { cclog.Errorf("Error while Job sync: %v", err) return nil, err } - _, err = r.DB.Exec("DELETE FROM job_cache") + _, err = tx.Exec("DELETE FROM job_cache") if err != nil { cclog.Errorf("Error while Job cache clean: %v", err) return nil, err } + if err := tx.Commit(); err != nil { + cclog.Errorf("SyncJobs: commit transaction: %v", err) + return nil, err + } + // Resolve correct job.id from the job table. The IDs read from job_cache // are from a different auto-increment sequence and must not be used to // query the job table. @@ -128,7 +141,13 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) { // TransferCachedJobToMain moves a job from job_cache to the job table. // Caller must hold r.Mutex. 
Returns the new job table ID. func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) { - res, err := r.DB.Exec( + tx, err := r.DB.Beginx() + if err != nil { + return 0, fmt.Errorf("TransferCachedJobToMain: begin transaction: %w", err) + } + defer tx.Rollback() + + res, err := tx.Exec( "INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache WHERE id = ?", cacheID) if err != nil { @@ -140,11 +159,15 @@ func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) { return 0, fmt.Errorf("getting new job ID after transfer failed: %w", err) } - _, err = r.DB.Exec("DELETE FROM job_cache WHERE id = ?", cacheID) + _, err = tx.Exec("DELETE FROM job_cache WHERE id = ?", cacheID) if err != nil { return 0, fmt.Errorf("deleting cached job %d after transfer failed: %w", cacheID, err) } + if err := tx.Commit(); err != nil { + return 0, fmt.Errorf("TransferCachedJobToMain: commit: %w", err) + } + return newID, nil } diff --git a/internal/repository/node.go b/internal/repository/node.go index 116b9868..2e174e95 100644 --- a/internal/repository/node.go +++ b/internal/repository/node.go @@ -244,6 +244,77 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt return nil } +// NodeStateUpdate holds the data needed to update one node's state in a batch operation. 
+type NodeStateUpdate struct { + Hostname string + Cluster string + NodeState *schema.NodeStateDB +} + +// BatchUpdateNodeStates inserts node state rows for multiple nodes in a single transaction. +// For each node, it looks up (or creates) the node row, then inserts the state row. +// This reduces lock acquisitions from 2*N to 1 for N nodes. +func (r *NodeRepository) BatchUpdateNodeStates(updates []NodeStateUpdate) error { + tx, err := r.DB.Beginx() + if err != nil { + return fmt.Errorf("BatchUpdateNodeStates: begin transaction: %w", err) + } + defer tx.Rollback() + + stmtLookup, err := tx.Preparex("SELECT id FROM node WHERE hostname = ? AND cluster = ?") + if err != nil { + return fmt.Errorf("BatchUpdateNodeStates: prepare lookup: %w", err) + } + defer stmtLookup.Close() + + stmtInsertNode, err := tx.PrepareNamed(NamedNodeInsert) + if err != nil { + return fmt.Errorf("BatchUpdateNodeStates: prepare node insert: %w", err) + } + defer stmtInsertNode.Close() + + stmtInsertState, err := tx.PrepareNamed(NamedNodeStateInsert) + if err != nil { + return fmt.Errorf("BatchUpdateNodeStates: prepare state insert: %w", err) + } + defer stmtInsertState.Close() + + for _, u := range updates { + var id int64 + if err := stmtLookup.QueryRow(u.Hostname, u.Cluster).Scan(&id); err != nil { + if err == sql.ErrNoRows { + subcluster, scErr := archive.GetSubClusterByNode(u.Cluster, u.Hostname) + if scErr != nil { + cclog.Errorf("BatchUpdateNodeStates: subcluster lookup for '%s' in '%s': %v", u.Hostname, u.Cluster, scErr) + continue + } + node := schema.NodeDB{ + Hostname: u.Hostname, Cluster: u.Cluster, SubCluster: subcluster, + } + res, insertErr := stmtInsertNode.Exec(&node) + if insertErr != nil { + cclog.Errorf("BatchUpdateNodeStates: insert node '%s': %v", u.Hostname, insertErr) + continue + } + id, _ = res.LastInsertId() + } else { + cclog.Errorf("BatchUpdateNodeStates: lookup node '%s': %v", u.Hostname, err) + continue + } + } + + u.NodeState.NodeID = id + if _, err := 
stmtInsertState.Exec(u.NodeState); err != nil { + cclog.Errorf("BatchUpdateNodeStates: insert state for '%s': %v", u.Hostname, err) + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("BatchUpdateNodeStates: commit: %w", err) + } + return nil +} + // func (r *NodeRepository) UpdateHealthState(hostname string, healthState *schema.MonitoringState) error { // if _, err := sq.Update("node").Set("health_state", healthState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil { // cclog.Errorf("error while updating node '%d'", id) diff --git a/internal/repository/user.go b/internal/repository/user.go index 8e45e16a..307916eb 100644 --- a/internal/repository/user.go +++ b/internal/repository/user.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package repository import ( @@ -10,7 +11,9 @@ import ( "encoding/json" "errors" "fmt" + "path/filepath" "reflect" + "runtime" "strings" "sync" @@ -73,7 +76,11 @@ func (r *UserRepository) GetUser(username string) (*schema.User, error) { if err := sq.Select("password", "ldap", "name", "roles", "email", "projects").From("hpc_user"). Where("hpc_user.username = ?", username).RunWith(r.DB). 
QueryRow().Scan(&hashedPassword, &user.AuthSource, &name, &rawRoles, &email, &rawProjects); err != nil { - cclog.Warnf("Error while querying user '%v' from database", username) + if err != sql.ErrNoRows { + _, file, line, _ := runtime.Caller(1) + cclog.Warnf("Error while querying user '%v' from database (%s:%d): %v", + username, filepath.Base(file), line, err) + } return nil, err } @@ -124,11 +131,11 @@ func (r *UserRepository) GetLdapUsernames() ([]string, error) { // Required fields: Username, Roles // Optional fields: Name, Email, Password, Projects, AuthSource func (r *UserRepository) AddUser(user *schema.User) error { - rolesJson, _ := json.Marshal(user.Roles) - projectsJson, _ := json.Marshal(user.Projects) + rolesJSON, _ := json.Marshal(user.Roles) + projectsJSON, _ := json.Marshal(user.Projects) cols := []string{"username", "roles", "projects"} - vals := []any{user.Username, string(rolesJson), string(projectsJson)} + vals := []any{user.Username, string(rolesJSON), string(projectsJSON)} if user.Name != "" { cols = append(cols, "name") @@ -157,7 +164,7 @@ func (r *UserRepository) AddUser(user *schema.User) error { return err } - cclog.Infof("new user %#v created (roles: %s, auth-source: %d, projects: %s)", user.Username, rolesJson, user.AuthSource, projectsJson) + cclog.Infof("new user %#v created (roles: %s, auth-source: %d, projects: %s)", user.Username, rolesJSON, user.AuthSource, projectsJSON) // DEPRECATED: SUPERSEDED BY NEW USER CONFIG - userConfig.go / web.go defaultMetricsCfg, err := config.LoadDefaultMetricsConfig() @@ -188,6 +195,21 @@ func (r *UserRepository) AddUser(user *schema.User) error { return nil } +// AddUserIfNotExists inserts a user only if the username does not already exist. +// Uses INSERT OR IGNORE to avoid UNIQUE constraint errors when a user is +// concurrently created (e.g., by a login while LDAP sync is running). +// Unlike AddUser, this intentionally skips the deprecated default metrics config insertion. 
+func (r *UserRepository) AddUserIfNotExists(user *schema.User) error { + rolesJSON, _ := json.Marshal(user.Roles) + projectsJSON, _ := json.Marshal(user.Projects) + + cols := "username, name, roles, projects, ldap" + _, err := r.DB.Exec( + `INSERT OR IGNORE INTO hpc_user (`+cols+`) VALUES (?, ?, ?, ?, ?)`, + user.Username, user.Name, string(rolesJSON), string(projectsJSON), int(user.AuthSource)) + return err +} + func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error { // user contains updated info -> Apply to dbUser // --- Simple Name Update --- diff --git a/pkg/archive/sqliteBackend.go b/pkg/archive/sqliteBackend.go index 3f214136..ff7bf333 100644 --- a/pkg/archive/sqliteBackend.go +++ b/pkg/archive/sqliteBackend.go @@ -102,7 +102,7 @@ func (sa *SqliteArchive) Init(rawConfig json.RawMessage) (uint64, error) { "PRAGMA journal_mode=WAL", "PRAGMA synchronous=NORMAL", "PRAGMA cache_size=-64000", // 64MB cache - "PRAGMA busy_timeout=5000", + "PRAGMA busy_timeout=60000", } for _, pragma := range pragmas { if _, err := sa.db.Exec(pragma); err != nil { diff --git a/pkg/metricstore/api.go b/pkg/metricstore/api.go index c091c6d0..01dec633 100644 --- a/pkg/metricstore/api.go +++ b/pkg/metricstore/api.go @@ -288,7 +288,7 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { data.Error = &msg res = append(res, data) } else { - cclog.Warnf("failed to fetch '%s' from host '%s' (cluster: %s): %s", query.Metric, query.Hostname, req.Cluster, err.Error()) + cclog.Debugf("failed to fetch '%s' from host '%s' (cluster: %s): %s", query.Metric, query.Hostname, req.Cluster, err.Error()) } continue }