From db5809d522028fe36c51b187269168f803830cb6 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 28 Aug 2024 11:13:54 +0200 Subject: [PATCH] Move rest of archiveing code into new archive package --- cmd/cc-backend/server.go | 3 +- internal/api/api_test.go | 5 +- internal/api/rest.go | 3 +- internal/archiver/archiveWorker.go | 81 +++++++++++++++++++ internal/repository/archiveWorker.go | 112 --------------------------- internal/repository/job.go | 70 ++++++++++++++--- 6 files changed, 147 insertions(+), 127 deletions(-) create mode 100644 internal/archiver/archiveWorker.go delete mode 100644 internal/repository/archiveWorker.go diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index 7ca9ccb..bc20fcf 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -20,6 +20,7 @@ import ( "github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/playground" "github.com/ClusterCockpit/cc-backend/internal/api" + "github.com/ClusterCockpit/cc-backend/internal/archiver" "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" @@ -308,5 +309,5 @@ func serverShutdown() { server.Shutdown(context.Background()) // Then, wait for any async archivings still pending... 
- apiHandle.JobRepository.WaitForArchiving() + archiver.WaitForArchiving() } diff --git a/internal/api/api_test.go b/internal/api/api_test.go index a6d183e..bb1ff6f 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/ClusterCockpit/cc-backend/internal/api" + "github.com/ClusterCockpit/cc-backend/internal/archiver" "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" @@ -312,7 +313,7 @@ func TestRestApi(t *testing.T) { t.Fatal(response.Status, recorder.Body.String()) } - restapi.JobRepository.WaitForArchiving() + archiver.WaitForArchiving() resolver := graph.GetResolverInstance() job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid))) if err != nil { @@ -423,7 +424,7 @@ func TestRestApi(t *testing.T) { t.Fatal(response.Status, recorder.Body.String()) } - restapi.JobRepository.WaitForArchiving() + archiver.WaitForArchiving() jobid, cluster := int64(12345), "testcluster" job, err := restapi.JobRepository.Find(&jobid, &cluster, nil) if err != nil { diff --git a/internal/api/rest.go b/internal/api/rest.go index da0f4be..b737090 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/ClusterCockpit/cc-backend/internal/archiver" "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" @@ -1081,7 +1082,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo } // Trigger async archiving - api.JobRepository.TriggerArchiving(job) + archiver.TriggerArchiving(job) } func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) { diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go new file mode 100644 index 0000000..6757a0b --- /dev/null +++ 
b/internal/archiver/archiveWorker.go
@@ -0,0 +1,107 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package archiver
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+)
+
+var (
+	// archivePending counts the jobs that were queued but not yet processed.
+	archivePending sync.WaitGroup
+	// archiveChannel is the queue between TriggerArchiving and the worker.
+	archiveChannel chan *schema.Job
+	// r is the job repository the worker updates; it is set by Start.
+	r              *repository.JobRepository
+)
+
+// Start stores jobRepo for use by the archiving worker, creates the job
+// queue (buffered, 128 entries) and launches the worker goroutine.
+// It must be called before TriggerArchiving.
+func Start(jobRepo *repository.JobRepository) {
+	r = jobRepo
+	archiveChannel = make(chan *schema.Job, 128)
+
+	go archivingWorker()
+}
+
+// archivingWorker processes queued jobs one at a time. Ranging over the
+// channel makes the goroutine return once the channel is closed instead of
+// spinning on zero values.
+func archivingWorker() {
+	for job := range archiveChannel {
+		processJob(job)
+	}
+}
+
+// processJob runs all archiving steps for a single job and logs the first
+// step that fails.
+func processJob(job *schema.Job) {
+	// Release the pending counter on every path; otherwise a single failed
+	// job would make WaitForArchiving block forever.
+	defer archivePending.Done()
+
+	start := time.Now()
+	// not using meta data, called to load JobMeta into Cache?
+	// will fail if job meta not in repository
+	if _, err := r.FetchMetadata(job); err != nil {
+		log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
+		markArchivingFailed(job)
+		return
+	}
+
+	// ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
+	// TODO: Maybe use context with cancel/timeout here
+	jobMeta, err := ArchiveJob(job, context.Background())
+	if err != nil {
+		log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
+		markArchivingFailed(job)
+		return
+	}
+
+	if err := r.UpdateFootprint(jobMeta); err != nil {
+		log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
+		return
+	}
+	// Update the jobs database entry one last time:
+	if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
+		log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())
+		return
+	}
+	log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
+	log.Printf("archiving job (dbid: %d) successful", job.ID)
+}
+
+// markArchivingFailed records the failed monitoring status for job and logs
+// the error if even that update does not succeed.
+func markArchivingFailed(job *schema.Job) {
+	if err := r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed); err != nil {
+		log.Errorf("archiving job (dbid: %d): cannot update monitoring status: %s", job.ID, err.Error())
+	}
+}
+
+// TriggerArchiving queues job for asynchronous archiving. The call blocks
+// while the queue is full.
+func TriggerArchiving(job *schema.Job) {
+	if archiveChannel == nil {
+		// Sending on a nil channel would block the caller forever.
+		log.Errorf("archiving job (dbid: %d) not possible: archiver was not started", job.ID)
+		return
+	}
+	archivePending.Add(1)
+	archiveChannel <- job
+}
+
+// WaitForArchiving blocks until every job queued so far has been processed.
+// The queue stays open, so further jobs can be triggered afterwards.
+func WaitForArchiving() {
+	archivePending.Wait()
+}
diff --git a/internal/repository/archiveWorker.go b/internal/repository/archiveWorker.go
deleted file mode 100644
index 7094b7c..0000000
--- a/internal/repository/archiveWorker.go
+++ /dev/null
@@ -1,112 +0,0 @@
-// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
-// All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-package repository
-
-import (
-	"context"
-	"encoding/json"
-	"time"
-
-	"github.com/ClusterCockpit/cc-backend/internal/archiver"
-	"github.com/ClusterCockpit/cc-backend/pkg/archive"
-	"github.com/ClusterCockpit/cc-backend/pkg/log"
-	"github.com/ClusterCockpit/cc-backend/pkg/schema"
-	sq "github.com/Masterminds/squirrel"
-)
-
-// Archiving worker thread
-func (r *JobRepository) archivingWorker() {
-	for {
-		select {
-		case job, ok := <-r.archiveChannel:
-			if !ok {
-				break
-			}
-			start := time.Now()
-			// not using meta data, called to load JobMeta into Cache?
- // will fail if job meta not in repository - if _, err := r.FetchMetadata(job); err != nil { - log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error()) - r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) - continue - } - - // metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend - // TODO: Maybe use context with cancel/timeout here - jobMeta, err := archiver.ArchiveJob(job, context.Background()) - if err != nil { - log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) - r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) - continue - } - - // Update the jobs database entry one last time: - if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil { - log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error()) - continue - } - log.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) - log.Printf("archiving job (dbid: %d) successful", job.ID) - r.archivePending.Done() - } - } -} - -// Stop updates the job with the database id jobId using the provided arguments. -func (r *JobRepository) MarkArchived( - jobMeta *schema.JobMeta, - monitoringStatus int32, -) error { - stmt := sq.Update("job"). - Set("monitoring_status", monitoringStatus). 
- Where("job.id = ?", jobMeta.JobID) - - sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) - if err != nil { - log.Errorf("cannot get subcluster: %s", err.Error()) - return err - } - footprint := make(map[string]float64) - - for _, fp := range sc.Footprint { - footprint[fp] = LoadJobStat(jobMeta, fp) - } - - var rawFootprint []byte - - if rawFootprint, err = json.Marshal(footprint); err != nil { - log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID) - return err - } - - stmt = stmt.Set("footprint", rawFootprint) - - if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { - log.Warn("Error while marking job as archived") - return err - } - return nil -} - -func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) { - stmt := sq.Update("job"). - Set("monitoring_status", monitoringStatus). - Where("job.id = ?", job) - - _, err = stmt.RunWith(r.stmtCache).Exec() - return -} - -// Trigger async archiving -func (r *JobRepository) TriggerArchiving(job *schema.Job) { - r.archivePending.Add(1) - r.archiveChannel <- job -} - -// Wait for background thread to finish pending archiving operations -func (r *JobRepository) WaitForArchiving() { - // close channel and wait for worker to process remaining jobs - r.archivePending.Wait() -} diff --git a/internal/repository/job.go b/internal/repository/job.go index ca8350f..2c5eef8 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -28,12 +28,10 @@ var ( ) type JobRepository struct { - DB *sqlx.DB - stmtCache *sq.StmtCache - cache *lrucache.Cache - archiveChannel chan *schema.Job - driver string - archivePending sync.WaitGroup + DB *sqlx.DB + stmtCache *sq.StmtCache + cache *lrucache.Cache + driver string } func GetJobRepository() *JobRepository { @@ -44,12 +42,9 @@ func GetJobRepository() *JobRepository { DB: db.DB, driver: db.Driver, - stmtCache: sq.NewStmtCache(db.DB), - cache: lrucache.New(1024 * 1024), - archiveChannel: 
make(chan *schema.Job, 128),
+			stmtCache: sq.NewStmtCache(db.DB),
+			cache:     lrucache.New(1024 * 1024),
 		}
-		// start archiving worker
-		go jobRepoInstance.archivingWorker()
 	})
 	return jobRepoInstance
 }
@@ -495,3 +490,68 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
 	log.Infof("Return job count %d", len(jobs))
 	return jobs, nil
 }
+
+// UpdateMonitoringStatus sets the monitoring_status column of the job row
+// whose id equals job.
+func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
+	stmt := sq.Update("job").
+		Set("monitoring_status", monitoringStatus).
+		Where("job.id = ?", job)
+
+	_, err = stmt.RunWith(r.stmtCache).Exec()
+	return
+}
+
+// MarkArchived sets the monitoring_status column of the job row selected by
+// jobMeta.JobID.
+// NOTE(review): UpdateMonitoringStatus is called with the database id for the
+// same column; confirm that JobID (and not ID) is intended here.
+func (r *JobRepository) MarkArchived(
+	jobMeta *schema.JobMeta,
+	monitoringStatus int32,
+) error {
+	stmt := sq.Update("job").
+		Set("monitoring_status", monitoringStatus).
+		Where("job.id = ?", jobMeta.JobID)
+
+	if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
+		log.Warn("Error while marking job as archived")
+		return err
+	}
+	return nil
+}
+
+// UpdateFootprint collects one value per footprint metric of the job's
+// subcluster via LoadJobStat and stores the JSON-encoded map in the footprint
+// column of that job's row.
+func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error {
+	sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
+	if err != nil {
+		log.Errorf("cannot get subcluster: %s", err.Error())
+		return err
+	}
+	footprint := make(map[string]float64)
+
+	for _, fp := range sc.Footprint {
+		footprint[fp] = LoadJobStat(jobMeta, fp)
+	}
+
+	var rawFootprint []byte
+
+	if rawFootprint, err = json.Marshal(footprint); err != nil {
+		log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
+		return err
+	}
+
+	// Restrict the update to this job's row, using the same key as MarkArchived.
+	// Without a WHERE clause the statement overwrites the footprint of every job.
+	stmt := sq.Update("job").
+		Set("footprint", rawFootprint).
+		Where("job.id = ?", jobMeta.JobID)
+
+	if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
+		log.Warn("Error while updating job footprint")
+		return err
+	}
+	return nil
+}