diff --git a/internal/api/api_test.go b/internal/api/api_test.go index bb1ff6f..aa0a1f4 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -156,6 +156,7 @@ func setup(t *testing.T) *api.RestApi { t.Fatal(err) } + archiver.Start(repository.GetJobRepository()) auth.Init() graph.Init() diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index 6757a0b..7b272e6 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -17,11 +17,12 @@ import ( var ( archivePending sync.WaitGroup archiveChannel chan *schema.Job - r *repository.JobRepository + jobRepo *repository.JobRepository ) -func Start(jobRepo *repository.JobRepository) { +func Start(r *repository.JobRepository) { archiveChannel = make(chan *schema.Job, 128) + jobRepo = r go archivingWorker() } @@ -37,9 +38,9 @@ func archivingWorker() { start := time.Now() // not using meta data, called to load JobMeta into Cache? // will fail if job meta not in repository - if _, err := r.FetchMetadata(job); err != nil { + if _, err := jobRepo.FetchMetadata(job); err != nil { log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error()) - r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) + jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) continue } @@ -48,16 +49,16 @@ func archivingWorker() { jobMeta, err := ArchiveJob(job, context.Background()) if err != nil { log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) - r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) + jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) continue } - if err := r.UpdateFootprint(jobMeta); err != nil { + if err := jobRepo.UpdateFootprint(jobMeta); err != nil { log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) continue } // Update the jobs database entry one last time: - if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { + if err := jobRepo.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) continue } @@ -70,6 +71,10 @@ func archivingWorker() { // Trigger async archiving func TriggerArchiving(job *schema.Job) { + if archiveChannel == nil { + log.Fatal("Cannot archive without archiving channel. Did you Start the archiver?") + } + archivePending.Add(1) archiveChannel <- job } diff --git a/pkg/archive/testdata/archive/alex/cluster.json b/pkg/archive/testdata/archive/alex/cluster.json index cc2888d..f1cf085 100644 --- a/pkg/archive/testdata/archive/alex/cluster.json +++ b/pkg/archive/testdata/archive/alex/cluster.json @@ -94,7 +94,7 @@ }, "scope": "hwthread", "aggregation": "sum", - "energy": true, + "energy": "power", "timestep": 60, "peak": 500, "normal": 250, @@ -136,7 +136,7 @@ }, "scope": "accelerator", "aggregation": "sum", - "energy": true, + "energy": "power", "timestep": 60, "peak": 400, "normal": 200, @@ -190,7 +190,7 @@ }, "scope": "socket", "aggregation": "sum", - "energy": true, + "energy": "power", "timestep": 60, "peak": 500, "normal": 250, diff --git a/pkg/archive/testdata/archive/fritz/cluster.json b/pkg/archive/testdata/archive/fritz/cluster.json index 58ec3af..3df3a95 100644 --- a/pkg/archive/testdata/archive/fritz/cluster.json +++ b/pkg/archive/testdata/archive/fritz/cluster.json @@ -256,7 +256,7 @@ "normal": 250, "caution": 100, "alert": 50, - "energy": true + "energy": "power" }, { "name": "mem_power", @@ -270,7 +270,7 @@ "normal": 50, "caution": 20, "alert": 10, - "energy": true + "energy": "power" }, { "name": "ipc",