mirror of https://github.com/ClusterCockpit/cc-backend
synced 2025-01-13 21:19:06 +01:00
95 lines, 2.9 KiB, Go
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
|
// All rights reserved.
|
|
// Use of this source code is governed by a MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
package archiver
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
|
sq "github.com/Masterminds/squirrel"
|
|
)
|
|
|
|
var (
|
|
archivePending sync.WaitGroup
|
|
archiveChannel chan *schema.Job
|
|
jobRepo *repository.JobRepository
|
|
)
|
|
|
|
func Start(r *repository.JobRepository) {
|
|
archiveChannel = make(chan *schema.Job, 128)
|
|
jobRepo = r
|
|
|
|
go archivingWorker()
|
|
}
|
|
|
|
// Archiving worker thread
|
|
func archivingWorker() {
|
|
for {
|
|
select {
|
|
case job, ok := <-archiveChannel:
|
|
if !ok {
|
|
break
|
|
}
|
|
start := time.Now()
|
|
// not using meta data, called to load JobMeta into Cache?
|
|
// will fail if job meta not in repository
|
|
if _, err := jobRepo.FetchMetadata(job); err != nil {
|
|
log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
|
|
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
|
continue
|
|
}
|
|
|
|
// ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
|
|
// TODO: Maybe use context with cancel/timeout here
|
|
jobMeta, err := ArchiveJob(job, context.Background())
|
|
if err != nil {
|
|
log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
|
|
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
|
continue
|
|
}
|
|
|
|
stmt := sq.Update("job").Where("job.id = ?", job.ID)
|
|
|
|
if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
|
|
log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
|
|
continue
|
|
}
|
|
if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
|
|
log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
|
|
continue
|
|
}
|
|
// Update the jobs database entry one last time:
|
|
stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
|
|
if err := jobRepo.Execute(stmt); err != nil {
|
|
log.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
|
|
continue
|
|
}
|
|
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
|
|
log.Printf("archiving job (dbid: %d) successful", job.ID)
|
|
archivePending.Done()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Trigger async archiving
|
|
func TriggerArchiving(job *schema.Job) {
|
|
if archiveChannel == nil {
|
|
log.Fatal("Cannot archive without archiving channel. Did you Start the archiver?")
|
|
}
|
|
|
|
archivePending.Add(1)
|
|
archiveChannel <- job
|
|
}
|
|
|
|
// Wait for background thread to finish pending archiving operations
|
|
func WaitForArchiving() {
|
|
// close channel and wait for worker to process remaining jobs
|
|
archivePending.Wait()
|
|
}
|