Merge pull request #521 from ClusterCockpit/hotfix

Hotfix
This commit is contained in:
Jan Eitzinger
2026-03-17 09:23:59 +01:00
committed by GitHub
19 changed files with 305 additions and 38 deletions

View File

@@ -151,9 +151,11 @@ applied automatically on startup. Version tracking in `version` table.
## Configuration
- **config.json**: Main configuration (clusters, metric repositories, archive settings)
- `main.apiSubjects`: NATS subject configuration (optional)
- `subjectJobEvent`: Subject for job start/stop events (e.g., "cc.job.event")
- `subjectNodeState`: Subject for node state updates (e.g., "cc.node.state")
- `main.api-subjects`: NATS subject configuration (optional)
- `subject-job-event`: Subject for job start/stop events (e.g., "cc.job.event")
- `subject-node-state`: Subject for node state updates (e.g., "cc.node.state")
- `job-concurrency`: Worker goroutines for job events (default: 8)
- `node-concurrency`: Worker goroutines for node state events (default: 2)
- `nats`: NATS client connection configuration (optional)
- `address`: NATS server address (e.g., "nats://localhost:4222")
- `username`: Authentication username (optional)
@@ -241,13 +243,19 @@ The backend supports a NATS-based API as an alternative to the REST API for job
```json
{
"main": {
"apiSubjects": {
"subjectJobEvent": "cc.job.event",
"subjectNodeState": "cc.node.state"
"api-subjects": {
"subject-job-event": "cc.job.event",
"subject-node-state": "cc.node.state",
"job-concurrency": 8,
"node-concurrency": 2
}
}
}
```
- `subject-job-event` (required): NATS subject for job start/stop events
- `subject-node-state` (required): NATS subject for node state updates
- `job-concurrency` (optional, default: 8): Number of concurrent worker goroutines for job events
- `node-concurrency` (optional, default: 2): Number of concurrent worker goroutines for node state events
### Message Format
@@ -292,9 +300,10 @@ job,function=stop_job event="{\"jobId\":123,\"cluster\":\"test\",\"startTime\":1
### Implementation Notes
- NATS API mirrors REST API functionality but uses messaging
- Job start/stop events are processed asynchronously
- Job start/stop events are processed asynchronously via configurable worker pools
- Duplicate job detection is handled (same as REST API)
- All validation rules from REST API apply
- Node state updates include health checks against the metric store (identical to REST handler): nodes are grouped by subcluster, metric configurations are fetched, and `HealthCheck()` is called per subcluster. Nodes default to `MonitoringStateFailed` if no health data is available.
- Messages are logged; no responses are sent back to publishers
- If NATS client is unavailable, API subscriptions are skipped (logged as warning)

View File

@@ -148,7 +148,8 @@ usage can be tuned via the optional `db-config` section in config.json under
"soft-heap-limit-mb": 16384,
"max-open-connections": 4,
"max-idle-connections": 4,
"max-idle-time-minutes": 10
"max-idle-time-minutes": 10,
"busy-timeout-ms": 60000
}
}
}
@@ -166,6 +167,7 @@ are used.
| `max-open-connections` | 4 | Maximum number of open database connections. |
| `max-idle-connections` | 4 | Maximum number of idle database connections kept in the pool. |
| `max-idle-time-minutes` | 10 | Maximum time in minutes a connection can sit idle before being closed. |
| `busy-timeout-ms` | 60000 | SQLite busy timeout in milliseconds. When a write is blocked by another writer, SQLite retries internally with backoff for up to this duration before returning `SQLITE_BUSY`. |
### Sizing Guidelines

View File

@@ -126,6 +126,9 @@ func initDatabase() error {
if dc.ConnectionMaxIdleTimeMins > 0 {
cfg.ConnectionMaxIdleTime = time.Duration(dc.ConnectionMaxIdleTimeMins) * time.Minute
}
if dc.BusyTimeoutMs > 0 {
cfg.BusyTimeoutMs = dc.BusyTimeoutMs
}
repository.SetConfig(cfg)
}
repository.Connect(config.Keys.DB)

2
go.mod
View File

@@ -9,7 +9,7 @@ tool (
require (
github.com/99designs/gqlgen v0.17.88
github.com/ClusterCockpit/cc-lib/v2 v2.8.2
github.com/ClusterCockpit/cc-lib/v2 v2.9.0
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0
github.com/Masterminds/squirrel v1.5.4
github.com/aws/aws-sdk-go-v2 v1.41.3

2
go.sum
View File

@@ -6,6 +6,8 @@ github.com/Azure/go-ntlmssp v0.1.0 h1:DjFo6YtWzNqNvQdrwEyr/e4nhU3vRiwenz5QX7sFz+
github.com/Azure/go-ntlmssp v0.1.0/go.mod h1:NYqdhxd/8aAct/s4qSYZEerdPuH1liG2/X9DiVTbhpk=
github.com/ClusterCockpit/cc-lib/v2 v2.8.2 h1:rCLZk8wz8yq8xBnBEdVKigvA2ngR8dPmHbEFwxxb3jw=
github.com/ClusterCockpit/cc-lib/v2 v2.8.2/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
github.com/ClusterCockpit/cc-lib/v2 v2.9.0 h1:mzUYakcjwb+UP5II4jOvr36rSYct90gXBbtUg+nvm9c=
github.com/ClusterCockpit/cc-lib/v2 v2.9.0/go.mod h1:FwD8vnTIbBM3ngeLNKmCvp9FoSjQZm7xnuaVxEKR23o=
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0 h1:hIzxgTBWcmCIHtoDKDkSCsKCOCOwUC34sFsbD2wcW0Q=
github.com/ClusterCockpit/cc-line-protocol/v2 v2.4.0/go.mod h1:y42qUu+YFmu5fdNuUAS4VbbIKxVjxCvbVqFdpdh8ahY=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=

View File

@@ -8,6 +8,7 @@ package api
import (
"database/sql"
"encoding/json"
"maps"
"strings"
"sync"
"time"
@@ -15,7 +16,10 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/importer"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage"
"github.com/ClusterCockpit/cc-lib/v2/nats"
@@ -24,6 +28,12 @@ import (
influx "github.com/ClusterCockpit/cc-line-protocol/v2/lineprotocol"
)
// natsMessage wraps a raw NATS message with its subject for channel-based processing.
// Instances are produced by the subscription callbacks and consumed by the
// jobWorker/nodeWorker goroutines.
type natsMessage struct {
subject string // NATS subject the message arrived on
data []byte // raw message payload (InfluxDB line protocol, per the NatsAPI contract)
}
// NatsAPI provides NATS subscription-based handlers for Job and Node operations.
// It mirrors the functionality of the REST API but uses NATS messaging with
// InfluxDB line protocol as the message format.
@@ -64,16 +74,60 @@ type NatsAPI struct {
// RepositoryMutex protects job creation operations from race conditions
// when checking for duplicate jobs during startJob calls.
RepositoryMutex sync.Mutex
// jobCh receives job event messages for processing by worker goroutines.
jobCh chan natsMessage
// nodeCh receives node state messages for processing by worker goroutines.
nodeCh chan natsMessage
}
// NewNatsAPI creates a new NatsAPI instance with default dependencies.
// NewNatsAPI creates a new NatsAPI instance with channel-based worker pools.
// Concurrency is configured via NATSConfig (defaults: JobConcurrency=8, NodeConcurrency=2).
func NewNatsAPI() *NatsAPI {
return &NatsAPI{
jobConc := 8
nodeConc := 2
if s := config.Keys.APISubjects; s != nil {
if s.JobConcurrency > 0 {
jobConc = s.JobConcurrency
}
if s.NodeConcurrency > 0 {
nodeConc = s.NodeConcurrency
}
}
api := &NatsAPI{
JobRepository: repository.GetJobRepository(),
jobCh: make(chan natsMessage, jobConc),
nodeCh: make(chan natsMessage, nodeConc),
}
// Start worker goroutines
for range jobConc {
go api.jobWorker()
}
for range nodeConc {
go api.nodeWorker()
}
return api
}
// jobWorker processes job event messages from the job channel.
// It returns only if jobCh is closed.
func (api *NatsAPI) jobWorker() {
	for {
		msg, ok := <-api.jobCh
		if !ok {
			return
		}
		api.handleJobEvent(msg.subject, msg.data)
	}
}
// nodeWorker processes node state messages from the node channel.
// It returns only if nodeCh is closed.
func (api *NatsAPI) nodeWorker() {
	for {
		msg, ok := <-api.nodeCh
		if !ok {
			return
		}
		api.handleNodeState(msg.subject, msg.data)
	}
}
// StartSubscriptions registers all NATS subscriptions for Job and Node APIs.
// Messages are delivered to buffered channels and processed by worker goroutines.
// Returns an error if the NATS client is not available or subscription fails.
func (api *NatsAPI) StartSubscriptions() error {
client := nats.GetClient()
@@ -83,14 +137,17 @@ func (api *NatsAPI) StartSubscriptions() error {
}
if config.Keys.APISubjects != nil {
s := config.Keys.APISubjects
if err := client.Subscribe(s.SubjectJobEvent, api.handleJobEvent); err != nil {
if err := client.Subscribe(s.SubjectJobEvent, func(subject string, data []byte) {
api.jobCh <- natsMessage{subject: subject, data: data}
}); err != nil {
return err
}
if err := client.Subscribe(s.SubjectNodeState, api.handleNodeState); err != nil {
if err := client.Subscribe(s.SubjectNodeState, func(subject string, data []byte) {
api.nodeCh <- natsMessage{subject: subject, data: data}
}); err != nil {
return err
}
@@ -345,22 +402,66 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) {
repo := repository.GetNodeRepository()
requestReceived := time.Now().Unix()
// Build nodeList per subcluster for health check
m := make(map[string][]string)
metricNames := make(map[string][]string)
healthResults := make(map[string]metricstore.HealthCheckResult)
for _, node := range req.Nodes {
if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil {
m[sc] = append(m[sc], node.Hostname)
}
}
for sc := range m {
if sc != "" {
metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
metricNames[sc] = metricListToNames(metricList)
}
}
// Perform health check against metric store
healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster)
if err != nil {
cclog.Warnf("NATS nodestate: no metric store for cluster %s, skipping health check: %v", req.Cluster, err)
} else {
for sc, nl := range m {
if sc != "" {
if results, err := healthRepo.HealthCheck(req.Cluster, nl, metricNames[sc]); err == nil {
maps.Copy(healthResults, results)
}
}
}
}
updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes))
for _, node := range req.Nodes {
state := determineState(node.States)
healthState := schema.MonitoringStateFailed
var healthMetrics string
if result, ok := healthResults[node.Hostname]; ok {
healthState = result.State
healthMetrics = result.HealthMetrics
}
nodeState := schema.NodeStateDB{
TimeStamp: requestReceived,
NodeState: state,
CpusAllocated: node.CpusAllocated,
MemoryAllocated: node.MemoryAllocated,
GpusAllocated: node.GpusAllocated,
HealthState: schema.MonitoringStateFull,
HealthState: healthState,
HealthMetrics: healthMetrics,
JobsRunning: node.JobsRunning,
}
if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil {
cclog.Errorf("NATS nodestate: updating node state for %s on %s failed: %v",
node.Hostname, req.Cluster, err)
updates = append(updates, repository.NodeStateUpdate{
Hostname: node.Hostname,
Cluster: req.Cluster,
NodeState: &nodeState,
})
}
if err := repo.BatchUpdateNodeStates(updates); err != nil {
cclog.Errorf("NATS nodestate: batch update for cluster %s failed: %v", req.Cluster, err)
}
cclog.Debugf("NATS nodestate: updated %d node states for cluster %s", len(req.Nodes), req.Cluster)

View File

@@ -116,6 +116,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
startDB := time.Now()
updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes))
for _, node := range req.Nodes {
state := determineState(node.States)
healthState := schema.MonitoringStateFailed
@@ -134,11 +135,15 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
HealthMetrics: healthMetrics,
JobsRunning: node.JobsRunning,
}
if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil {
cclog.Errorf("updateNodeStates: updating node state for %s on %s failed: %v",
node.Hostname, req.Cluster, err)
updates = append(updates, repository.NodeStateUpdate{
Hostname: node.Hostname,
Cluster: req.Cluster,
NodeState: &nodeState,
})
}
if err := repo.BatchUpdateNodeStates(updates); err != nil {
cclog.Errorf("updateNodeStates: batch update for cluster %s failed: %v", req.Cluster, err)
}
cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB))

View File

@@ -16,6 +16,7 @@ import (
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
@@ -184,7 +185,8 @@ type DefaultAPIResponse struct {
// handleError writes a standardized JSON error response with the given status code.
// It logs the error at WARN level and ensures proper Content-Type headers are set.
func handleError(err error, statusCode int, rw http.ResponseWriter) {
cclog.Warnf("REST ERROR : %s", err.Error())
_, file, line, _ := runtime.Caller(1)
cclog.Warnf("REST ERROR (%s:%d): %s", filepath.Base(file), line, err.Error())
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(statusCode)
if err := json.NewEncoder(rw).Encode(ErrorResponse{

View File

@@ -210,7 +210,7 @@ func (la *LdapAuthenticator) Sync() error {
}
cclog.Debugf("sync: add %v (name: %v, roles: [user], ldap: true)", username, name)
if err := ur.AddUser(user); err != nil {
if err := ur.AddUserIfNotExists(user); err != nil {
cclog.Errorf("User '%s' LDAP: Insert into DB failed", username)
return err
}

View File

@@ -88,6 +88,7 @@ type DbConfig struct {
MaxOpenConnections int `json:"max-open-connections"`
MaxIdleConnections int `json:"max-idle-connections"`
ConnectionMaxIdleTimeMins int `json:"max-idle-time-minutes"`
BusyTimeoutMs int `json:"busy-timeout-ms"`
}
type NodeStateRetention struct {
@@ -116,6 +117,8 @@ type ResampleConfig struct {
// NATSConfig configures the NATS-based API: the subjects to subscribe to and
// the worker-pool sizes used to process incoming messages.
type NATSConfig struct {
SubjectJobEvent string `json:"subject-job-event"` // NATS subject for job start/stop events
SubjectNodeState string `json:"subject-node-state"` // NATS subject for node state updates
JobConcurrency int `json:"job-concurrency"` // worker goroutines for job events; <=0 falls back to the default (8)
NodeConcurrency int `json:"node-concurrency"` // worker goroutines for node state events; <=0 falls back to the default (2)
}
type IntRange struct {

View File

@@ -121,6 +121,14 @@ var configSchema = `
"subject-node-state": {
"description": "NATS subject for node state updates",
"type": "string"
},
"job-concurrency": {
"description": "Number of concurrent worker goroutines for processing job events (default: 8).",
"type": "integer"
},
"node-concurrency": {
"description": "Number of concurrent worker goroutines for processing node state events (default: 2).",
"type": "integer"
}
},
"required": ["subject-job-event", "subject-node-state"]
@@ -201,6 +209,10 @@ var configSchema = `
"max-idle-time-minutes": {
"description": "Maximum idle time for a connection in minutes (default: 10).",
"type": "integer"
},
"busy-timeout-ms": {
"description": "SQLite busy timeout in milliseconds. When a write is blocked, SQLite retries with backoff for up to this duration before returning SQLITE_BUSY (default: 60000).",
"type": "integer"
}
}
}

View File

@@ -46,6 +46,12 @@ type RepositoryConfig struct {
// It's a soft limit — queries won't fail, but cache eviction becomes more aggressive.
// Default: 16384 (16GB)
DbSoftHeapLimitMB int
// BusyTimeoutMs is the SQLite busy_timeout in milliseconds.
// When a write is blocked by another writer, SQLite retries internally
// using a backoff mechanism for up to this duration before returning SQLITE_BUSY.
// Default: 60000 (60 seconds)
BusyTimeoutMs int
}
// DefaultConfig returns the default repository configuration.
@@ -60,6 +66,7 @@ func DefaultConfig() *RepositoryConfig {
MinRunningJobDuration: 600, // 10 minutes
DbCacheSizeMB: 2048, // 2GB per connection
DbSoftHeapLimitMB: 16384, // 16GB process-wide
BusyTimeoutMs: 60000, // 60 seconds
}
}

View File

@@ -70,7 +70,7 @@ func Connect(db string) {
connectionURLParams := make(url.Values)
connectionURLParams.Add("_txlock", "immediate")
connectionURLParams.Add("_journal_mode", "WAL")
connectionURLParams.Add("_busy_timeout", "5000")
connectionURLParams.Add("_busy_timeout", fmt.Sprintf("%d", repoConfig.BusyTimeoutMs))
connectionURLParams.Add("_synchronous", "NORMAL")
cacheSizeKiB := repoConfig.DbCacheSizeMB * 1024 // Convert MB to KiB
connectionURLParams.Add("_cache_size", fmt.Sprintf("-%d", cacheSizeKiB))

View File

@@ -66,6 +66,8 @@ import (
"fmt"
"maps"
"math"
"path/filepath"
"runtime"
"sort"
"strconv"
"sync"
@@ -159,7 +161,10 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
&job.StartTime, &job.Partition, &job.ArrayJobID, &job.NumNodes, &job.NumHWThreads,
&job.NumAcc, &job.Shared, &job.MonitoringStatus, &job.SMT, &job.State,
&job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil {
cclog.Warnf("Error while scanning rows (Job): %v", err)
if err != sql.ErrNoRows {
_, file, line, _ := runtime.Caller(1)
cclog.Warnf("Error while scanning rows (Job) (%s:%d): %v", filepath.Base(file), line, err)
}
return nil, err
}

View File

@@ -92,20 +92,33 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
jobs = append(jobs, job)
}
// Transfer cached jobs to main table and clear cache in a single transaction.
tx, err := r.DB.Beginx()
if err != nil {
cclog.Errorf("SyncJobs: begin transaction: %v", err)
return nil, err
}
defer tx.Rollback()
// Use INSERT OR IGNORE to skip jobs already transferred by the stop path
_, err = r.DB.Exec(
_, err = tx.Exec(
"INSERT OR IGNORE INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
if err != nil {
cclog.Errorf("Error while Job sync: %v", err)
return nil, err
}
_, err = r.DB.Exec("DELETE FROM job_cache")
_, err = tx.Exec("DELETE FROM job_cache")
if err != nil {
cclog.Errorf("Error while Job cache clean: %v", err)
return nil, err
}
if err := tx.Commit(); err != nil {
cclog.Errorf("SyncJobs: commit transaction: %v", err)
return nil, err
}
// Resolve correct job.id from the job table. The IDs read from job_cache
// are from a different auto-increment sequence and must not be used to
// query the job table.
@@ -128,7 +141,13 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
// TransferCachedJobToMain moves a job from job_cache to the job table.
// Caller must hold r.Mutex. Returns the new job table ID.
func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) {
res, err := r.DB.Exec(
tx, err := r.DB.Beginx()
if err != nil {
return 0, fmt.Errorf("TransferCachedJobToMain: begin transaction: %w", err)
}
defer tx.Rollback()
res, err := tx.Exec(
"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache WHERE id = ?",
cacheID)
if err != nil {
@@ -140,11 +159,15 @@ func (r *JobRepository) TransferCachedJobToMain(cacheID int64) (int64, error) {
return 0, fmt.Errorf("getting new job ID after transfer failed: %w", err)
}
_, err = r.DB.Exec("DELETE FROM job_cache WHERE id = ?", cacheID)
_, err = tx.Exec("DELETE FROM job_cache WHERE id = ?", cacheID)
if err != nil {
return 0, fmt.Errorf("deleting cached job %d after transfer failed: %w", cacheID, err)
}
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("TransferCachedJobToMain: commit: %w", err)
}
return newID, nil
}

View File

@@ -244,6 +244,77 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
return nil
}
// NodeStateUpdate holds the data needed to update one node's state in a batch operation.
type NodeStateUpdate struct {
Hostname string // hostname identifying the node
Cluster string // cluster the node belongs to
NodeState *schema.NodeStateDB // state row to insert; NodeID is filled in during the batch update
}
// BatchUpdateNodeStates inserts node state rows for multiple nodes in a single transaction.
// For each node, it looks up (or creates) the node row, then inserts the state row.
// This reduces lock acquisitions from 2*N to 1 for N nodes.
//
// Per-node failures (unknown subcluster, failed inserts) are logged and the node is
// skipped, so one bad node does not abort the whole batch; only transaction-level
// errors (begin, prepare, commit) are returned.
func (r *NodeRepository) BatchUpdateNodeStates(updates []NodeStateUpdate) error {
	// Fast path: nothing to do, avoid opening a transaction at all.
	if len(updates) == 0 {
		return nil
	}

	tx, err := r.DB.Beginx()
	if err != nil {
		return fmt.Errorf("BatchUpdateNodeStates: begin transaction: %w", err)
	}
	// Rollback is a no-op once Commit has succeeded.
	defer tx.Rollback()

	stmtLookup, err := tx.Preparex("SELECT id FROM node WHERE hostname = ? AND cluster = ?")
	if err != nil {
		return fmt.Errorf("BatchUpdateNodeStates: prepare lookup: %w", err)
	}
	defer stmtLookup.Close()

	stmtInsertNode, err := tx.PrepareNamed(NamedNodeInsert)
	if err != nil {
		return fmt.Errorf("BatchUpdateNodeStates: prepare node insert: %w", err)
	}
	defer stmtInsertNode.Close()

	stmtInsertState, err := tx.PrepareNamed(NamedNodeStateInsert)
	if err != nil {
		return fmt.Errorf("BatchUpdateNodeStates: prepare state insert: %w", err)
	}
	defer stmtInsertState.Close()

	for _, u := range updates {
		var id int64
		if err := stmtLookup.QueryRow(u.Hostname, u.Cluster).Scan(&id); err != nil {
			if err == sql.ErrNoRows {
				// Node row does not exist yet: resolve its subcluster and create it.
				subcluster, scErr := archive.GetSubClusterByNode(u.Cluster, u.Hostname)
				if scErr != nil {
					cclog.Errorf("BatchUpdateNodeStates: subcluster lookup for '%s' in '%s': %v", u.Hostname, u.Cluster, scErr)
					continue
				}
				node := schema.NodeDB{
					Hostname: u.Hostname, Cluster: u.Cluster, SubCluster: subcluster,
				}
				res, insertErr := stmtInsertNode.Exec(&node)
				if insertErr != nil {
					cclog.Errorf("BatchUpdateNodeStates: insert node '%s': %v", u.Hostname, insertErr)
					continue
				}
				// Check the LastInsertId error instead of discarding it: otherwise a
				// driver failure would silently insert state rows with NodeID=0.
				id, insertErr = res.LastInsertId()
				if insertErr != nil {
					cclog.Errorf("BatchUpdateNodeStates: last insert id for node '%s': %v", u.Hostname, insertErr)
					continue
				}
			} else {
				cclog.Errorf("BatchUpdateNodeStates: lookup node '%s': %v", u.Hostname, err)
				continue
			}
		}
		u.NodeState.NodeID = id
		if _, err := stmtInsertState.Exec(u.NodeState); err != nil {
			cclog.Errorf("BatchUpdateNodeStates: insert state for '%s': %v", u.Hostname, err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("BatchUpdateNodeStates: commit: %w", err)
	}
	return nil
}
// func (r *NodeRepository) UpdateHealthState(hostname string, healthState *schema.MonitoringState) error {
// if _, err := sq.Update("node").Set("health_state", healthState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil {
// cclog.Errorf("error while updating node '%d'", id)

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
@@ -10,7 +11,9 @@ import (
"encoding/json"
"errors"
"fmt"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
@@ -73,7 +76,11 @@ func (r *UserRepository) GetUser(username string) (*schema.User, error) {
if err := sq.Select("password", "ldap", "name", "roles", "email", "projects").From("hpc_user").
Where("hpc_user.username = ?", username).RunWith(r.DB).
QueryRow().Scan(&hashedPassword, &user.AuthSource, &name, &rawRoles, &email, &rawProjects); err != nil {
cclog.Warnf("Error while querying user '%v' from database", username)
if err != sql.ErrNoRows {
_, file, line, _ := runtime.Caller(1)
cclog.Warnf("Error while querying user '%v' from database (%s:%d): %v",
username, filepath.Base(file), line, err)
}
return nil, err
}
@@ -124,11 +131,11 @@ func (r *UserRepository) GetLdapUsernames() ([]string, error) {
// Required fields: Username, Roles
// Optional fields: Name, Email, Password, Projects, AuthSource
func (r *UserRepository) AddUser(user *schema.User) error {
rolesJson, _ := json.Marshal(user.Roles)
projectsJson, _ := json.Marshal(user.Projects)
rolesJSON, _ := json.Marshal(user.Roles)
projectsJSON, _ := json.Marshal(user.Projects)
cols := []string{"username", "roles", "projects"}
vals := []any{user.Username, string(rolesJson), string(projectsJson)}
vals := []any{user.Username, string(rolesJSON), string(projectsJSON)}
if user.Name != "" {
cols = append(cols, "name")
@@ -157,7 +164,7 @@ func (r *UserRepository) AddUser(user *schema.User) error {
return err
}
cclog.Infof("new user %#v created (roles: %s, auth-source: %d, projects: %s)", user.Username, rolesJson, user.AuthSource, projectsJson)
cclog.Infof("new user %#v created (roles: %s, auth-source: %d, projects: %s)", user.Username, rolesJSON, user.AuthSource, projectsJSON)
// DEPRECATED: SUPERSEDED BY NEW USER CONFIG - userConfig.go / web.go
defaultMetricsCfg, err := config.LoadDefaultMetricsConfig()
@@ -188,6 +195,21 @@ func (r *UserRepository) AddUser(user *schema.User) error {
return nil
}
// AddUserIfNotExists inserts a user only if the username does not already exist.
// Uses INSERT OR IGNORE to avoid UNIQUE constraint errors when a user is
// concurrently created (e.g., by a login while LDAP sync is running).
// Unlike AddUser, this intentionally skips the deprecated default metrics config insertion.
func (r *UserRepository) AddUserIfNotExists(user *schema.User) error {
	roles, _ := json.Marshal(user.Roles)
	projects, _ := json.Marshal(user.Projects)

	const query = `INSERT OR IGNORE INTO hpc_user (username, name, roles, projects, ldap) VALUES (?, ?, ?, ?, ?)`
	_, err := r.DB.Exec(query,
		user.Username, user.Name, string(roles), string(projects), int(user.AuthSource))
	return err
}
func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error {
// user contains updated info -> Apply to dbUser
// --- Simple Name Update ---

View File

@@ -102,7 +102,7 @@ func (sa *SqliteArchive) Init(rawConfig json.RawMessage) (uint64, error) {
"PRAGMA journal_mode=WAL",
"PRAGMA synchronous=NORMAL",
"PRAGMA cache_size=-64000", // 64MB cache
"PRAGMA busy_timeout=5000",
"PRAGMA busy_timeout=60000",
}
for _, pragma := range pragmas {
if _, err := sa.db.Exec(pragma); err != nil {

View File

@@ -288,7 +288,7 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) {
data.Error = &msg
res = append(res, data)
} else {
cclog.Warnf("failed to fetch '%s' from host '%s' (cluster: %s): %s", query.Metric, query.Hostname, req.Cluster, err.Error())
cclog.Debugf("failed to fetch '%s' from host '%s' (cluster: %s): %s", query.Metric, query.Hostname, req.Cluster, err.Error())
}
continue
}