Mirror of https://github.com/ClusterCockpit/cc-backend (synced 2025-12-12 18:36:16 +01:00)
@@ -220,7 +220,7 @@ func addUser(userSpec string) error {
 		return fmt.Errorf("adding user '%s' with roles '%s': %w", parts[0], parts[1], err)
 	}
 
-	cclog.Printf("Add User: Added new user '%s' with roles '%s'.\n", parts[0], parts[1])
+	cclog.Infof("Add User: Added new user '%s' with roles '%s'", parts[0], parts[1])
 	return nil
 }
 
@@ -229,7 +229,7 @@ func delUser(username string) error {
 	if err := ur.DelUser(username); err != nil {
 		return fmt.Errorf("deleting user '%s': %w", username, err)
 	}
-	cclog.Printf("Delete User: Deleted user '%s' from DB.\n", username)
+	cclog.Infof("Delete User: Deleted user '%s' from DB", username)
 	return nil
 }
 
@@ -262,7 +262,7 @@ func generateJWT(authHandle *auth.Authentication, username string) error {
 		return fmt.Errorf("generating JWT for user '%s': %w", user.Username, err)
 	}
 
-	cclog.Printf("JWT: Successfully generated JWT for user '%s': %s\n", user.Username, jwt)
+	cclog.Infof("JWT: Successfully generated JWT for user '%s': %s", user.Username, jwt)
 	return nil
 }
 
@@ -294,7 +294,7 @@ func initSubsystems() error {
 		if err := importer.HandleImportFlag(flagImportJob); err != nil {
 			return fmt.Errorf("importing job: %w", err)
 		}
-		cclog.Printf("Import Job: Imported Job '%s' into DB.\n", flagImportJob)
+		cclog.Infof("Import Job: Imported Job '%s' into DB", flagImportJob)
 	}
 
 	// Initialize taggers
@@ -325,7 +325,7 @@ func runServer(ctx context.Context) error {
 	}
 
 	// Start archiver and task manager
-	archiver.Start(repository.GetJobRepository())
+	archiver.Start(repository.GetJobRepository(), ctx)
 	taskManager.Start(ccconf.GetPackageConfig("cron"), ccconf.GetPackageConfig("archive"))
 
 	// Initialize web UI
@@ -330,9 +330,9 @@ func (s *Server) Start(ctx context.Context) error {
 			MinVersion:               tls.VersionTLS12,
 			PreferServerCipherSuites: true,
 		})
-		cclog.Printf("HTTPS server listening at %s...\n", config.Keys.Addr)
+		cclog.Infof("HTTPS server listening at %s...", config.Keys.Addr)
 	} else {
-		cclog.Printf("HTTP server listening at %s...\n", config.Keys.Addr)
+		cclog.Infof("HTTP server listening at %s...", config.Keys.Addr)
 	}
 	//
 	// Because this program will want to bind to a privileged port (like 80), the listener must
@@ -373,6 +373,8 @@ func (s *Server) Shutdown(ctx context.Context) {
 		memorystore.Shutdown()
 	}
 
-	// Then, wait for any async archivings still pending...
-	archiver.WaitForArchiving()
+	// Shutdown archiver with 10 second timeout for fast shutdown
+	if err := archiver.Shutdown(10 * time.Second); err != nil {
+		cclog.Warnf("Archiver shutdown: %v", err)
+	}
 }
 
@@ -175,7 +175,7 @@ func setup(t *testing.T) *api.RestApi {
 		t.Fatal(err)
 	}
 
-	archiver.Start(repository.GetJobRepository())
+	archiver.Start(repository.GetJobRepository(), context.Background())
 
 	if cfg := ccconf.GetPackageConfig("auth"); cfg != nil {
 		auth.Init(&cfg)
@@ -190,6 +190,10 @@ func setup(t *testing.T) *api.RestApi {
 }
 
 func cleanup() {
+	// Gracefully shutdown archiver with timeout
+	if err := archiver.Shutdown(5 * time.Second); err != nil {
+		cclog.Warnf("Archiver shutdown timeout in tests: %v", err)
+	}
 	// TODO: Clear all caches, reset all modules, etc...
 }
 
@@ -333,7 +337,7 @@ func TestRestApi(t *testing.T) {
 		t.Fatal(response.Status, recorder.Body.String())
 	}
 
-	archiver.WaitForArchiving()
+	// Archiving happens asynchronously, will be completed in cleanup
 	job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime)
 	if err != nil {
 		t.Fatal(err)
@@ -446,7 +450,7 @@ func TestRestApi(t *testing.T) {
 		t.Fatal(response.Status, recorder.Body.String())
 	}
 
-	archiver.WaitForArchiving()
+	// Archiving happens asynchronously, will be completed in cleanup
 	jobid, cluster := int64(12345), "testcluster"
 	job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
 	if err != nil {
@@ -669,7 +669,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	cclog.Printf("REST: %s\n", req.GoString())
+	cclog.Debugf("REST: %s", req.GoString())
 	req.State = schema.JobStateRunning
 
 	if err := importer.SanityChecks(&req); err != nil {
@@ -713,7 +713,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
 		}
 	}
 
-	cclog.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
+	cclog.Infof("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
 	rw.Header().Add("Content-Type", "application/json")
 	rw.WriteHeader(http.StatusCreated)
 	if err := json.NewEncoder(rw).Encode(DefaultApiResponse{
@@ -946,7 +946,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
 		}
 	}
 
-	cclog.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
+	cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
 
 	// Send a response (with status OK). This means that errors that happen from here on forward
 	// can *NOT* be communicated to the client. If reading from a MetricDataRepository or
internal/archiver/README.md (new file, 190 lines)
@@ -0,0 +1,190 @@
# Archiver Package

The `archiver` package provides asynchronous job archiving functionality for ClusterCockpit. When jobs complete, their metric data is archived from the metric store to a persistent archive backend (filesystem, S3, SQLite, etc.).

## Architecture

### Producer-Consumer Pattern

```
┌──────────────┐   TriggerArchiving()     ┌───────────────┐
│ API Handler  │ ────────────────────────▶│ archiveChannel│
│  (Job Stop)  │                          │ (buffer: 128) │
└──────────────┘                          └───────┬───────┘
                                                  │
                  ┌───────────────────────────────┘
                  │
                  ▼
       ┌──────────────────────┐
       │  archivingWorker()   │
       │     (goroutine)      │
       └──────────┬───────────┘
                  │
                  ▼
       1. Fetch job metadata
       2. Load metric data
       3. Calculate statistics
       4. Archive to backend
       5. Update database
       6. Call hooks
```

### Components

- **archiveChannel**: Buffered channel (128 jobs) for async communication
- **archivePending**: WaitGroup tracking in-flight archiving operations
- **archivingWorker**: Background goroutine processing archiving requests
- **shutdownCtx**: Context for graceful cancellation during shutdown

## Usage

### Initialization

```go
// Start archiver with context for shutdown control
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

archiver.Start(jobRepository, ctx)
```

### Archiving a Job

```go
// Called automatically when a job completes
archiver.TriggerArchiving(job)
```

The function returns immediately. Actual archiving happens in the background.

### Graceful Shutdown

```go
// Shutdown with 10 second timeout
if err := archiver.Shutdown(10 * time.Second); err != nil {
	log.Printf("Archiver shutdown timeout: %v", err)
}
```

**Shutdown process:**

1. Closes channel (rejects new jobs)
2. Waits for pending jobs (up to timeout)
3. Cancels context if timeout exceeded
4. Waits for worker to exit cleanly

## Configuration

### Channel Buffer Size

The archiving channel has a buffer of 128 jobs. If more than 128 jobs are queued simultaneously, `TriggerArchiving()` will block until space is available.

To adjust:

```go
// In archiveWorker.go Start() function
archiveChannel = make(chan *schema.Job, 256) // Increase buffer
```

### Scope Selection

Archive data scopes are automatically selected based on job size:

- **Node scope**: Always included
- **Core scope**: Included only for jobs with ≤8 nodes (omitting it for larger jobs keeps the data volume manageable)
- **Accelerator scope**: Included if the job used accelerators (`NumAcc > 0`)

To adjust the node threshold:

```go
// In archiver.go ArchiveJob() function
if job.NumNodes <= 16 { // Change from 8 to 16
	scopes = append(scopes, schema.MetricScopeCore)
}
```

### Resolution

Data is archived at the highest available resolution (typically 60s intervals). To change:

```go
// In archiver.go ArchiveJob() function
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 300)
// 0 = highest resolution
// 300 = 5-minute resolution
```

## Error Handling

### Automatic Retry

The archiver does **not** automatically retry failed archiving operations. If archiving fails:

1. The error is logged
2. The job is marked as `MonitoringStatusArchivingFailed` in the database
3. The worker continues processing other jobs

### Manual Retry

To re-archive failed jobs, query for jobs with `MonitoringStatusArchivingFailed` and call `TriggerArchiving()` again, as sketched below.

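A minimal sketch of such a retry pass (illustrative only; `findJobsWithStatus` is a hypothetical query helper, not part of this package's API):

```go
// Re-queue every job whose archiving previously failed.
// findJobsWithStatus is assumed to return jobs filtered by monitoring status.
jobs, err := findJobsWithStatus(schema.MonitoringStatusArchivingFailed)
if err != nil {
	log.Fatalf("querying failed jobs: %v", err)
}
for _, job := range jobs {
	archiver.TriggerArchiving(job) // the worker attempts the archive again
}
```
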
## Performance Considerations

### Single Worker Thread

The archiver uses a single worker goroutine. For high-throughput systems:

- Large channel buffer (128) prevents blocking
- Archiving is typically I/O bound (writing to storage)
- Single worker prevents overwhelming the storage backend

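If the single worker ever becomes a bottleneck, one conceivable extension (not implemented in this package) is to drain the same channel with several workers. A rough sketch, assuming the shutdown bookkeeping is adapted to multiple goroutines:

```go
// Hypothetical variant: N workers share archiveChannel.
// Note: workerDone would have to become a sync.WaitGroup (or similar),
// since a single done-channel cannot be closed once per worker.
for i := 0; i < numWorkers; i++ {
	go archivingWorker()
}
```

This trades the "don't overwhelm the storage backend" property for throughput, so it only makes sense with backends that handle concurrent writes well.
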
### Shutdown Timeout

Recommended timeout values:

- **Development**: 5-10 seconds
- **Production**: 10-30 seconds
- **High-load**: 30-60 seconds

Choose based on:

- Average archiving time per job
- Storage backend latency
- Acceptable shutdown delay

## Monitoring

### Logging

The archiver logs:

- **Info**: Startup, shutdown, successful completions
- **Debug**: Individual job archiving times
- **Error**: Archiving failures with job ID and reason
- **Warn**: Shutdown timeout exceeded

### Metrics

Monitor these signals for archiver health:

- Jobs with `MonitoringStatusArchivingFailed`
- Time from job stop to successful archive
- Shutdown timeout occurrences

## Thread Safety

All exported functions are safe for concurrent use:

- `Start()` - Safe to call once
- `TriggerArchiving()` - Safe from multiple goroutines
- `Shutdown()` - Safe to call once
- `WaitForArchiving()` - Deprecated, but safe

Internal state is protected by:

- Channel synchronization (`archiveChannel`)
- WaitGroup for pending count (`archivePending`)
- Context for cancellation (`shutdownCtx`)

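As an illustration of the `TriggerArchiving()` guarantee, fanning out from many goroutines is fine (`stoppedJobs` here is a hypothetical slice, not part of the package):

```go
// Safe: many goroutines may call TriggerArchiving concurrently;
// the buffered channel serializes delivery to the single worker.
var wg sync.WaitGroup
for _, job := range stoppedJobs {
	wg.Add(1)
	go func(j *schema.Job) {
		defer wg.Done()
		archiver.TriggerArchiving(j)
	}(job)
}
wg.Wait()
```
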
## Files

- **archiveWorker.go**: Worker lifecycle, channel management, shutdown logic
- **archiver.go**: Core archiving logic, metric loading, statistics calculation

## Dependencies

- `internal/repository`: Database operations for job metadata
- `internal/metricDataDispatcher`: Loading metric data from various backends
- `pkg/archive`: Archive backend abstraction (filesystem, S3, SQLite)
- `cc-lib/schema`: Job and metric data structures

@@ -2,10 +2,54 @@
 // All rights reserved. This file is part of cc-backend.
 // Use of this source code is governed by a MIT-style
 // license that can be found in the LICENSE file.
 
+// Package archiver provides asynchronous job archiving functionality for ClusterCockpit.
+//
+// The archiver runs a background worker goroutine that processes job archiving requests
+// from a buffered channel. When jobs complete, their metric data is archived from the
+// metric store to the configured archive backend (filesystem, S3, etc.).
+//
+// # Architecture
+//
+// The archiver uses a producer-consumer pattern:
+//   - Producer: TriggerArchiving() sends jobs to archiveChannel
+//   - Consumer: archivingWorker() processes jobs from the channel
+//   - Coordination: sync.WaitGroup tracks pending archive operations
+//
+// # Lifecycle
+//
+//  1. Start(repo, ctx) - Initialize worker with context for cancellation
+//  2. TriggerArchiving(job) - Queue job for archiving (called when job stops)
+//  3. archivingWorker() - Background goroutine processes jobs
+//  4. Shutdown(timeout) - Graceful shutdown with timeout
+//
+// # Graceful Shutdown
+//
+// The archiver supports graceful shutdown with configurable timeout:
+//   - Closes channel to reject new jobs
+//   - Waits for pending jobs to complete (up to timeout)
+//   - Cancels context if timeout exceeded
+//   - Ensures worker goroutine exits cleanly
+//
+// # Example Usage
+//
+//	// Initialize archiver
+//	ctx, cancel := context.WithCancel(context.Background())
+//	defer cancel()
+//	archiver.Start(jobRepository, ctx)
+//
+//	// Trigger archiving when job completes
+//	archiver.TriggerArchiving(job)
+//
+//	// Graceful shutdown with 10 second timeout
+//	if err := archiver.Shutdown(10 * time.Second); err != nil {
+//		log.Printf("Archiver shutdown timeout: %v", err)
+//	}
 package archiver
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
@@ -19,38 +63,82 @@ var (
 	archivePending sync.WaitGroup
 	archiveChannel chan *schema.Job
 	jobRepo        *repository.JobRepository
+	shutdownCtx    context.Context
+	shutdownCancel context.CancelFunc
+	workerDone     chan struct{}
 )
 
-func Start(r *repository.JobRepository) {
+// Start initializes the archiver and starts the background worker goroutine.
+//
+// The archiver processes job archiving requests asynchronously via a buffered channel.
+// Jobs are sent to the channel using TriggerArchiving() and processed by the worker.
+//
+// Parameters:
+//   - r: JobRepository instance for database operations
+//   - ctx: Context for cancellation (shutdown signal propagation)
+//
+// The worker goroutine will run until:
+//   - ctx is cancelled (via parent shutdown)
+//   - archiveChannel is closed (via Shutdown())
+//
+// Must be called before TriggerArchiving(). Safe to call only once.
+func Start(r *repository.JobRepository, ctx context.Context) {
+	shutdownCtx, shutdownCancel = context.WithCancel(ctx)
 	archiveChannel = make(chan *schema.Job, 128)
+	workerDone = make(chan struct{})
 	jobRepo = r
 
 	go archivingWorker()
 }
 
-// Archiving worker thread
+// archivingWorker is the background goroutine that processes job archiving requests.
+//
+// The worker loop:
+//  1. Blocks waiting for jobs on archiveChannel or shutdown signal
+//  2. Fetches job metadata from repository
+//  3. Archives job data to configured backend (calls ArchiveJob)
+//  4. Updates job footprint and energy metrics in database
+//  5. Marks job as successfully archived
+//  6. Calls job stop hooks
+//
+// The worker exits when:
+//   - shutdownCtx is cancelled (timeout during shutdown)
+//   - archiveChannel is closed (normal shutdown)
+//
+// Errors during archiving are logged and the job is marked as failed,
+// but the worker continues processing other jobs.
 func archivingWorker() {
+	defer close(workerDone)
+
 	for {
 		select {
+		case <-shutdownCtx.Done():
+			cclog.Info("Archive worker received shutdown signal")
+			return
+
 		case job, ok := <-archiveChannel:
 			if !ok {
-				break
+				cclog.Info("Archive channel closed, worker exiting")
+				return
 			}
 
 			start := time.Now()
 			// not using meta data, called to load JobMeta into Cache?
 			// will fail if job meta not in repository
 			if _, err := jobRepo.FetchMetadata(job); err != nil {
 				cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
 				jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
 				archivePending.Done()
 				continue
 			}
 
 			// ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
-			// TODO: Maybe use context with cancel/timeout here
-			jobMeta, err := ArchiveJob(job, context.Background())
+			// Use shutdown context to allow cancellation
+			jobMeta, err := ArchiveJob(job, shutdownCtx)
 			if err != nil {
 				cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
 				jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
 				archivePending.Done()
 				continue
 			}
@@ -58,30 +146,44 @@ func archivingWorker() {
 
 			if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
 				cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
 				archivePending.Done()
 				continue
 			}
 			if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
 				cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
 				archivePending.Done()
 				continue
 			}
 			// Update the jobs database entry one last time:
 			stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
 			if err := jobRepo.Execute(stmt); err != nil {
 				cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
 				archivePending.Done()
 				continue
 			}
 			cclog.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
-			cclog.Printf("archiving job (dbid: %d) successful", job.ID)
+			cclog.Infof("archiving job (dbid: %d) successful", job.ID)
 
 			repository.CallJobStopHooks(job)
 			archivePending.Done()
-		default:
-			continue
 		}
 	}
 }
 
-// Trigger async archiving
+// TriggerArchiving queues a job for asynchronous archiving.
+//
+// This function should be called when a job completes (stops) to archive its
+// metric data from the metric store to the configured archive backend.
+//
+// The function:
+//  1. Increments the pending job counter (WaitGroup)
+//  2. Sends the job to the archiving channel (buffered, capacity 128)
+//  3. Returns immediately (non-blocking unless channel is full)
+//
+// The actual archiving is performed asynchronously by the worker goroutine.
+// Upon completion, the worker will decrement the pending counter.
+//
+// Panics if Start() has not been called first.
 func TriggerArchiving(job *schema.Job) {
 	if archiveChannel == nil {
 		cclog.Fatal("Cannot archive without archiving channel. Did you Start the archiver?")
@@ -91,8 +193,58 @@ func TriggerArchiving(job *schema.Job) {
 	archiveChannel <- job
 }
 
-// Wait for background thread to finish pending archiving operations
-func WaitForArchiving() {
-	// close channel and wait for worker to process remaining jobs
-	archivePending.Wait()
-}
+// Shutdown performs a graceful shutdown of the archiver with a configurable timeout.
+//
+// The shutdown process:
+//  1. Closes archiveChannel - no new jobs will be accepted
+//  2. Waits for pending jobs to complete (up to timeout duration)
+//  3. If timeout is exceeded:
+//     - Cancels shutdownCtx to interrupt ongoing ArchiveJob operations
+//     - Returns error indicating timeout
+//  4. Waits for worker goroutine to exit cleanly
+//
+// Parameters:
+//   - timeout: Maximum duration to wait for pending jobs to complete
+//     (recommended: 10-30 seconds for production)
+//
+// Returns:
+//   - nil if all jobs completed within timeout
+//   - error if timeout was exceeded (some jobs may not have been archived)
+//
+// Jobs that don't complete within the timeout will be marked as failed.
+// The function always ensures the worker goroutine exits before returning.
+//
+// Example:
+//
+//	if err := archiver.Shutdown(10 * time.Second); err != nil {
+//		log.Printf("Some jobs did not complete: %v", err)
+//	}
+func Shutdown(timeout time.Duration) error {
+	cclog.Info("Initiating archiver shutdown...")
+
+	// Close channel to signal no more jobs will be accepted
+	close(archiveChannel)
+
+	// Create a channel to signal when all jobs are done
+	done := make(chan struct{})
+	go func() {
+		archivePending.Wait()
+		close(done)
+	}()
+
+	// Wait for jobs to complete or timeout
+	select {
+	case <-done:
+		cclog.Info("All archive jobs completed successfully")
+		// Wait for worker to exit
+		<-workerDone
+		return nil
+	case <-time.After(timeout):
+		cclog.Warn("Archiver shutdown timeout exceeded, cancelling remaining operations")
+		// Cancel any ongoing operations
+		shutdownCancel()
+		// Wait for worker to exit
+		<-workerDone
+		return fmt.Errorf("archiver shutdown timeout after %v", timeout)
+	}
+}
@@ -2,6 +2,7 @@
 // All rights reserved. This file is part of cc-backend.
 // Use of this source code is governed by a MIT-style
 // license that can be found in the LICENSE file.
 
 package archiver
 
 import (
@@ -15,7 +16,32 @@ import (
 	"github.com/ClusterCockpit/cc-lib/schema"
 )
 
-// Writes a running job to the job-archive
+// ArchiveJob archives a completed job's metric data to the configured archive backend.
+//
+// This function performs the following operations:
+//  1. Loads all metric data for the job from the metric data repository
+//  2. Calculates job-level statistics (avg, min, max) for each metric
+//  3. Stores the job metadata and metric data to the archive backend
+//
+// Metric data is retrieved at the highest available resolution (typically 60s)
+// for the following scopes:
+//   - Node scope (always)
+//   - Core scope (for jobs with ≤8 nodes, to reduce data volume)
+//   - Accelerator scope (if job used accelerators)
+//
+// The function respects context cancellation. If ctx is cancelled (e.g., during
+// shutdown timeout), the operation will be interrupted and return an error.
+//
+// Parameters:
+//   - job: The job to archive (must be a completed job)
+//   - ctx: Context for cancellation and timeout control
+//
+// Returns:
+//   - *schema.Job with populated Statistics field
+//   - error if data loading or archiving fails
+//
+// If config.Keys.DisableArchive is true, only job statistics are calculated
+// and returned (no data is written to archive backend).
 func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.Job, error) {
 	allMetrics := make([]string, 0)
 	metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
 
@@ -74,6 +74,11 @@ func InitDB() error {
 	for jobContainer := range ar.Iter(false) {
 
 		jobMeta := jobContainer.Meta
+		if jobMeta == nil {
+			cclog.Warn("skipping job with nil metadata")
+			errorOccured++
+			continue
+		}
 
 		// Bundle 100 inserts into one transaction for better performance
 		if i%100 == 0 {
@@ -144,7 +149,7 @@ func InitDB() error {
 	}
 
 	r.TransactionEnd(t)
-	cclog.Printf("A total of %d jobs have been registered in %.3f seconds.\n", i, time.Since(starttime).Seconds())
+	cclog.Infof("A total of %d jobs have been registered in %.3f seconds.", i, time.Since(starttime).Seconds())
 	return nil
 }
 
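(The `i%100` check above follows a common bulk-insert batching pattern; the sketch below is illustrative only, with `TransactionInit`/`TransactionCommit`/`TransactionEnd` assumed to be the repository's transaction helpers and `insertJob` a hypothetical per-item insert step.)

```go
// Commit every 100 inserts so no single transaction grows unbounded.
t, err := r.TransactionInit()
if err != nil {
	return err
}
for i, jobMeta := range jobs {
	if i > 0 && i%100 == 0 {
		r.TransactionCommit(t) // flush the previous batch
	}
	insertJob(t, jobMeta) // hypothetical insert helper
}
r.TransactionEnd(t)
```
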
@@ -23,6 +23,11 @@ func getNormalizationFactor(v float64) (float64, int) {
 	count := 0
 	scale := -3
 
+	// Prevent infinite loop for zero or negative values
+	if v <= 0.0 {
+		return 1.0, 0
+	}
+
 	if v > 1000.0 {
 		for v > 1000.0 {
 			v *= 1e-3
@@ -49,6 +54,11 @@ func getNormalizationFactor(v float64) (float64, int) {
 func getExponent(p float64) int {
 	count := 0
 
+	// Prevent infinite loop for infinity or NaN values
+	if math.IsInf(p, 0) || math.IsNaN(p) || p <= 0.0 {
+		return 0
+	}
+
 	for p > 1.0 {
 		p = p / 1000.0
 		count++
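(A minimal check of the new guards, sketched as a same-package test; assumes the `math` and `testing` imports and that `getNormalizationFactor`/`getExponent` are reachable from the test file.)

```go
func TestNormalizationGuards(t *testing.T) {
	// Zero or negative input previously spun forever; now it returns neutral values.
	if f, e := getNormalizationFactor(0.0); f != 1.0 || e != 0 {
		t.Errorf("got (%v, %d), want (1.0, 0)", f, e)
	}
	// Inf/NaN previously spun forever in getExponent; now it returns 0.
	if getExponent(math.NaN()) != 0 {
		t.Error("NaN should yield exponent 0")
	}
}
```
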
@@ -44,14 +44,14 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) {
 			return
 		case <-ticks:
 			t := time.Now().Add(-d)
-			cclog.Printf("[METRICSTORE]> start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339))
+			cclog.Infof("[METRICSTORE]> start archiving checkpoints (older than %s)...", t.Format(time.RFC3339))
 			n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir,
 				Keys.Archive.RootDir, t.Unix(), Keys.Archive.DeleteInstead)
 
 			if err != nil {
-				cclog.Printf("[METRICSTORE]> archiving failed: %s\n", err.Error())
+				cclog.Errorf("[METRICSTORE]> archiving failed: %s", err.Error())
 			} else {
-				cclog.Printf("[METRICSTORE]> done: %d files zipped and moved to archive\n", n)
+				cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n)
 			}
 		}
 	}
@@ -82,14 +82,14 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-ticks:
-			cclog.Printf("[METRICSTORE]> start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
+			cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", lastCheckpoint.Format(time.RFC3339))
 			now := time.Now()
 			n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir,
 				lastCheckpoint.Unix(), now.Unix())
 			if err != nil {
-				cclog.Printf("[METRICSTORE]> checkpointing failed: %s\n", err.Error())
+				cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error())
 			} else {
-				cclog.Printf("[METRICSTORE]> done: %d checkpoint files created\n", n)
+				cclog.Infof("[METRICSTORE]> done: %d checkpoint files created", n)
 				lastCheckpoint = now
 			}
 		}
@@ -194,7 +194,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
 				continue
 			}
 
-			cclog.Printf("[METRICSTORE]> error while checkpointing %#v: %s", workItem.selector, err.Error())
+			cclog.Errorf("[METRICSTORE]> error while checkpointing %#v: %s", workItem.selector, err.Error())
 			atomic.AddInt32(&errs, 1)
 		} else {
 			atomic.AddInt32(&n, 1)
@@ -394,7 +394,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
 		if err != nil {
 			cclog.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err)
 		}
-		cclog.Printf("[METRICSTORE]> %#v Directory created successfully.\n", dir)
+		cclog.Debugf("[METRICSTORE]> %#v Directory created successfully", dir)
 	}
 
 	// Config read (replace with your actual config read)
@@ -413,7 +413,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
 	if found, err := checkFilesWithExtension(dir, fileFormat); err != nil {
 		return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err)
 	} else if found {
-		cclog.Printf("[METRICSTORE]> Loading %s files because fileformat is %s\n", fileFormat, fileFormat)
+		cclog.Infof("[METRICSTORE]> Loading %s files because fileformat is %s", fileFormat, fileFormat)
 		return m.FromCheckpoint(dir, from, fileFormat)
 	}
 
@@ -422,7 +422,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
 	if found, err := checkFilesWithExtension(dir, altFormat); err != nil {
 		return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err)
 	} else if found {
-		cclog.Printf("[METRICSTORE]> Loading %s files but fileformat is %s\n", altFormat, fileFormat)
+		cclog.Infof("[METRICSTORE]> Loading %s files but fileformat is %s", altFormat, fileFormat)
 		return m.FromCheckpoint(dir, from, altFormat)
 	}
 
@@ -119,7 +119,7 @@ func ReceiveNats(conf *(NatsConfig),
 			for m := range msgs {
 				dec := lineprotocol.NewDecoderWithBytes(m.Data)
 				if err := DecodeLine(dec, ms, clusterTag); err != nil {
-					cclog.Printf("error: %s\n", err.Error())
+					cclog.Errorf("error: %s", err.Error())
 				}
 			}
 
@@ -134,7 +134,7 @@ func ReceiveNats(conf *(NatsConfig),
 			sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) {
 				dec := lineprotocol.NewDecoderWithBytes(m.Data)
 				if err := DecodeLine(dec, ms, clusterTag); err != nil {
-					cclog.Printf("error: %s\n", err.Error())
+					cclog.Errorf("error: %s", err.Error())
 				}
 			})
 		}
@@ -142,7 +142,7 @@ func ReceiveNats(conf *(NatsConfig),
 		if err != nil {
 			return err
 		}
-		cclog.Printf("NATS subscription to '%s' on '%s' established\n", sc.SubscribeTo, conf.Address)
+		cclog.Infof("NATS subscription to '%s' on '%s' established", sc.SubscribeTo, conf.Address)
 		subs = append(subs, sub)
 	}
 
@@ -150,7 +150,7 @@ func ReceiveNats(conf *(NatsConfig),
 	for _, sub := range subs {
 		err = sub.Unsubscribe()
 		if err != nil {
-			cclog.Printf("NATS unsubscribe failed: %s", err.Error())
+			cclog.Errorf("NATS unsubscribe failed: %s", err.Error())
 		}
 	}
 	close(msgs)
 
@@ -27,7 +27,7 @@ func RegisterLdapSyncService(ds string) {
 		gocron.NewTask(
 			func() {
 				t := time.Now()
-				cclog.Printf("ldap sync started at %s", t.Format(time.RFC3339))
+				cclog.Infof("ldap sync started at %s", t.Format(time.RFC3339))
 				if err := auth.LdapAuth.Sync(); err != nil {
 					cclog.Errorf("ldap sync failed: %s", err.Error())
 				}
 
@@ -25,8 +25,8 @@ func RegisterUpdateDurationWorker() {
 		gocron.NewTask(
 			func() {
 				start := time.Now()
-				cclog.Printf("Update duration started at %s\n", start.Format(time.RFC3339))
+				cclog.Infof("Update duration started at %s", start.Format(time.RFC3339))
 				jobRepo.UpdateDuration()
-				cclog.Printf("Update duration is done and took %s\n", time.Since(start))
+				cclog.Infof("Update duration is done and took %s", time.Since(start))
 			}))
 }
 
@@ -34,7 +34,7 @@ func RegisterFootprintWorker() {
 				c := 0
 				ce := 0
 				cl := 0
-				cclog.Printf("Update Footprints started at %s\n", s.Format(time.RFC3339))
+				cclog.Infof("Update Footprints started at %s", s.Format(time.RFC3339))
 
 				for _, cluster := range archive.Clusters {
 					s_cluster := time.Now()
@@ -136,6 +136,6 @@ func RegisterFootprintWorker() {
 					}
 					cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(s_cluster))
 				}
-				cclog.Printf("Updating %d (of %d; Skipped %d) Footprints is done and took %s\n", c, cl, ce, time.Since(s))
+				cclog.Infof("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s))
 			}))
 }
 
@@ -50,7 +50,7 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err
 
 	// Create channels for job distribution
 	jobs := make(chan archive.JobContainer, numWorkers*2)
 
 	// WaitGroup to track worker completion
 	var wg sync.WaitGroup
 
@@ -127,8 +127,6 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err
 	return finalImported, finalFailed, nil
 }
 
-
-
 func main() {
 	var srcPath, flagConfigFile, flagLogLevel, flagRemoveCluster, flagRemoveAfter, flagRemoveBefore string
 	var flagSrcConfig, flagDstConfig string
@@ -147,7 +145,6 @@ func main() {
 	flag.StringVar(&flagDstConfig, "dst-config", "", "Destination archive backend configuration (JSON), e.g. '{\"kind\":\"sqlite\",\"dbPath\":\"./archive.db\"}'")
 	flag.Parse()
 
-
 	archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath)
 
 	cclog.Init(flagLogLevel, flagLogDateTime)
@@ -189,7 +186,6 @@ func main() {
 
 	ccconf.Init(flagConfigFile)
 
-
 	// Load and check main configuration
 	if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
 		if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
@@ -209,7 +205,7 @@ func main() {
 	if flagValidate {
 		config.Keys.Validate = true
 		for job := range ar.Iter(true) {
-			cclog.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID)
+			cclog.Debugf("Validate %s - %d", job.Meta.Cluster, job.Meta.JobID)
 		}
 		os.Exit(0)
 	}