diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index ce721b7..3079b21 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -220,7 +220,7 @@ func addUser(userSpec string) error { return fmt.Errorf("adding user '%s' with roles '%s': %w", parts[0], parts[1], err) } - cclog.Printf("Add User: Added new user '%s' with roles '%s'.\n", parts[0], parts[1]) + cclog.Infof("Add User: Added new user '%s' with roles '%s'", parts[0], parts[1]) return nil } @@ -229,7 +229,7 @@ func delUser(username string) error { if err := ur.DelUser(username); err != nil { return fmt.Errorf("deleting user '%s': %w", username, err) } - cclog.Printf("Delete User: Deleted user '%s' from DB.\n", username) + cclog.Infof("Delete User: Deleted user '%s' from DB", username) return nil } @@ -262,7 +262,7 @@ func generateJWT(authHandle *auth.Authentication, username string) error { return fmt.Errorf("generating JWT for user '%s': %w", user.Username, err) } - cclog.Printf("JWT: Successfully generated JWT for user '%s': %s\n", user.Username, jwt) + cclog.Infof("JWT: Successfully generated JWT for user '%s': %s", user.Username, jwt) return nil } @@ -294,7 +294,7 @@ func initSubsystems() error { if err := importer.HandleImportFlag(flagImportJob); err != nil { return fmt.Errorf("importing job: %w", err) } - cclog.Printf("Import Job: Imported Job '%s' into DB.\n", flagImportJob) + cclog.Infof("Import Job: Imported Job '%s' into DB", flagImportJob) } // Initialize taggers @@ -325,7 +325,7 @@ func runServer(ctx context.Context) error { } // Start archiver and task manager - archiver.Start(repository.GetJobRepository()) + archiver.Start(repository.GetJobRepository(), ctx) taskManager.Start(ccconf.GetPackageConfig("cron"), ccconf.GetPackageConfig("archive")) // Initialize web UI diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index 498ffc2..0c4c259 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -330,9 +330,9 @@ func (s *Server) Start(ctx context.Context) error { MinVersion: tls.VersionTLS12, PreferServerCipherSuites: true, }) - cclog.Printf("HTTPS server listening at %s...\n", config.Keys.Addr) + cclog.Infof("HTTPS server listening at %s...", config.Keys.Addr) } else { - cclog.Printf("HTTP server listening at %s...\n", config.Keys.Addr) + cclog.Infof("HTTP server listening at %s...", config.Keys.Addr) } // // Because this program will want to bind to a privileged port (like 80), the listener must @@ -373,6 +373,8 @@ func (s *Server) Shutdown(ctx context.Context) { memorystore.Shutdown() } - // Then, wait for any async archivings still pending... - archiver.WaitForArchiving() + // Shutdown archiver with 10 second timeout for fast shutdown + if err := archiver.Shutdown(10 * time.Second); err != nil { + cclog.Warnf("Archiver shutdown: %v", err) + } } diff --git a/internal/api/api_test.go b/internal/api/api_test.go index bf8cd75..aa2abd8 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -175,7 +175,7 @@ func setup(t *testing.T) *api.RestApi { t.Fatal(err) } - archiver.Start(repository.GetJobRepository()) + archiver.Start(repository.GetJobRepository(), context.Background()) if cfg := ccconf.GetPackageConfig("auth"); cfg != nil { auth.Init(&cfg) @@ -190,6 +190,10 @@ func setup(t *testing.T) *api.RestApi { } func cleanup() { + // Gracefully shutdown archiver with timeout + if err := archiver.Shutdown(5 * time.Second); err != nil { + cclog.Warnf("Archiver shutdown timeout in tests: %v", err) + } // TODO: Clear all caches, reset all modules, etc... } @@ -333,7 +337,7 @@ func TestRestApi(t *testing.T) { t.Fatal(response.Status, recorder.Body.String()) } - archiver.WaitForArchiving() + // Archiving happens asynchronously, will be completed in cleanup job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime) if err != nil { t.Fatal(err) @@ -446,7 +450,7 @@ func TestRestApi(t *testing.T) { t.Fatal(response.Status, recorder.Body.String()) } - archiver.WaitForArchiving() + // Archiving happens asynchronously, will be completed in cleanup jobid, cluster := int64(12345), "testcluster" job, err := restapi.JobRepository.Find(&jobid, &cluster, nil) if err != nil { diff --git a/internal/api/job.go b/internal/api/job.go index 6b6f02e..5bc7c71 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -669,7 +669,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } - cclog.Printf("REST: %s\n", req.GoString()) + cclog.Debugf("REST: %s", req.GoString()) req.State = schema.JobStateRunning if err := importer.SanityChecks(&req); err != nil { @@ -713,7 +713,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { } } - cclog.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime) + cclog.Infof("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime) rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusCreated) if err := json.NewEncoder(rw).Encode(DefaultApiResponse{ @@ -946,7 +946,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo } } - cclog.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) + cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) // Send a response (with status OK). This means that errors that happen from here on forward // can *NOT* be communicated to the client. If reading from a MetricDataRepository or diff --git a/internal/archiver/README.md b/internal/archiver/README.md new file mode 100644 index 0000000..0fae04e --- /dev/null +++ b/internal/archiver/README.md @@ -0,0 +1,190 @@ +# Archiver Package + +The `archiver` package provides asynchronous job archiving functionality for ClusterCockpit. When jobs complete, their metric data is archived from the metric store to a persistent archive backend (filesystem, S3, SQLite, etc.). + +## Architecture + +### Producer-Consumer Pattern + +``` +┌──────────────┐ TriggerArchiving() ┌───────────────┐ +│ API Handler │ ───────────────────────▶ │ archiveChannel│ +│ (Job Stop) │ │ (buffer: 128)│ +└──────────────┘ └───────┬───────┘ + │ + ┌─────────────────────────────────┘ + │ + ▼ + ┌──────────────────────┐ + │ archivingWorker() │ + │ (goroutine) │ + └──────────┬───────────┘ + │ + ▼ + 1. Fetch job metadata + 2. Load metric data + 3. Calculate statistics + 4. Archive to backend + 5. Update database + 6. Call hooks +``` + +### Components + +- **archiveChannel**: Buffered channel (128 jobs) for async communication +- **archivePending**: WaitGroup tracking in-flight archiving operations +- **archivingWorker**: Background goroutine processing archiving requests +- **shutdownCtx**: Context for graceful cancellation during shutdown + +## Usage + +### Initialization + +```go +// Start archiver with context for shutdown control +ctx, cancel := context.WithCancel(context.Background()) +defer cancel() + +archiver.Start(jobRepository, ctx) +``` + +### Archiving a Job + +```go +// Called automatically when a job completes +archiver.TriggerArchiving(job) +``` + +The function returns immediately. Actual archiving happens in the background. + +### Graceful Shutdown + +```go +// Shutdown with 10 second timeout +if err := archiver.Shutdown(10 * time.Second); err != nil { + log.Printf("Archiver shutdown timeout: %v", err) +} +``` + +**Shutdown process:** +1. Closes channel (rejects new jobs) +2. Waits for pending jobs (up to timeout) +3. Cancels context if timeout exceeded +4. Waits for worker to exit cleanly + +## Configuration + +### Channel Buffer Size + +The archiving channel has a buffer of 128 jobs. If more than 128 jobs are queued simultaneously, `TriggerArchiving()` will block until space is available. + +To adjust: +```go +// In archiveWorker.go Start() function +archiveChannel = make(chan *schema.Job, 256) // Increase buffer +``` + +### Scope Selection + +Archive data scopes are automatically selected based on job size: + +- **Node scope**: Always included +- **Core scope**: Included for jobs with ≤8 nodes (reduces data volume for large jobs) +- **Accelerator scope**: Included if job used accelerators (`NumAcc > 0`) + +To adjust the node threshold: +```go +// In archiver.go ArchiveJob() function +if job.NumNodes <= 16 { // Change from 8 to 16 + scopes = append(scopes, schema.MetricScopeCore) +} +``` + +### Resolution + +Data is archived at the highest available resolution (typically 60s intervals). To change: + +```go +// In archiver.go ArchiveJob() function +jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 300) +// 0 = highest resolution +// 300 = 5-minute resolution +``` + +## Error Handling + +### Automatic Retry + +The archiver does **not** automatically retry failed archiving operations. If archiving fails: + +1. Error is logged +2. Job is marked as `MonitoringStatusArchivingFailed` in database +3. Worker continues processing other jobs + +### Manual Retry + +To re-archive failed jobs, query for jobs with `MonitoringStatusArchivingFailed` and call `TriggerArchiving()` again. + +## Performance Considerations + +### Single Worker Thread + +The archiver uses a single worker goroutine. For high-throughput systems: + +- Large channel buffer (128) prevents blocking +- Archiving is typically I/O bound (writing to storage) +- Single worker prevents overwhelming storage backend + +### Shutdown Timeout + +Recommended timeout values: +- **Development**: 5-10 seconds +- **Production**: 10-30 seconds +- **High-load**: 30-60 seconds + +Choose based on: +- Average archiving time per job +- Storage backend latency +- Acceptable shutdown delay + +## Monitoring + +### Logging + +The archiver logs: +- **Info**: Startup, shutdown, successful completions +- **Debug**: Individual job archiving times +- **Error**: Archiving failures with job ID and reason +- **Warn**: Shutdown timeout exceeded + +### Metrics + +Monitor these signals for archiver health: +- Jobs with `MonitoringStatusArchivingFailed` +- Time from job stop to successful archive +- Shutdown timeout occurrences + +## Thread Safety + +All exported functions are safe for concurrent use: +- `Start()` - Safe to call once +- `TriggerArchiving()` - Safe from multiple goroutines +- `Shutdown()` - Safe to call once +- `WaitForArchiving()` - Deprecated, but safe + +Internal state is protected by: +- Channel synchronization (`archiveChannel`) +- WaitGroup for pending count (`archivePending`) +- Context for cancellation (`shutdownCtx`) + +## Files + +- **archiveWorker.go**: Worker lifecycle, channel management, shutdown logic +- **archiver.go**: Core archiving logic, metric loading, statistics calculation + +## Dependencies + +- `internal/repository`: Database operations for job metadata +- `internal/metricDataDispatcher`: Loading metric data from various backends +- `pkg/archive`: Archive backend abstraction (filesystem, S3, SQLite) +- `cc-lib/schema`: Job and metric data structures diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index 9e834b2..0434844 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -2,10 +2,54 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + +// Package archiver provides asynchronous job archiving functionality for ClusterCockpit. +// +// The archiver runs a background worker goroutine that processes job archiving requests +// from a buffered channel. When jobs complete, their metric data is archived from the +// metric store to the configured archive backend (filesystem, S3, etc.). +// +// # Architecture +// +// The archiver uses a producer-consumer pattern: +// - Producer: TriggerArchiving() sends jobs to archiveChannel +// - Consumer: archivingWorker() processes jobs from the channel +// - Coordination: sync.WaitGroup tracks pending archive operations +// +// # Lifecycle +// +// 1. Start(repo, ctx) - Initialize worker with context for cancellation +// 2. TriggerArchiving(job) - Queue job for archiving (called when job stops) +// 3. archivingWorker() - Background goroutine processes jobs +// 4. Shutdown(timeout) - Graceful shutdown with timeout +// +// # Graceful Shutdown +// +// The archiver supports graceful shutdown with configurable timeout: +// - Closes channel to reject new jobs +// - Waits for pending jobs to complete (up to timeout) +// - Cancels context if timeout exceeded +// - Ensures worker goroutine exits cleanly +// +// # Example Usage +// +// // Initialize archiver +// ctx, cancel := context.WithCancel(context.Background()) +// defer cancel() +// archiver.Start(jobRepository, ctx) +// +// // Trigger archiving when job completes +// archiver.TriggerArchiving(job) +// +// // Graceful shutdown with 10 second timeout +// if err := archiver.Shutdown(10 * time.Second); err != nil { +// log.Printf("Archiver shutdown timeout: %v", err) +// } package archiver import ( "context" + "fmt" "sync" "time" @@ -19,38 +63,82 @@ var ( archivePending sync.WaitGroup archiveChannel chan *schema.Job jobRepo *repository.JobRepository + shutdownCtx context.Context + shutdownCancel context.CancelFunc + workerDone chan struct{} ) -func Start(r *repository.JobRepository) { +// Start initializes the archiver and starts the background worker goroutine. +// +// The archiver processes job archiving requests asynchronously via a buffered channel. +// Jobs are sent to the channel using TriggerArchiving() and processed by the worker. +// +// Parameters: +// - r: JobRepository instance for database operations +// - ctx: Context for cancellation (shutdown signal propagation) +// +// The worker goroutine will run until: +// - ctx is cancelled (via parent shutdown) +// - archiveChannel is closed (via Shutdown()) +// +// Must be called before TriggerArchiving(). Safe to call only once. +func Start(r *repository.JobRepository, ctx context.Context) { + shutdownCtx, shutdownCancel = context.WithCancel(ctx) archiveChannel = make(chan *schema.Job, 128) + workerDone = make(chan struct{}) jobRepo = r go archivingWorker() } -// Archiving worker thread +// archivingWorker is the background goroutine that processes job archiving requests. +// +// The worker loop: +// 1. Blocks waiting for jobs on archiveChannel or shutdown signal +// 2. Fetches job metadata from repository +// 3. Archives job data to configured backend (calls ArchiveJob) +// 4. Updates job footprint and energy metrics in database +// 5. Marks job as successfully archived +// 6. Calls job stop hooks +// +// The worker exits when: +// - shutdownCtx is cancelled (timeout during shutdown) +// - archiveChannel is closed (normal shutdown) +// +// Errors during archiving are logged and the job is marked as failed, +// but the worker continues processing other jobs. func archivingWorker() { + defer close(workerDone) + for { select { + case <-shutdownCtx.Done(): + cclog.Info("Archive worker received shutdown signal") + return + case job, ok := <-archiveChannel: if !ok { - break + cclog.Info("Archive channel closed, worker exiting") + return } + start := time.Now() // not using meta data, called to load JobMeta into Cache? // will fail if job meta not in repository if _, err := jobRepo.FetchMetadata(job); err != nil { cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error()) jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed) + archivePending.Done() continue } // ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend - // TODO: Maybe use context with cancel/timeout here - jobMeta, err := ArchiveJob(job, context.Background()) + // Use shutdown context to allow cancellation + jobMeta, err := ArchiveJob(job, shutdownCtx) if err != nil { cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed) + archivePending.Done() continue } @@ -58,30 +146,44 @@ func archivingWorker() { if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil { cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) + archivePending.Done() continue } if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil { cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) + archivePending.Done() continue } // Update the jobs database entry one last time: stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful) if err := jobRepo.Execute(stmt); err != nil { cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) + archivePending.Done() continue } cclog.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) - cclog.Printf("archiving job (dbid: %d) successful", job.ID) + cclog.Infof("archiving job (dbid: %d) successful", job.ID) repository.CallJobStopHooks(job) archivePending.Done() - default: - continue } } } -// Trigger async archiving +// TriggerArchiving queues a job for asynchronous archiving. +// +// This function should be called when a job completes (stops) to archive its +// metric data from the metric store to the configured archive backend. +// +// The function: +// 1. Increments the pending job counter (WaitGroup) +// 2. Sends the job to the archiving channel (buffered, capacity 128) +// 3. Returns immediately (non-blocking unless channel is full) +// +// The actual archiving is performed asynchronously by the worker goroutine. +// Upon completion, the worker will decrement the pending counter. +// +// Panics if Start() has not been called first. func TriggerArchiving(job *schema.Job) { if archiveChannel == nil { cclog.Fatal("Cannot archive without archiving channel. Did you Start the archiver?") @@ -91,8 +193,58 @@ func TriggerArchiving(job *schema.Job) { archiveChannel <- job } -// Wait for background thread to finish pending archiving operations -func WaitForArchiving() { - // close channel and wait for worker to process remaining jobs - archivePending.Wait() +// Shutdown performs a graceful shutdown of the archiver with a configurable timeout. +// +// The shutdown process: +// 1. Closes archiveChannel - no new jobs will be accepted +// 2. Waits for pending jobs to complete (up to timeout duration) +// 3. If timeout is exceeded: +// - Cancels shutdownCtx to interrupt ongoing ArchiveJob operations +// - Returns error indicating timeout +// 4. Waits for worker goroutine to exit cleanly +// +// Parameters: +// - timeout: Maximum duration to wait for pending jobs to complete +// (recommended: 10-30 seconds for production) +// +// Returns: +// - nil if all jobs completed within timeout +// - error if timeout was exceeded (some jobs may not have been archived) +// +// Jobs that don't complete within the timeout will be marked as failed. +// The function always ensures the worker goroutine exits before returning. +// +// Example: +// +// if err := archiver.Shutdown(10 * time.Second); err != nil { +// log.Printf("Some jobs did not complete: %v", err) +// } +func Shutdown(timeout time.Duration) error { + cclog.Info("Initiating archiver shutdown...") + + // Close channel to signal no more jobs will be accepted + close(archiveChannel) + + // Create a channel to signal when all jobs are done + done := make(chan struct{}) + go func() { + archivePending.Wait() + close(done) + }() + + // Wait for jobs to complete or timeout + select { + case <-done: + cclog.Info("All archive jobs completed successfully") + // Wait for worker to exit + <-workerDone + return nil + case <-time.After(timeout): + cclog.Warn("Archiver shutdown timeout exceeded, cancelling remaining operations") + // Cancel any ongoing operations + shutdownCancel() + // Wait for worker to exit + <-workerDone + return fmt.Errorf("archiver shutdown timeout after %v", timeout) + } } diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index e21be13..b88199a 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -2,6 +2,7 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + package archiver import ( @@ -15,7 +16,32 @@ import ( "github.com/ClusterCockpit/cc-lib/schema" ) -// Writes a running job to the job-archive +// ArchiveJob archives a completed job's metric data to the configured archive backend. +// +// This function performs the following operations: +// 1. Loads all metric data for the job from the metric data repository +// 2. Calculates job-level statistics (avg, min, max) for each metric +// 3. Stores the job metadata and metric data to the archive backend +// +// Metric data is retrieved at the highest available resolution (typically 60s) +// for the following scopes: +// - Node scope (always) +// - Core scope (for jobs with ≤8 nodes, to reduce data volume) +// - Accelerator scope (if job used accelerators) +// +// The function respects context cancellation. If ctx is cancelled (e.g., during +// shutdown timeout), the operation will be interrupted and return an error. +// +// Parameters: +// - job: The job to archive (must be a completed job) +// - ctx: Context for cancellation and timeout control +// +// Returns: +// - *schema.Job with populated Statistics field +// - error if data loading or archiving fails +// +// If config.Keys.DisableArchive is true, only job statistics are calculated +// and returned (no data is written to archive backend). func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.Job, error) { allMetrics := make([]string, 0) metricConfigs := archive.GetCluster(job.Cluster).MetricConfig diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index e061734..a478957 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -74,6 +74,11 @@ func InitDB() error { for jobContainer := range ar.Iter(false) { jobMeta := jobContainer.Meta + if jobMeta == nil { + cclog.Warn("skipping job with nil metadata") + errorOccured++ + continue + } // Bundle 100 inserts into one transaction for better performance if i%100 == 0 { @@ -144,7 +149,7 @@ func InitDB() error { } r.TransactionEnd(t) - cclog.Printf("A total of %d jobs have been registered in %.3f seconds.\n", i, time.Since(starttime).Seconds()) + cclog.Infof("A total of %d jobs have been registered in %.3f seconds.", i, time.Since(starttime).Seconds()) return nil } diff --git a/internal/importer/normalize.go b/internal/importer/normalize.go index fc5e537..943ceb2 100644 --- a/internal/importer/normalize.go +++ b/internal/importer/normalize.go @@ -23,6 +23,11 @@ func getNormalizationFactor(v float64) (float64, int) { count := 0 scale := -3 + // Prevent infinite loop for zero or negative values + if v <= 0.0 { + return 1.0, 0 + } + if v > 1000.0 { for v > 1000.0 { v *= 1e-3 @@ -49,6 +54,11 @@ func getNormalizationFactor(v float64) (float64, int) { func getExponent(p float64) int { count := 0 + // Prevent infinite loop for infinity or NaN values + if math.IsInf(p, 0) || math.IsNaN(p) || p <= 0.0 { + return 0 + } + for p > 1.0 { p = p / 1000.0 count++ diff --git a/internal/memorystore/archive.go b/internal/memorystore/archive.go index 1878952..56065aa 100644 --- a/internal/memorystore/archive.go +++ b/internal/memorystore/archive.go @@ -44,14 +44,14 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) { return case <-ticks: t := time.Now().Add(-d) - cclog.Printf("[METRICSTORE]> start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339)) + cclog.Infof("[METRICSTORE]> start archiving checkpoints (older than %s)...", t.Format(time.RFC3339)) n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, Keys.Archive.RootDir, t.Unix(), Keys.Archive.DeleteInstead) if err != nil { - cclog.Printf("[METRICSTORE]> archiving failed: %s\n", err.Error()) + cclog.Errorf("[METRICSTORE]> archiving failed: %s", err.Error()) } else { - cclog.Printf("[METRICSTORE]> done: %d files zipped and moved to archive\n", n) + cclog.Infof("[METRICSTORE]> done: %d files zipped and moved to archive", n) } } } diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index 8b1705c..e19cbf7 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -82,14 +82,14 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticks: - cclog.Printf("[METRICSTORE]> start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339)) + cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", lastCheckpoint.Format(time.RFC3339)) now := time.Now() n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), now.Unix()) if err != nil { - cclog.Printf("[METRICSTORE]> checkpointing failed: %s\n", err.Error()) + cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", err.Error()) } else { - cclog.Printf("[METRICSTORE]> done: %d checkpoint files created\n", n) + cclog.Infof("[METRICSTORE]> done: %d checkpoint files created", n) lastCheckpoint = now } } @@ -194,7 +194,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { continue } - cclog.Printf("[METRICSTORE]> error while checkpointing %#v: %s", workItem.selector, err.Error()) + cclog.Errorf("[METRICSTORE]> error while checkpointing %#v: %s", workItem.selector, err.Error()) atomic.AddInt32(&errs, 1) } else { atomic.AddInt32(&n, 1) @@ -394,7 +394,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { if err != nil { cclog.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err) } - cclog.Printf("[METRICSTORE]> %#v Directory created successfully.\n", dir) + cclog.Debugf("[METRICSTORE]> %#v Directory created successfully", dir) } // Config read (replace with your actual config read) @@ -413,7 +413,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { if found, err := checkFilesWithExtension(dir, fileFormat); err != nil { return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err) } else if found { - cclog.Printf("[METRICSTORE]> Loading %s files because fileformat is %s\n", fileFormat, fileFormat) + cclog.Infof("[METRICSTORE]> Loading %s files because fileformat is %s", fileFormat, fileFormat) return m.FromCheckpoint(dir, from, fileFormat) } @@ -422,7 +422,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { if found, err := checkFilesWithExtension(dir, altFormat); err != nil { return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err) } else if found { - cclog.Printf("[METRICSTORE]> Loading %s files but fileformat is %s\n", altFormat, fileFormat) + cclog.Infof("[METRICSTORE]> Loading %s files but fileformat is %s", altFormat, fileFormat) return m.FromCheckpoint(dir, from, altFormat) } diff --git a/internal/memorystore/lineprotocol.go b/internal/memorystore/lineprotocol.go index 607fff3..2bbd7ee 100644 --- a/internal/memorystore/lineprotocol.go +++ b/internal/memorystore/lineprotocol.go @@ -119,7 +119,7 @@ func ReceiveNats(conf *(NatsConfig), for m := range msgs { dec := lineprotocol.NewDecoderWithBytes(m.Data) if err := DecodeLine(dec, ms, clusterTag); err != nil { - cclog.Printf("error: %s\n", err.Error()) + cclog.Errorf("error: %s", err.Error()) } } @@ -134,7 +134,7 @@ func ReceiveNats(conf *(NatsConfig), sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) { dec := lineprotocol.NewDecoderWithBytes(m.Data) if err := DecodeLine(dec, ms, clusterTag); err != nil { - cclog.Printf("error: %s\n", err.Error()) + cclog.Errorf("error: %s", err.Error()) } }) } @@ -142,7 +142,7 @@ func ReceiveNats(conf *(NatsConfig), if err != nil { return err } - cclog.Printf("NATS subscription to '%s' on '%s' established\n", sc.SubscribeTo, conf.Address) + cclog.Infof("NATS subscription to '%s' on '%s' established", sc.SubscribeTo, conf.Address) subs = append(subs, sub) } @@ -150,7 +150,7 @@ func ReceiveNats(conf *(NatsConfig), for _, sub := range subs { err = sub.Unsubscribe() if err != nil { - cclog.Printf("NATS unsubscribe failed: %s", err.Error()) + cclog.Errorf("NATS unsubscribe failed: %s", err.Error()) } } close(msgs) diff --git a/internal/taskManager/ldapSyncService.go b/internal/taskManager/ldapSyncService.go index 27212e8..4a6e64a 100644 --- a/internal/taskManager/ldapSyncService.go +++ b/internal/taskManager/ldapSyncService.go @@ -27,7 +27,7 @@ func RegisterLdapSyncService(ds string) { gocron.NewTask( func() { t := time.Now() - cclog.Printf("ldap sync started at %s", t.Format(time.RFC3339)) + cclog.Infof("ldap sync started at %s", t.Format(time.RFC3339)) if err := auth.LdapAuth.Sync(); err != nil { cclog.Errorf("ldap sync failed: %s", err.Error()) } diff --git a/internal/taskManager/updateDurationService.go b/internal/taskManager/updateDurationService.go index 53882f0..1d98756 100644 --- a/internal/taskManager/updateDurationService.go +++ b/internal/taskManager/updateDurationService.go @@ -25,8 +25,8 @@ func RegisterUpdateDurationWorker() { gocron.NewTask( func() { start := time.Now() - cclog.Printf("Update duration started at %s\n", start.Format(time.RFC3339)) + cclog.Infof("Update duration started at %s", start.Format(time.RFC3339)) jobRepo.UpdateDuration() - cclog.Printf("Update duration is done and took %s\n", time.Since(start)) + cclog.Infof("Update duration is done and took %s", time.Since(start)) })) } diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 4fb5e45..9d2d43b 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -34,7 +34,7 @@ func RegisterFootprintWorker() { c := 0 ce := 0 cl := 0 - cclog.Printf("Update Footprints started at %s\n", s.Format(time.RFC3339)) + cclog.Infof("Update Footprints started at %s", s.Format(time.RFC3339)) for _, cluster := range archive.Clusters { s_cluster := time.Now() @@ -136,6 +136,6 @@ func RegisterFootprintWorker() { } cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(s_cluster)) } - cclog.Printf("Updating %d (of %d; Skipped %d) Footprints is done and took %s\n", c, cl, ce, time.Since(s)) + cclog.Infof("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s)) })) } diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go index 350bc81..9af07fa 100644 --- a/tools/archive-manager/main.go +++ b/tools/archive-manager/main.go @@ -50,7 +50,7 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err // Create channels for job distribution jobs := make(chan archive.JobContainer, numWorkers*2) - + // WaitGroup to track worker completion var wg sync.WaitGroup @@ -127,8 +127,6 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err return finalImported, finalFailed, nil } - - func main() { var srcPath, flagConfigFile, flagLogLevel, flagRemoveCluster, flagRemoveAfter, flagRemoveBefore string var flagSrcConfig, flagDstConfig string @@ -147,7 +145,6 @@ func main() { flag.StringVar(&flagDstConfig, "dst-config", "", "Destination archive backend configuration (JSON), e.g. '{\"kind\":\"sqlite\",\"dbPath\":\"./archive.db\"}'") flag.Parse() - archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath) cclog.Init(flagLogLevel, flagLogDateTime) @@ -189,7 +186,6 @@ func main() { ccconf.Init(flagConfigFile) - // Load and check main configuration if cfg := ccconf.GetPackageConfig("main"); cfg != nil { if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil { @@ -209,7 +205,7 @@ func main() { if flagValidate { config.Keys.Validate = true for job := range ar.Iter(true) { - cclog.Printf("Validate %s - %d\n", job.Meta.Cluster, job.Meta.JobID) + cclog.Debugf("Validate %s - %d", job.Meta.Cluster, job.Meta.JobID) } os.Exit(0) }