Move rest of archiveing code into new archive package

This commit is contained in:
Jan Eitzinger 2024-08-28 11:13:54 +02:00
parent 83df6f015c
commit db5809d522
6 changed files with 147 additions and 127 deletions

View File

@ -20,6 +20,7 @@ import (
"github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/playground" "github.com/99designs/gqlgen/graphql/playground"
"github.com/ClusterCockpit/cc-backend/internal/api" "github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph"
@ -308,5 +309,5 @@ func serverShutdown() {
server.Shutdown(context.Background()) server.Shutdown(context.Background())
// Then, wait for any async archivings still pending... // Then, wait for any async archivings still pending...
apiHandle.JobRepository.WaitForArchiving() archiver.WaitForArchiving()
} }

View File

@ -19,6 +19,7 @@ import (
"testing" "testing"
"github.com/ClusterCockpit/cc-backend/internal/api" "github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph"
@ -312,7 +313,7 @@ func TestRestApi(t *testing.T) {
t.Fatal(response.Status, recorder.Body.String()) t.Fatal(response.Status, recorder.Body.String())
} }
restapi.JobRepository.WaitForArchiving() archiver.WaitForArchiving()
resolver := graph.GetResolverInstance() resolver := graph.GetResolverInstance()
job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid))) job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid)))
if err != nil { if err != nil {
@ -423,7 +424,7 @@ func TestRestApi(t *testing.T) {
t.Fatal(response.Status, recorder.Body.String()) t.Fatal(response.Status, recorder.Body.String())
} }
restapi.JobRepository.WaitForArchiving() archiver.WaitForArchiving()
jobid, cluster := int64(12345), "testcluster" jobid, cluster := int64(12345), "testcluster"
job, err := restapi.JobRepository.Find(&jobid, &cluster, nil) job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
if err != nil { if err != nil {

View File

@ -19,6 +19,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph"
@ -1081,7 +1082,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
} }
// Trigger async archiving // Trigger async archiving
api.JobRepository.TriggerArchiving(job) archiver.TriggerArchiving(job)
} }
func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) { func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {

View File

@ -0,0 +1,81 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archiver
import (
"context"
"sync"
"time"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
var (
archivePending sync.WaitGroup
archiveChannel chan *schema.Job
r *repository.JobRepository
)
func Start(jobRepo *repository.JobRepository) {
archiveChannel = make(chan *schema.Job, 128)
go archivingWorker()
}
// Archiving worker thread
func archivingWorker() {
for {
select {
case job, ok := <-archiveChannel:
if !ok {
break
}
start := time.Now()
// not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository
if _, err := r.FetchMetadata(job); err != nil {
log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
// ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
// TODO: Maybe use context with cancel/timeout here
jobMeta, err := ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
if err := r.UpdateFootprint(jobMeta); err != nil {
log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
continue
}
// Update the jobs database entry one last time:
if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())
continue
}
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
log.Printf("archiving job (dbid: %d) successful", job.ID)
archivePending.Done()
}
}
}
// Trigger async archiving
func TriggerArchiving(job *schema.Job) {
archivePending.Add(1)
archiveChannel <- job
}
// Wait for background thread to finish pending archiving operations
func WaitForArchiving() {
// close channel and wait for worker to process remaining jobs
archivePending.Wait()
}

View File

@ -1,112 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"context"
"encoding/json"
"time"
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)
// Archiving worker thread
func (r *JobRepository) archivingWorker() {
for {
select {
case job, ok := <-r.archiveChannel:
if !ok {
break
}
start := time.Now()
// not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository
if _, err := r.FetchMetadata(job); err != nil {
log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
// TODO: Maybe use context with cancel/timeout here
jobMeta, err := archiver.ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
// Update the jobs database entry one last time:
if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())
continue
}
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
log.Printf("archiving job (dbid: %d) successful", job.ID)
r.archivePending.Done()
}
}
}
// Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) MarkArchived(
jobMeta *schema.JobMeta,
monitoringStatus int32,
) error {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobMeta.JobID)
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error())
return err
}
footprint := make(map[string]float64)
for _, fp := range sc.Footprint {
footprint[fp] = LoadJobStat(jobMeta, fp)
}
var rawFootprint []byte
if rawFootprint, err = json.Marshal(footprint); err != nil {
log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
return err
}
stmt = stmt.Set("footprint", rawFootprint)
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
log.Warn("Error while marking job as archived")
return err
}
return nil
}
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", job)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
}
// Trigger async archiving
func (r *JobRepository) TriggerArchiving(job *schema.Job) {
r.archivePending.Add(1)
r.archiveChannel <- job
}
// Wait for background thread to finish pending archiving operations
func (r *JobRepository) WaitForArchiving() {
// close channel and wait for worker to process remaining jobs
r.archivePending.Wait()
}

View File

@ -28,12 +28,10 @@ var (
) )
type JobRepository struct { type JobRepository struct {
DB *sqlx.DB DB *sqlx.DB
stmtCache *sq.StmtCache stmtCache *sq.StmtCache
cache *lrucache.Cache cache *lrucache.Cache
archiveChannel chan *schema.Job driver string
driver string
archivePending sync.WaitGroup
} }
func GetJobRepository() *JobRepository { func GetJobRepository() *JobRepository {
@ -44,12 +42,9 @@ func GetJobRepository() *JobRepository {
DB: db.DB, DB: db.DB,
driver: db.Driver, driver: db.Driver,
stmtCache: sq.NewStmtCache(db.DB), stmtCache: sq.NewStmtCache(db.DB),
cache: lrucache.New(1024 * 1024), cache: lrucache.New(1024 * 1024),
archiveChannel: make(chan *schema.Job, 128),
} }
// start archiving worker
go jobRepoInstance.archivingWorker()
}) })
return jobRepoInstance return jobRepoInstance
} }
@ -495,3 +490,56 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
log.Infof("Return job count %d", len(jobs)) log.Infof("Return job count %d", len(jobs))
return jobs, nil return jobs, nil
} }
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", job)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
}
// Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) MarkArchived(
jobMeta *schema.JobMeta,
monitoringStatus int32,
) error {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobMeta.JobID)
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
log.Warn("Error while marking job as archived")
return err
}
return nil
}
func (r *JobRepository) UpdateFootprint(jobMeta *schema.JobMeta) error {
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error())
return err
}
footprint := make(map[string]float64)
for _, fp := range sc.Footprint {
footprint[fp] = LoadJobStat(jobMeta, fp)
}
var rawFootprint []byte
if rawFootprint, err = json.Marshal(footprint); err != nil {
log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
return err
}
stmt := sq.Update("job").Set("footprint", rawFootprint)
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
log.Warn("Error while updating job footprint")
return err
}
return nil
}