diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go
index ce721b7..eb42287 100644
--- a/cmd/cc-backend/main.go
+++ b/cmd/cc-backend/main.go
@@ -325,7 +325,7 @@ func runServer(ctx context.Context) error {
 	}
 	// Start archiver and task manager
-	archiver.Start(repository.GetJobRepository())
+	archiver.Start(repository.GetJobRepository(), ctx)
 	taskManager.Start(ccconf.GetPackageConfig("cron"),
 		ccconf.GetPackageConfig("archive"))

 	// Initialize web UI
diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go
index 498ffc2..520fd69 100644
--- a/cmd/cc-backend/server.go
+++ b/cmd/cc-backend/server.go
@@ -373,6 +373,8 @@ func (s *Server) Shutdown(ctx context.Context) {
 		memorystore.Shutdown()
 	}

-	// Then, wait for any async archivings still pending...
-	archiver.WaitForArchiving()
+	// Shutdown archiver with 10 second timeout for fast shutdown
+	if err := archiver.Shutdown(10 * time.Second); err != nil {
+		cclog.Warnf("Archiver shutdown: %v", err)
+	}
 }
diff --git a/internal/api/api_test.go b/internal/api/api_test.go
index bf8cd75..230613f 100644
--- a/internal/api/api_test.go
+++ b/internal/api/api_test.go
@@ -175,7 +175,7 @@ func setup(t *testing.T) *api.RestApi {
 		t.Fatal(err)
 	}

-	archiver.Start(repository.GetJobRepository())
+	archiver.Start(repository.GetJobRepository(), context.Background())

 	if cfg := ccconf.GetPackageConfig("auth"); cfg != nil {
 		auth.Init(&cfg)
diff --git a/internal/archiver/README.md b/internal/archiver/README.md
new file mode 100644
index 0000000..0fae04e
--- /dev/null
+++ b/internal/archiver/README.md
@@ -0,0 +1,190 @@
+# Archiver Package
+
+The `archiver` package provides asynchronous job archiving functionality for ClusterCockpit. When jobs complete, their metric data is archived from the metric store to a persistent archive backend (filesystem, S3, SQLite, etc.).
+
+## Architecture
+
+### Producer-Consumer Pattern
+
+```
+┌──────────────┐    TriggerArchiving()    ┌────────────────┐
+│ API Handler  │ ───────────────────────▶ │ archiveChannel │
+│ (Job Stop)   │                          │ (buffer: 128)  │
+└──────────────┘                          └────────┬───────┘
+                                                   │
+                          ┌────────────────────────┘
+                          │
+                          ▼
+              ┌──────────────────────┐
+              │  archivingWorker()   │
+              │     (goroutine)      │
+              └──────────┬───────────┘
+                         │
+                         ▼
+              1. Fetch job metadata
+              2. Load metric data
+              3. Calculate statistics
+              4. Archive to backend
+              5. Update database
+              6. Call hooks
+```
+
+### Components
+
+- **archiveChannel**: Buffered channel (128 jobs) for async communication
+- **archivePending**: WaitGroup tracking in-flight archiving operations
+- **archivingWorker**: Background goroutine processing archiving requests
+- **shutdownCtx**: Context for graceful cancellation during shutdown
+
+## Usage
+
+### Initialization
+
+```go
+// Start archiver with context for shutdown control
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+
+archiver.Start(jobRepository, ctx)
+```
+
+### Archiving a Job
+
+```go
+// Called automatically when a job completes
+archiver.TriggerArchiving(job)
+```
+
+The function returns immediately. Actual archiving happens in the background.
+
+### Graceful Shutdown
+
+```go
+// Shutdown with 10 second timeout
+if err := archiver.Shutdown(10 * time.Second); err != nil {
+    log.Printf("Archiver shutdown timeout: %v", err)
+}
+```
+
+**Shutdown process:**
+
+1. Closes channel (rejects new jobs)
+2. Waits for pending jobs (up to timeout)
+3. Cancels context if timeout exceeded
+4. Waits for worker to exit cleanly
+
+## Configuration
+
+### Channel Buffer Size
+
+The archiving channel has a buffer of 128 jobs. If more than 128 jobs are queued simultaneously, `TriggerArchiving()` will block until space is available.
+
+To adjust:
+
+```go
+// In archiveWorker.go Start() function
+archiveChannel = make(chan *schema.Job, 256) // Increase buffer
+```
+
+### Scope Selection
+
+Archive data scopes are automatically selected based on job size:
+
+- **Node scope**: Always included
+- **Core scope**: Included for jobs with ≤8 nodes (reduces data volume for large jobs)
+- **Accelerator scope**: Included if job used accelerators (`NumAcc > 0`)
+
+To adjust the node threshold:
+
+```go
+// In archiver.go ArchiveJob() function
+if job.NumNodes <= 16 { // Change from 8 to 16
+    scopes = append(scopes, schema.MetricScopeCore)
+}
+```
+
+### Resolution
+
+Data is archived at the highest available resolution (typically 60s intervals). To change:
+
+```go
+// In archiver.go ArchiveJob() function
+jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 300)
+// 0 = highest resolution
+// 300 = 5-minute resolution
+```
+
+## Error Handling
+
+### Automatic Retry
+
+The archiver does **not** automatically retry failed archiving operations. If archiving fails:
+
+1. Error is logged
+2. Job is marked as `MonitoringStatusArchivingFailed` in database
+3. Worker continues processing other jobs
+
+### Manual Retry
+
+To re-archive failed jobs, query for jobs with `MonitoringStatusArchivingFailed` and call `TriggerArchiving()` again.
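+
+A minimal sketch of such a retry pass is shown below. The `loadFailedJobs` callback is a hypothetical stand-in for whatever query mechanism a deployment uses to find jobs in the failed state; it is not part of the repository API.
+
+```go
+// retryFailedArchives re-queues jobs whose archiving previously failed.
+// loadFailedJobs is a hypothetical helper returning all jobs with
+// monitoring_status == schema.MonitoringStatusArchivingFailed.
+func retryFailedArchives(loadFailedJobs func() ([]*schema.Job, error)) error {
+    jobs, err := loadFailedJobs()
+    if err != nil {
+        return err
+    }
+    for _, job := range jobs {
+        // The worker updates the monitoring status again once it has
+        // re-processed the job (successful or failed).
+        archiver.TriggerArchiving(job)
+    }
+    return nil
+}
+```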
+
+## Performance Considerations
+
+### Single Worker Thread
+
+The archiver uses a single worker goroutine. For high-throughput systems:
+
+- Large channel buffer (128) prevents blocking
+- Archiving is typically I/O bound (writing to storage)
+- Single worker prevents overwhelming storage backend
+
+### Shutdown Timeout
+
+Recommended timeout values:
+
+- **Development**: 5-10 seconds
+- **Production**: 10-30 seconds
+- **High-load**: 30-60 seconds
+
+Choose based on:
+
+- Average archiving time per job
+- Storage backend latency
+- Acceptable shutdown delay
+
+## Monitoring
+
+### Logging
+
+The archiver logs:
+
+- **Info**: Startup, shutdown, successful completions
+- **Debug**: Individual job archiving times
+- **Error**: Archiving failures with job ID and reason
+- **Warn**: Shutdown timeout exceeded
+
+### Metrics
+
+Monitor these signals for archiver health:
+
+- Jobs with `MonitoringStatusArchivingFailed`
+- Time from job stop to successful archive
+- Shutdown timeout occurrences
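+
+For example, a health check could periodically count jobs stuck in the failed state. The `job` table and `monitoring_status` column names below are assumptions about the cc-backend database schema and should be checked against the actual schema; `db` is assumed to be a `*sql.DB`.
+
+```go
+// Count jobs whose archiving failed; table/column names are assumed.
+var failed int64
+err := db.QueryRow(
+    "SELECT COUNT(*) FROM job WHERE monitoring_status = ?",
+    schema.MonitoringStatusArchivingFailed,
+).Scan(&failed)
+```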
+
+## Thread Safety
+
+All exported functions are safe for concurrent use:
+
+- `Start()` - Safe to call once
+- `TriggerArchiving()` - Safe from multiple goroutines
+- `Shutdown()` - Safe to call once
+- `WaitForArchiving()` - Removed; use `Shutdown()` instead
+
+Internal state is protected by:
+
+- Channel synchronization (`archiveChannel`)
+- WaitGroup for pending count (`archivePending`)
+- Context for cancellation (`shutdownCtx`)
+
+## Files
+
+- **archiveWorker.go**: Worker lifecycle, channel management, shutdown logic
+- **archiver.go**: Core archiving logic, metric loading, statistics calculation
+
+## Dependencies
+
+- `internal/repository`: Database operations for job metadata
+- `internal/metricDataDispatcher`: Loading metric data from various backends
+- `pkg/archive`: Archive backend abstraction (filesystem, S3, SQLite)
+- `cc-lib/schema`: Job and metric data structures
diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go
index 9e834b2..1bb5a49 100644
--- a/internal/archiver/archiveWorker.go
+++ b/internal/archiver/archiveWorker.go
@@ -2,10 +2,54 @@
 // All rights reserved. This file is part of cc-backend.
 // Use of this source code is governed by a MIT-style
 // license that can be found in the LICENSE file.
+
+// Package archiver provides asynchronous job archiving functionality for ClusterCockpit.
+//
+// The archiver runs a background worker goroutine that processes job archiving requests
+// from a buffered channel. When jobs complete, their metric data is archived from the
+// metric store to the configured archive backend (filesystem, S3, etc.).
+//
+// # Architecture
+//
+// The archiver uses a producer-consumer pattern:
+//   - Producer: TriggerArchiving() sends jobs to archiveChannel
+//   - Consumer: archivingWorker() processes jobs from the channel
+//   - Coordination: sync.WaitGroup tracks pending archive operations
+//
+// # Lifecycle
+//
+//  1. Start(repo, ctx) - Initialize worker with context for cancellation
+//  2. TriggerArchiving(job) - Queue job for archiving (called when job stops)
+//  3. archivingWorker() - Background goroutine processes jobs
+//  4. Shutdown(timeout) - Graceful shutdown with timeout
+//
+// # Graceful Shutdown
+//
+// The archiver supports graceful shutdown with a configurable timeout:
+//   - Closes channel to reject new jobs
+//   - Waits for pending jobs to complete (up to timeout)
+//   - Cancels context if timeout exceeded
+//   - Ensures worker goroutine exits cleanly
+//
+// # Example Usage
+//
+//	// Initialize archiver
+//	ctx, cancel := context.WithCancel(context.Background())
+//	defer cancel()
+//	archiver.Start(jobRepository, ctx)
+//
+//	// Trigger archiving when job completes
+//	archiver.TriggerArchiving(job)
+//
+//	// Graceful shutdown with 10 second timeout
+//	if err := archiver.Shutdown(10 * time.Second); err != nil {
+//		log.Printf("Archiver shutdown timeout: %v", err)
+//	}
 package archiver

 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"

@@ -19,38 +63,82 @@ var (
 	archivePending sync.WaitGroup
 	archiveChannel chan *schema.Job
 	jobRepo        *repository.JobRepository
+	shutdownCtx    context.Context
+	shutdownCancel context.CancelFunc
+	workerDone     chan struct{}
 )

-func Start(r *repository.JobRepository) {
+// Start initializes the archiver and starts the background worker goroutine.
+//
+// The archiver processes job archiving requests asynchronously via a buffered channel.
+// Jobs are sent to the channel using TriggerArchiving() and processed by the worker.
+//
+// Parameters:
+//   - r: JobRepository instance for database operations
+//   - ctx: Context for cancellation (shutdown signal propagation)
+//
+// The worker goroutine will run until:
+//   - ctx is cancelled (via parent shutdown)
+//   - archiveChannel is closed (via Shutdown())
+//
+// Must be called before TriggerArchiving(). Safe to call only once.
+func Start(r *repository.JobRepository, ctx context.Context) {
+	shutdownCtx, shutdownCancel = context.WithCancel(ctx)
 	archiveChannel = make(chan *schema.Job, 128)
+	workerDone = make(chan struct{})
 	jobRepo = r

 	go archivingWorker()
 }

-// Archiving worker thread
+// archivingWorker is the background goroutine that processes job archiving requests.
+//
+// The worker loop:
+//  1. Blocks waiting for jobs on archiveChannel or shutdown signal
+//  2. Fetches job metadata from repository
+//  3. Archives job data to configured backend (calls ArchiveJob)
+//  4. Updates job footprint and energy metrics in database
+//  5. Marks job as successfully archived
+//  6. Calls job stop hooks
+//
+// The worker exits when:
+//   - shutdownCtx is cancelled (timeout during shutdown)
+//   - archiveChannel is closed (normal shutdown)
+//
+// Errors during archiving are logged and the job is marked as failed,
+// but the worker continues processing other jobs.
 func archivingWorker() {
+	defer close(workerDone)
+
 	for {
 		select {
+		case <-shutdownCtx.Done():
+			cclog.Info("Archive worker received shutdown signal")
+			return
+
 		case job, ok := <-archiveChannel:
 			if !ok {
-				break
+				cclog.Info("Archive channel closed, worker exiting")
+				return
 			}
+
 			start := time.Now()
 			// not using meta data, called to load JobMeta into Cache?
 			// will fail if job meta not in repository
 			if _, err := jobRepo.FetchMetadata(job); err != nil {
 				cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
 				jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
+				archivePending.Done()
 				continue
 			}

 			// ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
-			// TODO: Maybe use context with cancel/timeout here
-			jobMeta, err := ArchiveJob(job, context.Background())
+			// Use shutdown context to allow cancellation
+			jobMeta, err := ArchiveJob(job, shutdownCtx)
 			if err != nil {
 				cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
 				jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
+				archivePending.Done()
 				continue
 			}
@@ -58,16 +146,19 @@ func archivingWorker() {

 			if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
 				cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
+				archivePending.Done()
 				continue
 			}
 			if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
 				cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
+				archivePending.Done()
 				continue
 			}
 			// Update the jobs database entry one last time:
 			stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
 			if err := jobRepo.Execute(stmt); err != nil {
 				cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
+				archivePending.Done()
 				continue
 			}
 			cclog.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
@@ -75,13 +166,24 @@
 			repository.CallJobStopHooks(job)
 			archivePending.Done()

-		default:
-			continue
 		}
 	}
 }

-// Trigger async archiving
+// TriggerArchiving queues a job for asynchronous archiving.
+//
+// This function should be called when a job completes (stops) to archive its
+// metric data from the metric store to the configured archive backend.
+//
+// The function:
+//  1. Increments the pending job counter (WaitGroup)
+//  2. Sends the job to the archiving channel (buffered, capacity 128)
+//  3. Returns immediately (non-blocking unless channel is full)
+//
+// The actual archiving is performed asynchronously by the worker goroutine.
+// Upon completion, the worker will decrement the pending counter.
+//
+// Terminates the program via cclog.Fatal if Start() has not been called first.
 func TriggerArchiving(job *schema.Job) {
 	if archiveChannel == nil {
 		cclog.Fatal("Cannot archive without archiving channel. Did you Start the archiver?")
@@ -91,8 +193,58 @@
 	archivePending.Add(1)
 	archiveChannel <- job
 }

-// Wait for background thread to finish pending archiving operations
-func WaitForArchiving() {
-	// close channel and wait for worker to process remaining jobs
-	archivePending.Wait()
+// Shutdown performs a graceful shutdown of the archiver with a configurable timeout.
+//
+// The shutdown process:
+//  1. Closes archiveChannel - no new jobs will be accepted
+//  2. Waits for pending jobs to complete (up to timeout duration)
+//  3. If timeout is exceeded:
+//     - Cancels shutdownCtx to interrupt ongoing ArchiveJob operations
+//     - Returns error indicating timeout
+//  4. Waits for worker goroutine to exit cleanly
+//
+// Parameters:
+//   - timeout: Maximum duration to wait for pending jobs to complete
+//     (recommended: 10-30 seconds for production)
+//
+// Returns:
+//   - nil if all jobs completed within timeout
+//   - error if timeout was exceeded (some jobs may not have been archived)
+//
+// Jobs that don't complete within the timeout will be marked as failed.
+// The function always ensures the worker goroutine exits before returning.
+//
+// Example:
+//
+//	if err := archiver.Shutdown(10 * time.Second); err != nil {
+//		log.Printf("Some jobs did not complete: %v", err)
+//	}
+func Shutdown(timeout time.Duration) error {
+	cclog.Info("Initiating archiver shutdown...")
+
+	// Close channel to signal no more jobs will be accepted
+	close(archiveChannel)
+
+	// Create a channel to signal when all jobs are done
+	done := make(chan struct{})
+	go func() {
+		archivePending.Wait()
+		close(done)
+	}()
+
+	// Wait for jobs to complete or timeout
+	select {
+	case <-done:
+		cclog.Info("All archive jobs completed successfully")
+		// Wait for worker to exit
+		<-workerDone
+		return nil
+	case <-time.After(timeout):
+		cclog.Warn("Archiver shutdown timeout exceeded, cancelling remaining operations")
+		// Cancel any ongoing operations
+		shutdownCancel()
+		// Wait for worker to exit
+		<-workerDone
+		return fmt.Errorf("archiver shutdown timeout after %v", timeout)
+	}
 }
diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index e21be13..b88199a 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -2,6 +2,7 @@
 // All rights reserved. This file is part of cc-backend.
 // Use of this source code is governed by a MIT-style
 // license that can be found in the LICENSE file.
+
 package archiver

 import (
@@ -15,7 +16,32 @@
 	"github.com/ClusterCockpit/cc-lib/schema"
 )

-// Writes a running job to the job-archive
+// ArchiveJob archives a completed job's metric data to the configured archive backend.
+//
+// This function performs the following operations:
+//  1. Loads all metric data for the job from the metric data repository
+//  2. Calculates job-level statistics (avg, min, max) for each metric
+//  3. Stores the job metadata and metric data to the archive backend
+//
+// Metric data is retrieved at the highest available resolution (typically 60s)
+// for the following scopes:
+//   - Node scope (always)
+//   - Core scope (for jobs with ≤8 nodes, to reduce data volume)
+//   - Accelerator scope (if job used accelerators)
+//
+// The function respects context cancellation. If ctx is cancelled (e.g., during
+// shutdown timeout), the operation will be interrupted and return an error.
+//
+// Parameters:
+//   - job: The job to archive (must be a completed job)
+//   - ctx: Context for cancellation and timeout control
+//
+// Returns:
+//   - *schema.Job with populated Statistics field
+//   - error if data loading or archiving fails
+//
+// If config.Keys.DisableArchive is true, only job statistics are calculated
+// and returned (no data is written to the archive backend).
 func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.Job, error) {
 	allMetrics := make([]string, 0)
 	metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
diff --git a/internal/importer/normalize.go b/internal/importer/normalize.go
index fc5e537..943ceb2 100644
--- a/internal/importer/normalize.go
+++ b/internal/importer/normalize.go
@@ -23,6 +23,11 @@ func getNormalizationFactor(v float64) (float64, int) {
 	count := 0
 	scale := -3

+	// Prevent infinite loop for zero or negative values
+	if v <= 0.0 {
+		return 1.0, 0
+	}
+
 	if v > 1000.0 {
 		for v > 1000.0 {
 			v *= 1e-3
@@ -49,6 +54,11 @@ func getExponent(p float64) int {
 	count := 0

+	// Prevent infinite loop for infinity or NaN values
+	if math.IsInf(p, 0) || math.IsNaN(p) || p <= 0.0 {
+		return 0
+	}
+
 	for p > 1.0 {
 		p = p / 1000.0
 		count++