mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-01-27 03:39:05 +01:00
Merge pull request #77 from giesselmann/serial_archiving
archive jobs in single worker thread
This commit is contained in:
commit
b0cd88bba1
@ -416,7 +416,7 @@ func main() {
|
||||
server.Shutdown(context.Background())
|
||||
|
||||
// Then, wait for any async archivings still pending...
|
||||
api.OngoingArchivings.Wait()
|
||||
api.JobRepository.WaitForArchiving()
|
||||
}()
|
||||
|
||||
if config.Keys.StopJobsExceedingWalltime > 0 {
|
||||
|
@ -6,7 +6,6 @@ package api
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
@ -23,7 +22,6 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
@ -56,7 +54,6 @@ type RestApi struct {
|
||||
Resolver *graph.Resolver
|
||||
Authentication *auth.Authentication
|
||||
MachineStateDir string
|
||||
OngoingArchivings sync.WaitGroup
|
||||
RepositoryMutex sync.Mutex
|
||||
}
|
||||
|
||||
@ -721,34 +718,8 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
|
||||
return
|
||||
}
|
||||
|
||||
// We need to start a new goroutine as this functions needs to return
|
||||
// for the response to be flushed to the client.
|
||||
api.OngoingArchivings.Add(1) // So that a shutdown does not interrupt this goroutine.
|
||||
go func() {
|
||||
defer api.OngoingArchivings.Done()
|
||||
|
||||
if _, err := api.JobRepository.FetchMetadata(job); err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
api.JobRepository.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||
return
|
||||
}
|
||||
|
||||
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and create meta.json/data.json files
|
||||
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
|
||||
if err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
api.JobRepository.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||
return
|
||||
}
|
||||
|
||||
// Update the jobs database entry one last time:
|
||||
if err := api.JobRepository.Archive(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("archiving job (dbid: %d) successful", job.ID)
|
||||
}()
|
||||
// Trigger async archiving
|
||||
api.JobRepository.TriggerArchiving(job)
|
||||
}
|
||||
|
||||
// func (api *RestApi) importJob(rw http.ResponseWriter, r *http.Request) {
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/auth"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
@ -33,6 +34,9 @@ type JobRepository struct {
|
||||
|
||||
stmtCache *sq.StmtCache
|
||||
cache *lrucache.Cache
|
||||
|
||||
archiveChannel chan *schema.Job
|
||||
archivePending sync.WaitGroup
|
||||
}
|
||||
|
||||
func GetJobRepository() *JobRepository {
|
||||
@ -43,7 +47,10 @@ func GetJobRepository() *JobRepository {
|
||||
DB: db.DB,
|
||||
stmtCache: sq.NewStmtCache(db.DB),
|
||||
cache: lrucache.New(1024 * 1024),
|
||||
archiveChannel: make(chan *schema.Job, 128),
|
||||
}
|
||||
// start archiving worker
|
||||
go jobRepoInstance.archivingWorker()
|
||||
})
|
||||
|
||||
return jobRepoInstance
|
||||
@ -326,7 +333,7 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
|
||||
}
|
||||
|
||||
// Stop updates the job with the database id jobId using the provided arguments.
|
||||
func (r *JobRepository) Archive(
|
||||
func (r *JobRepository) MarkArchived(
|
||||
jobId int64,
|
||||
monitoringStatus int32,
|
||||
metricStats map[string]schema.JobStatistics) error {
|
||||
@ -358,6 +365,55 @@ func (r *JobRepository) Archive(
|
||||
return nil
|
||||
}
|
||||
|
||||
// Archiving worker thread
|
||||
func (r *JobRepository) archivingWorker(){
|
||||
for {
|
||||
select {
|
||||
case job, ok := <- r.archiveChannel:
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
// not using meta data, called to load JobMeta into Cache?
|
||||
// will fail if job meta not in repository
|
||||
if _, err := r.FetchMetadata(job); err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||
continue
|
||||
}
|
||||
|
||||
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
|
||||
// TODO: Maybe use context with cancel/timeout here
|
||||
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
|
||||
if err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
|
||||
continue
|
||||
}
|
||||
|
||||
// Update the jobs database entry one last time:
|
||||
if err := r.MarkArchived(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
|
||||
log.Errorf("archiving job (dbid: %d) failed: %s", job.ID, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("archiving job (dbid: %d) successful", job.ID)
|
||||
r.archivePending.Done()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger async archiving
|
||||
func (r *JobRepository) TriggerArchiving(job *schema.Job){
|
||||
r.archivePending.Add(1)
|
||||
r.archiveChannel <- job
|
||||
}
|
||||
|
||||
// Wait for background thread to finish pending archiving operations
|
||||
func (r *JobRepository) WaitForArchiving(){
|
||||
// close channel and wait for worker to process remaining jobs
|
||||
r.archivePending.Wait()
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("no such job or user")
|
||||
|
||||
// FindJobOrUser returns a job database ID or a username if a job or user machtes the search term.
|
||||
|
@ -441,7 +441,7 @@ func TestRestApi(t *testing.T) {
|
||||
t.Fatal(response.Status, recorder.Body.String())
|
||||
}
|
||||
|
||||
restapi.OngoingArchivings.Wait()
|
||||
restapi.JobRepository.WaitForArchiving()
|
||||
job, err := restapi.Resolver.Query().Job(context.Background(), strconv.Itoa(int(dbid)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -559,7 +559,7 @@ func subtestLetJobFail(t *testing.T, restapi *api.RestApi, r *mux.Router) {
|
||||
t.Fatal(response.Status, recorder.Body.String())
|
||||
}
|
||||
|
||||
restapi.OngoingArchivings.Wait()
|
||||
restapi.JobRepository.WaitForArchiving()
|
||||
jobid, cluster := int64(12345), "testcluster"
|
||||
job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user