Bugs fixed in unit tests and archiver init

This commit is contained in:
2024-08-28 12:26:35 +02:00
parent db5809d522
commit f305863616
4 changed files with 18 additions and 12 deletions

View File

@@ -17,11 +17,12 @@ import (
var (
archivePending sync.WaitGroup
archiveChannel chan *schema.Job
r *repository.JobRepository
jobRepo *repository.JobRepository
)
func Start(jobRepo *repository.JobRepository) {
func Start(r *repository.JobRepository) {
archiveChannel = make(chan *schema.Job, 128)
jobRepo = r
go archivingWorker()
}
@@ -37,9 +38,9 @@ func archivingWorker() {
start := time.Now()
// not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository
if _, err := r.FetchMetadata(job); err != nil {
if _, err := jobRepo.FetchMetadata(job); err != nil {
log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
@@ -48,16 +49,16 @@ func archivingWorker() {
jobMeta, err := ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
if err := r.UpdateFootprint(jobMeta); err != nil {
if err := jobRepo.UpdateFootprint(jobMeta); err != nil {
log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
continue
}
// Update the jobs database entry one last time:
if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
if err := jobRepo.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())
continue
}
@@ -70,6 +71,10 @@ func archivingWorker() {
// Trigger async archiving
func TriggerArchiving(job *schema.Job) {
if archiveChannel == nil {
log.Fatal("Cannot archive without archiving channel. Did you Start the archiver?")
}
archivePending.Add(1)
archiveChannel <- job
}