archive jobs in single worker thread

This commit is contained in:
Pay Giesselmann 2022-12-08 15:04:58 +01:00
parent ded5cda6ba
commit a0f7a598ad
3 changed files with 61 additions and 33 deletions

View File

@ -416,7 +416,7 @@ func main() {
server.Shutdown(context.Background())
// Then, wait for any async archivings still pending...
api.OngoingArchivings.Wait()
api.JobRepository.WaitForArchiving()
}()
if config.Keys.StopJobsExceedingWalltime > 0 {

View File

@ -6,7 +6,6 @@ package api
import (
"bufio"
"context"
"database/sql"
"encoding/json"
"errors"
@ -23,7 +22,6 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
@ -56,7 +54,6 @@ type RestApi struct {
Resolver *graph.Resolver
Authentication *auth.Authentication
MachineStateDir string
OngoingArchivings sync.WaitGroup
RepositoryMutex sync.Mutex
}
@ -721,34 +718,8 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
return
}
// We need to start a new goroutine as this functions needs to return
// for the response to be flushed to the client.
api.OngoingArchivings.Add(1) // So that a shutdown does not interrupt this goroutine.
go func() {
defer api.OngoingArchivings.Done()
if _, err := api.JobRepository.FetchMetadata(job); err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
api.JobRepository.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
return
}
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and create meta.json/data.json files
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
api.JobRepository.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
return
}
// Update the jobs database entry one last time:
if err := api.JobRepository.Archive(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
return
}
log.Printf("archiving job (dbid: %d) successful", job.ID)
}()
// Trigger async archiving
api.JobRepository.TriggerArchiving(job)
}
// func (api *RestApi) importJob(rw http.ResponseWriter, r *http.Request) {

View File

@ -16,6 +16,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
@ -33,6 +34,9 @@ type JobRepository struct {
stmtCache *sq.StmtCache
cache *lrucache.Cache
archiveChannel chan *schema.Job
archivePending sync.WaitGroup
}
func GetJobRepository() *JobRepository {
@ -43,7 +47,10 @@ func GetJobRepository() *JobRepository {
DB: db.DB,
stmtCache: sq.NewStmtCache(db.DB),
cache: lrucache.New(1024 * 1024),
archiveChannel: make(chan *schema.Job, 128),
}
// start archiving worker
go jobRepoInstance.archivingWorker()
})
return jobRepoInstance
@ -326,7 +333,7 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
}
// Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) Archive(
func (r *JobRepository) MarkArchived(
jobId int64,
monitoringStatus int32,
metricStats map[string]schema.JobStatistics) error {
@ -358,6 +365,56 @@ func (r *JobRepository) Archive(
return nil
}
// Archiving worker thread
func (r *JobRepository) archivingWorker(){
for {
select {
case job, ok := <- r.archiveChannel:
if !ok {
break
}
// not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository
if _, err := r.FetchMetadata(job); err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
// TODO: Maybe use context with cancel/timeout here
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
// Update the jobs database entry one last time:
if err := r.MarkArchived(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
continue
}
log.Printf("archiving job (dbid: %d) successful", job.ID)
r.archivePending.Done()
}
}
}
// Trigger async archiving
func (r *JobRepository) TriggerArchiving(job *schema.Job){
r.archivePending.Add(1)
r.archiveChannel <- job
}
// Wait for background thread to finish pending archiving operations
func (r *JobRepository) WaitForArchiving(){
// close channel and wait for worker to process remaining jobs
close(r.archiveChannel)
r.archivePending.Wait()
}
var ErrNotFound = errors.New("no such job or user")
// FindJobOrUser returns a job database ID or a username if a job or user machtes the search term.