Fix: Busywait loop in archiver and slow shutdown

Remove unblocking default in select
Add shutdown handler with context and timeout
This commit is contained in:
2025-12-11 09:29:10 +01:00
parent 6325793902
commit 8d44ac90ad
7 changed files with 397 additions and 17 deletions

View File

@@ -325,7 +325,7 @@ func runServer(ctx context.Context) error {
}
// Start archiver and task manager
archiver.Start(repository.GetJobRepository())
archiver.Start(repository.GetJobRepository(), ctx)
taskManager.Start(ccconf.GetPackageConfig("cron"), ccconf.GetPackageConfig("archive"))
// Initialize web UI

View File

@@ -373,6 +373,8 @@ func (s *Server) Shutdown(ctx context.Context) {
memorystore.Shutdown()
}
// Then, wait for any async archivings still pending...
archiver.WaitForArchiving()
// Shutdown archiver with 10 second timeout for fast shutdown
if err := archiver.Shutdown(10 * time.Second); err != nil {
cclog.Warnf("Archiver shutdown: %v", err)
}
}

View File

@@ -175,7 +175,7 @@ func setup(t *testing.T) *api.RestApi {
t.Fatal(err)
}
archiver.Start(repository.GetJobRepository())
archiver.Start(repository.GetJobRepository(), context.Background())
if cfg := ccconf.GetPackageConfig("auth"); cfg != nil {
auth.Init(&cfg)

190
internal/archiver/README.md Normal file
View File

@@ -0,0 +1,190 @@
# Archiver Package
The `archiver` package provides asynchronous job archiving functionality for ClusterCockpit. When jobs complete, their metric data is archived from the metric store to a persistent archive backend (filesystem, S3, SQLite, etc.).
## Architecture
### Producer-Consumer Pattern
```
┌──────────────┐ TriggerArchiving() ┌───────────────┐
│ API Handler │ ───────────────────────▶ │ archiveChannel│
│ (Job Stop) │ │ (buffer: 128)│
└──────────────┘ └───────┬───────┘
┌─────────────────────────────────┘
┌──────────────────────┐
│ archivingWorker() │
│ (goroutine) │
└──────────┬───────────┘
1. Fetch job metadata
2. Load metric data
3. Calculate statistics
4. Archive to backend
5. Update database
6. Call hooks
```
### Components
- **archiveChannel**: Buffered channel (128 jobs) for async communication
- **archivePending**: WaitGroup tracking in-flight archiving operations
- **archivingWorker**: Background goroutine processing archiving requests
- **shutdownCtx**: Context for graceful cancellation during shutdown
## Usage
### Initialization
```go
// Start archiver with context for shutdown control
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
archiver.Start(jobRepository, ctx)
```
### Archiving a Job
```go
// Called automatically when a job completes
archiver.TriggerArchiving(job)
```
The function returns immediately. Actual archiving happens in the background.
### Graceful Shutdown
```go
// Shutdown with 10 second timeout
if err := archiver.Shutdown(10 * time.Second); err != nil {
log.Printf("Archiver shutdown timeout: %v", err)
}
```
**Shutdown process:**
1. Closes channel (rejects new jobs)
2. Waits for pending jobs (up to timeout)
3. Cancels context if timeout exceeded
4. Waits for worker to exit cleanly
## Configuration
### Channel Buffer Size
The archiving channel has a buffer of 128 jobs. If more than 128 jobs are queued simultaneously, `TriggerArchiving()` will block until space is available.
To adjust:
```go
// In archiveWorker.go Start() function
archiveChannel = make(chan *schema.Job, 256) // Increase buffer
```
### Scope Selection
Archive data scopes are automatically selected based on job size:
- **Node scope**: Always included
- **Core scope**: Included for jobs with ≤8 nodes (reduces data volume for large jobs)
- **Accelerator scope**: Included if job used accelerators (`NumAcc > 0`)
To adjust the node threshold:
```go
// In archiver.go ArchiveJob() function
if job.NumNodes <= 16 { // Change from 8 to 16
scopes = append(scopes, schema.MetricScopeCore)
}
```
### Resolution
Data is archived at the highest available resolution (typically 60s intervals). To change:
```go
// In archiver.go ArchiveJob() function
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 300)
// 0 = highest resolution
// 300 = 5-minute resolution
```
## Error Handling
### Automatic Retry
The archiver does **not** automatically retry failed archiving operations. If archiving fails:
1. Error is logged
2. Job is marked as `MonitoringStatusArchivingFailed` in database
3. Worker continues processing other jobs
### Manual Retry
To re-archive failed jobs, query for jobs with `MonitoringStatusArchivingFailed` and call `TriggerArchiving()` again.
## Performance Considerations
### Single Worker Thread
The archiver uses a single worker goroutine. For high-throughput systems:
- Large channel buffer (128) prevents blocking
- Archiving is typically I/O bound (writing to storage)
- Single worker prevents overwhelming storage backend
### Shutdown Timeout
Recommended timeout values:
- **Development**: 5-10 seconds
- **Production**: 10-30 seconds
- **High-load**: 30-60 seconds
Choose based on:
- Average archiving time per job
- Storage backend latency
- Acceptable shutdown delay
## Monitoring
### Logging
The archiver logs:
- **Info**: Startup, shutdown, successful completions
- **Debug**: Individual job archiving times
- **Error**: Archiving failures with job ID and reason
- **Warn**: Shutdown timeout exceeded
### Metrics
Monitor these signals for archiver health:
- Jobs with `MonitoringStatusArchivingFailed`
- Time from job stop to successful archive
- Shutdown timeout occurrences
## Thread Safety
All exported functions are safe for concurrent use:
- `Start()` - Safe to call once
- `TriggerArchiving()` - Safe from multiple goroutines
- `Shutdown()` - Safe to call once
- `WaitForArchiving()` - Deprecated, but safe
Internal state is protected by:
- Channel synchronization (`archiveChannel`)
- WaitGroup for pending count (`archivePending`)
- Context for cancellation (`shutdownCtx`)
## Files
- **archiveWorker.go**: Worker lifecycle, channel management, shutdown logic
- **archiver.go**: Core archiving logic, metric loading, statistics calculation
## Dependencies
- `internal/repository`: Database operations for job metadata
- `internal/metricDataDispatcher`: Loading metric data from various backends
- `pkg/archive`: Archive backend abstraction (filesystem, S3, SQLite)
- `cc-lib/schema`: Job and metric data structures

View File

@@ -2,10 +2,54 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// Package archiver provides asynchronous job archiving functionality for ClusterCockpit.
//
// The archiver runs a background worker goroutine that processes job archiving requests
// from a buffered channel. When jobs complete, their metric data is archived from the
// metric store to the configured archive backend (filesystem, S3, etc.).
//
// # Architecture
//
// The archiver uses a producer-consumer pattern:
// - Producer: TriggerArchiving() sends jobs to archiveChannel
// - Consumer: archivingWorker() processes jobs from the channel
// - Coordination: sync.WaitGroup tracks pending archive operations
//
// # Lifecycle
//
// 1. Start(repo, ctx) - Initialize worker with context for cancellation
// 2. TriggerArchiving(job) - Queue job for archiving (called when job stops)
// 3. archivingWorker() - Background goroutine processes jobs
// 4. Shutdown(timeout) - Graceful shutdown with timeout
//
// # Graceful Shutdown
//
// The archiver supports graceful shutdown with configurable timeout:
// - Closes channel to reject new jobs
// - Waits for pending jobs to complete (up to timeout)
// - Cancels context if timeout exceeded
// - Ensures worker goroutine exits cleanly
//
// # Example Usage
//
// // Initialize archiver
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
// archiver.Start(jobRepository, ctx)
//
// // Trigger archiving when job completes
// archiver.TriggerArchiving(job)
//
// // Graceful shutdown with 10 second timeout
// if err := archiver.Shutdown(10 * time.Second); err != nil {
// log.Printf("Archiver shutdown timeout: %v", err)
// }
package archiver
import (
"context"
"fmt"
"sync"
"time"
@@ -19,38 +63,82 @@ var (
archivePending sync.WaitGroup
archiveChannel chan *schema.Job
jobRepo *repository.JobRepository
shutdownCtx context.Context
shutdownCancel context.CancelFunc
workerDone chan struct{}
)
func Start(r *repository.JobRepository) {
// Start initializes the archiver and starts the background worker goroutine.
//
// The archiver processes job archiving requests asynchronously via a buffered channel.
// Jobs are sent to the channel using TriggerArchiving() and processed by the worker.
//
// Parameters:
// - r: JobRepository instance for database operations
// - ctx: Context for cancellation (shutdown signal propagation)
//
// The worker goroutine will run until:
// - ctx is cancelled (via parent shutdown)
// - archiveChannel is closed (via Shutdown())
//
// Must be called before TriggerArchiving(). Safe to call only once.
func Start(r *repository.JobRepository, ctx context.Context) {
shutdownCtx, shutdownCancel = context.WithCancel(ctx)
archiveChannel = make(chan *schema.Job, 128)
workerDone = make(chan struct{})
jobRepo = r
go archivingWorker()
}
// Archiving worker thread
// archivingWorker is the background goroutine that processes job archiving requests.
//
// The worker loop:
// 1. Blocks waiting for jobs on archiveChannel or shutdown signal
// 2. Fetches job metadata from repository
// 3. Archives job data to configured backend (calls ArchiveJob)
// 4. Updates job footprint and energy metrics in database
// 5. Marks job as successfully archived
// 6. Calls job stop hooks
//
// The worker exits when:
// - shutdownCtx is cancelled (timeout during shutdown)
// - archiveChannel is closed (normal shutdown)
//
// Errors during archiving are logged and the job is marked as failed,
// but the worker continues processing other jobs.
func archivingWorker() {
defer close(workerDone)
for {
select {
case <-shutdownCtx.Done():
cclog.Info("Archive worker received shutdown signal")
return
case job, ok := <-archiveChannel:
if !ok {
break
cclog.Info("Archive channel closed, worker exiting")
return
}
start := time.Now()
// not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository
if _, err := jobRepo.FetchMetadata(job); err != nil {
cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
archivePending.Done()
continue
}
// ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
// TODO: Maybe use context with cancel/timeout here
jobMeta, err := ArchiveJob(job, context.Background())
// Use shutdown context to allow cancellation
jobMeta, err := ArchiveJob(job, shutdownCtx)
if err != nil {
cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
archivePending.Done()
continue
}
@@ -58,16 +146,19 @@ func archivingWorker() {
if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
archivePending.Done()
continue
}
if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
archivePending.Done()
continue
}
// Update the jobs database entry one last time:
stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
if err := jobRepo.Execute(stmt); err != nil {
cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
archivePending.Done()
continue
}
cclog.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
@@ -75,13 +166,24 @@ func archivingWorker() {
repository.CallJobStopHooks(job)
archivePending.Done()
default:
continue
}
}
}
// Trigger async archiving
// TriggerArchiving queues a job for asynchronous archiving.
//
// This function should be called when a job completes (stops) to archive its
// metric data from the metric store to the configured archive backend.
//
// The function:
// 1. Increments the pending job counter (WaitGroup)
// 2. Sends the job to the archiving channel (buffered, capacity 128)
// 3. Returns immediately (non-blocking unless channel is full)
//
// The actual archiving is performed asynchronously by the worker goroutine.
// Upon completion, the worker will decrement the pending counter.
//
// Panics if Start() has not been called first.
func TriggerArchiving(job *schema.Job) {
if archiveChannel == nil {
cclog.Fatal("Cannot archive without archiving channel. Did you Start the archiver?")
@@ -91,8 +193,58 @@ func TriggerArchiving(job *schema.Job) {
archiveChannel <- job
}
// Wait for background thread to finish pending archiving operations
func WaitForArchiving() {
// close channel and wait for worker to process remaining jobs
archivePending.Wait()
// Shutdown performs a graceful shutdown of the archiver with a configurable timeout.
//
// The shutdown process:
// 1. Closes archiveChannel - no new jobs will be accepted
// 2. Waits for pending jobs to complete (up to timeout duration)
// 3. If timeout is exceeded:
// - Cancels shutdownCtx to interrupt ongoing ArchiveJob operations
// - Returns error indicating timeout
// 4. Waits for worker goroutine to exit cleanly
//
// Parameters:
// - timeout: Maximum duration to wait for pending jobs to complete
// (recommended: 10-30 seconds for production)
//
// Returns:
// - nil if all jobs completed within timeout
// - error if timeout was exceeded (some jobs may not have been archived)
//
// Jobs that don't complete within the timeout will be marked as failed.
// The function always ensures the worker goroutine exits before returning.
//
// Example:
//
// if err := archiver.Shutdown(10 * time.Second); err != nil {
// log.Printf("Some jobs did not complete: %v", err)
// }
func Shutdown(timeout time.Duration) error {
cclog.Info("Initiating archiver shutdown...")
// Close channel to signal no more jobs will be accepted
close(archiveChannel)
// Create a channel to signal when all jobs are done
done := make(chan struct{})
go func() {
archivePending.Wait()
close(done)
}()
// Wait for jobs to complete or timeout
select {
case <-done:
cclog.Info("All archive jobs completed successfully")
// Wait for worker to exit
<-workerDone
return nil
case <-time.After(timeout):
cclog.Warn("Archiver shutdown timeout exceeded, cancelling remaining operations")
// Cancel any ongoing operations
shutdownCancel()
// Wait for worker to exit
<-workerDone
return fmt.Errorf("archiver shutdown timeout after %v", timeout)
}
}

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archiver
import (
@@ -15,7 +16,32 @@ import (
"github.com/ClusterCockpit/cc-lib/schema"
)
// Writes a running job to the job-archive
// ArchiveJob archives a completed job's metric data to the configured archive backend.
//
// This function performs the following operations:
// 1. Loads all metric data for the job from the metric data repository
// 2. Calculates job-level statistics (avg, min, max) for each metric
// 3. Stores the job metadata and metric data to the archive backend
//
// Metric data is retrieved at the highest available resolution (typically 60s)
// for the following scopes:
// - Node scope (always)
// - Core scope (for jobs with ≤8 nodes, to reduce data volume)
// - Accelerator scope (if job used accelerators)
//
// The function respects context cancellation. If ctx is cancelled (e.g., during
// shutdown timeout), the operation will be interrupted and return an error.
//
// Parameters:
// - job: The job to archive (must be a completed job)
// - ctx: Context for cancellation and timeout control
//
// Returns:
// - *schema.Job with populated Statistics field
// - error if data loading or archiving fails
//
// If config.Keys.DisableArchive is true, only job statistics are calculated
// and returned (no data is written to archive backend).
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.Job, error) {
allMetrics := make([]string, 0)
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig

View File

@@ -23,6 +23,11 @@ func getNormalizationFactor(v float64) (float64, int) {
count := 0
scale := -3
// Prevent infinite loop for zero or negative values
if v <= 0.0 {
return 1.0, 0
}
if v > 1000.0 {
for v > 1000.0 {
v *= 1e-3
@@ -49,6 +54,11 @@ func getNormalizationFactor(v float64) (float64, int) {
func getExponent(p float64) int {
count := 0
// Prevent infinite loop for infinity or NaN values
if math.IsInf(p, 0) || math.IsNaN(p) || p <= 0.0 {
return 0
}
for p > 1.0 {
p = p / 1000.0
count++