Mirror of https://github.com/ClusterCockpit/cc-backend (synced 2024-12-25 04:49:05 +01:00)

Refactor: Archive workers and Tasks

Work in progress

parent e348ec74fd
commit 01a4d33514
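For orientation: this commit moves the inline gocron scheduling out of main() into a new internal/taskManager package and splits the archiving worker out of the job repository into internal/repository/archiveWorker.go. Below is a condensed, hedged sketch of the registration flow in main() after the change; it only restates the calls visible in the func main() hunk further down, while the trailing Shutdown comment is an assumption about how the scheduler is meant to be stopped (this work-in-progress commit does not show that call).

package main

import (
	"encoding/json"

	"github.com/ClusterCockpit/cc-backend/internal/config"
	"github.com/ClusterCockpit/cc-backend/internal/taskManager"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// Condensed sketch of main() after this commit; the real function does much more.
func main() {
	// Undead-job cleanup is now registered through the taskManager package
	// instead of wiring a gocron job directly in main().
	if config.Keys.StopJobsExceedingWalltime > 0 {
		taskManager.RegisterStopJobsExceedTime()
	}

	// The retention settings are still unmarshalled in main() and handed to
	// the new retention service as a schema.Retention value.
	var cfg struct {
		Retention   schema.Retention `json:"retention"`
		Compression int              `json:"compression"`
	}
	cfg.Retention.IncludeDB = true
	if err := json.Unmarshal(config.Keys.Archive, &cfg); err != nil {
		log.Warn("Error while unmarshaling raw config json")
	}
	taskManager.RegisterRetentionService(cfg.Retention)

	// Assumption: the scheduler created in internal/taskManager is stopped on
	// exit via taskManager.Shutdown(); this WIP commit does not yet add that call.
}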

@@ -16,7 +16,6 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
-	"runtime"
 	"runtime/debug"
 	"strings"
 	"sync"
@@ -34,13 +33,13 @@ import (
 	"github.com/ClusterCockpit/cc-backend/internal/metricdata"
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
 	"github.com/ClusterCockpit/cc-backend/internal/routerConfig"
+	"github.com/ClusterCockpit/cc-backend/internal/taskManager"
 	"github.com/ClusterCockpit/cc-backend/internal/util"
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv"
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 	"github.com/ClusterCockpit/cc-backend/web"
-	"github.com/go-co-op/gocron"
 	"github.com/google/gops/agent"
 	"github.com/gorilla/handlers"
 	"github.com/gorilla/mux"
@@ -628,79 +627,21 @@ func main() {
 			api.JobRepository.WaitForArchiving()
 		}()
 
-	s := gocron.NewScheduler(time.Local)
-
 	if config.Keys.StopJobsExceedingWalltime > 0 {
-		log.Info("Register undead jobs service")
-
-		s.Every(1).Day().At("3:00").Do(func() {
-			err = jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime)
-			if err != nil {
-				log.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error())
-			}
-			runtime.GC()
-		})
+		taskManager.RegisterStopJobsExceedTime()
 	}
 
 	var cfg struct {
 		Retention   schema.Retention `json:"retention"`
 		Compression int              `json:"compression"`
 	}
 
 	cfg.Retention.IncludeDB = true
 
 	if err = json.Unmarshal(config.Keys.Archive, &cfg); err != nil {
 		log.Warn("Error while unmarshaling raw config json")
 	}
-
-	switch cfg.Retention.Policy {
-	case "delete":
-		log.Info("Register retention delete service")
-
-		s.Every(1).Day().At("4:00").Do(func() {
-			startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600)
-			jobs, err := jobRepo.FindJobsBetween(0, startTime)
-			if err != nil {
-				log.Warnf("Error while looking for retention jobs: %s", err.Error())
-			}
-			archive.GetHandle().CleanUp(jobs)
-
-			if cfg.Retention.IncludeDB {
-				cnt, err := jobRepo.DeleteJobsBefore(startTime)
-				if err != nil {
-					log.Errorf("Error while deleting retention jobs from db: %s", err.Error())
-				} else {
-					log.Infof("Retention: Removed %d jobs from db", cnt)
-				}
-				if err = jobRepo.Optimize(); err != nil {
-					log.Errorf("Error occured in db optimization: %s", err.Error())
-				}
-			}
-		})
-	case "move":
-		log.Info("Register retention move service")
-
-		s.Every(1).Day().At("4:00").Do(func() {
-			startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600)
-			jobs, err := jobRepo.FindJobsBetween(0, startTime)
-			if err != nil {
-				log.Warnf("Error while looking for retention jobs: %s", err.Error())
-			}
-			archive.GetHandle().Move(jobs, cfg.Retention.Location)
-
-			if cfg.Retention.IncludeDB {
-				cnt, err := jobRepo.DeleteJobsBefore(startTime)
-				if err != nil {
-					log.Errorf("Error while deleting retention jobs from db: %v", err)
-				} else {
-					log.Infof("Retention: Removed %d jobs from db", cnt)
-				}
-				if err = jobRepo.Optimize(); err != nil {
-					log.Errorf("Error occured in db optimization: %v", err)
-				}
-			}
-		})
-	}
+	taskManager.RegisterRetentionService(cfg.Retention)
 
 	if cfg.Compression > 0 {
 		log.Info("Register compression service")

go.mod  (2 changes)

@@ -42,6 +42,7 @@ require (
 	github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
 	github.com/felixge/httpsnoop v1.0.4 // indirect
 	github.com/go-asn1-ber/asn1-ber v1.5.7 // indirect
+	github.com/go-co-op/gocron/v2 v2.9.0 // indirect
 	github.com/go-jose/go-jose/v4 v4.0.3 // indirect
 	github.com/go-openapi/jsonpointer v0.21.0 // indirect
 	github.com/go-openapi/jsonreference v0.21.0 // indirect
@@ -54,6 +55,7 @@ require (
 	github.com/hashicorp/go-multierror v1.1.1 // indirect
 	github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
 	github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // indirect
+	github.com/jonboulle/clockwork v0.4.0 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/jpillora/backoff v1.0.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect

go.sum  (4 changes)

@@ -61,6 +61,8 @@ github.com/go-asn1-ber/asn1-ber v1.5.7 h1:DTX+lbVTWaTw1hQ+PbZPlnDZPEIs0SS/GCZAl5
 github.com/go-asn1-ber/asn1-ber v1.5.7/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
 github.com/go-co-op/gocron v1.37.0 h1:ZYDJGtQ4OMhTLKOKMIch+/CY70Brbb1dGdooLEhh7b0=
 github.com/go-co-op/gocron v1.37.0/go.mod h1:3L/n6BkO7ABj+TrfSVXLRzsP26zmikL4ISkLQ0O8iNY=
+github.com/go-co-op/gocron/v2 v2.9.0 h1:+0nTyI3mjc2FGIClBdDWpaLPCNrJ+62o9xbS0ZklEKQ=
+github.com/go-co-op/gocron/v2 v2.9.0/go.mod h1:xY7bJxGazKam1cz04EebrlP4S9q4iWdiAylMGP3jY9w=
 github.com/go-jose/go-jose/v4 v4.0.3 h1:o8aphO8Hv6RPmH+GfzVuyf7YXSBibp+8YyHdOoDESGo=
 github.com/go-jose/go-jose/v4 v4.0.3/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc=
 github.com/go-ldap/ldap/v3 v3.4.8 h1:loKJyspcRezt2Q3ZRMq2p/0v8iOurlmeXDPw6fikSvQ=
@@ -133,6 +135,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
 github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
 github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
 github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
+github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
+github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
 github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
 github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
 github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=

@@ -162,7 +162,7 @@ func LoadData(job *schema.Job,
 			ttl = 2 * time.Minute
 		}
 
-		prepareJobData(job, jd, scopes)
+		prepareJobData(jd, scopes)
 
 		return jd, ttl, size
 	})
@@ -266,7 +266,6 @@ func cacheKey(
 // statisticsSeries should be available so that a min/median/max Graph can be
 // used instead of a lot of single lines.
 func prepareJobData(
-	job *schema.Job,
 	jobData schema.JobData,
 	scopes []schema.MetricScope,
 ) {

internal/repository/archiveWorker.go  (new file, 112 lines added)

@@ -0,0 +1,112 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package repository
+
+import (
+	"context"
+	"encoding/json"
+	"time"
+
+	"github.com/ClusterCockpit/cc-backend/internal/metricdata"
+	"github.com/ClusterCockpit/cc-backend/pkg/archive"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+	sq "github.com/Masterminds/squirrel"
+)
+
+// Archiving worker thread
+func (r *JobRepository) archivingWorker() {
+	for {
+		select {
+		case job, ok := <-r.archiveChannel:
+			if !ok {
+				break
+			}
+			start := time.Now()
+			// not using meta data, called to load JobMeta into Cache?
+			// will fail if job meta not in repository
+			if _, err := r.FetchMetadata(job); err != nil {
+				log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
+				r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
+				continue
+			}
+
+			// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
+			// TODO: Maybe use context with cancel/timeout here
+			jobMeta, err := metricdata.ArchiveJob(job, context.Background())
+			if err != nil {
+				log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
+				r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
+				continue
+			}
+
+			// Update the jobs database entry one last time:
+			if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
+				log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())
+				continue
+			}
+			log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
+			log.Printf("archiving job (dbid: %d) successful", job.ID)
+			r.archivePending.Done()
+		}
+	}
+}
+
+// Stop updates the job with the database id jobId using the provided arguments.
+func (r *JobRepository) MarkArchived(
+	jobMeta *schema.JobMeta,
+	monitoringStatus int32,
+) error {
+	stmt := sq.Update("job").
+		Set("monitoring_status", monitoringStatus).
+		Where("job.id = ?", jobMeta.JobID)
+
+	sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
+	if err != nil {
+		log.Errorf("cannot get subcluster: %s", err.Error())
+		return err
+	}
+	footprint := make(map[string]float64)
+
+	for _, fp := range sc.Footprint {
+		footprint[fp] = LoadJobStat(jobMeta, fp)
+	}
+
+	var rawFootprint []byte
+
+	if rawFootprint, err = json.Marshal(footprint); err != nil {
+		log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
+		return err
+	}
+
+	stmt = stmt.Set("footprint", rawFootprint)
+
+	if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
+		log.Warn("Error while marking job as archived")
+		return err
+	}
+	return nil
+}
+
+func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
+	stmt := sq.Update("job").
+		Set("monitoring_status", monitoringStatus).
+		Where("job.id = ?", job)
+
+	_, err = stmt.RunWith(r.stmtCache).Exec()
+	return
+}
+
+// Trigger async archiving
+func (r *JobRepository) TriggerArchiving(job *schema.Job) {
+	r.archivePending.Add(1)
+	r.archiveChannel <- job
+}
+
+// Wait for background thread to finish pending archiving operations
+func (r *JobRepository) WaitForArchiving() {
+	// close channel and wait for worker to process remaining jobs
+	r.archivePending.Wait()
+}
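A brief caller-side sketch of the worker API above, written as if it lived inside the cc-backend module: TriggerArchiving and WaitForArchiving are the methods from this file, while the package name and the two wrapper functions are illustrative (the func main() hunk further up shows the real shutdown path calling api.JobRepository.WaitForArchiving()).

package example

import (
	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// markJobStopped queues a finished job for asynchronous archiving; the
// archivingWorker goroutine picks it up from r.archiveChannel.
func markJobStopped(repo *repository.JobRepository, job *schema.Job) {
	repo.TriggerArchiving(job)
}

// drainArchiveQueue blocks until the worker has finished all jobs that were
// handed to TriggerArchiving (i.e. archivePending reaches zero).
func drainArchiveQueue(repo *repository.JobRepository) {
	repo.WaitForArchiving()
}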

internal/repository/footprintWorker.go  (new file, 8 lines added)

@@ -0,0 +1,8 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package repository
+
+func (r *JobRepository) footprintWorker() {
+}

@@ -5,7 +5,6 @@
 package repository
 
 import (
-	"context"
 	"database/sql"
 	"encoding/json"
 	"errors"
@@ -15,7 +14,6 @@ import (
 	"time"
 
 	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
-	"github.com/ClusterCockpit/cc-backend/internal/metricdata"
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
@@ -278,101 +276,6 @@ func (r *JobRepository) DeleteJobById(id int64) error {
 	return err
 }
-
-func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
-	stmt := sq.Update("job").
-		Set("monitoring_status", monitoringStatus).
-		Where("job.id = ?", job)
-
-	_, err = stmt.RunWith(r.stmtCache).Exec()
-	return
-}
-
-// Stop updates the job with the database id jobId using the provided arguments.
-func (r *JobRepository) MarkArchived(
-	jobMeta *schema.JobMeta,
-	monitoringStatus int32,
-) error {
-	stmt := sq.Update("job").
-		Set("monitoring_status", monitoringStatus).
-		Where("job.id = ?", jobMeta.JobID)
-
-	sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
-	if err != nil {
-		log.Errorf("cannot get subcluster: %s", err.Error())
-		return err
-	}
-	footprint := make(map[string]float64)
-
-	for _, fp := range sc.Footprint {
-		footprint[fp] = LoadJobStat(jobMeta, fp)
-	}
-
-	var rawFootprint []byte
-
-	if rawFootprint, err = json.Marshal(footprint); err != nil {
-		log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
-		return err
-	}
-
-	stmt = stmt.Set("footprint", rawFootprint)
-
-	if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
-		log.Warn("Error while marking job as archived")
-		return err
-	}
-	return nil
-}
-
-// Archiving worker thread
-func (r *JobRepository) archivingWorker() {
-	for {
-		select {
-		case job, ok := <-r.archiveChannel:
-			if !ok {
-				break
-			}
-			start := time.Now()
-			// not using meta data, called to load JobMeta into Cache?
-			// will fail if job meta not in repository
-			if _, err := r.FetchMetadata(job); err != nil {
-				log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
-				r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
-				continue
-			}
-
-			// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
-			// TODO: Maybe use context with cancel/timeout here
-			jobMeta, err := metricdata.ArchiveJob(job, context.Background())
-			if err != nil {
-				log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
-				r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
-				continue
-			}
-
-			// Update the jobs database entry one last time:
-			if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
-				log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())
-				continue
-			}
-			log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
-			log.Printf("archiving job (dbid: %d) successful", job.ID)
-			r.archivePending.Done()
-		}
-	}
-}
-
-// Trigger async archiving
-func (r *JobRepository) TriggerArchiving(job *schema.Job) {
-	r.archivePending.Add(1)
-	r.archiveChannel <- job
-}
-
-// Wait for background thread to finish pending archiving operations
-func (r *JobRepository) WaitForArchiving() {
-	// close channel and wait for worker to process remaining jobs
-	r.archivePending.Wait()
-}
 
 func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm string) (jobid string, username string, project string, jobname string) {
 	if _, err := strconv.Atoi(searchterm); err == nil { // Return empty on successful conversion: parent method will redirect for integer jobId
 		return searchterm, "", "", ""

internal/taskManager/retentionService.go  (new file, 70 lines added)

@@ -0,0 +1,70 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package taskManager
+
+import (
+	"time"
+
+	"github.com/ClusterCockpit/cc-backend/pkg/archive"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+	"github.com/go-co-op/gocron/v2"
+)
+
+func RegisterRetentionService(cfg schema.Retention) {
+	switch cfg.Policy {
+	case "delete":
+
+		log.Info("Register retention delete service")
+
+		s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))),
+			gocron.NewTask(
+				func() {
+					startTime := time.Now().Unix() - int64(cfg.Age*24*3600)
+					jobs, err := jobRepo.FindJobsBetween(0, startTime)
+					if err != nil {
+						log.Warnf("Error while looking for retention jobs: %s", err.Error())
+					}
+					archive.GetHandle().CleanUp(jobs)
+
+					if cfg.IncludeDB {
+						cnt, err := jobRepo.DeleteJobsBefore(startTime)
+						if err != nil {
+							log.Errorf("Error while deleting retention jobs from db: %s", err.Error())
+						} else {
+							log.Infof("Retention: Removed %d jobs from db", cnt)
+						}
+						if err = jobRepo.Optimize(); err != nil {
+							log.Errorf("Error occured in db optimization: %s", err.Error())
+						}
+					}
+				}))
+	case "move":
+		log.Info("Register retention move service")
+
+		s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))),
+			gocron.NewTask(
+				func() {
+					startTime := time.Now().Unix() - int64(cfg.Age*24*3600)
+					jobs, err := jobRepo.FindJobsBetween(0, startTime)
+					if err != nil {
+						log.Warnf("Error while looking for retention jobs: %s", err.Error())
+					}
+					archive.GetHandle().Move(jobs, cfg.Location)
+
+					if cfg.IncludeDB {
+						cnt, err := jobRepo.DeleteJobsBefore(startTime)
+						if err != nil {
+							log.Errorf("Error while deleting retention jobs from db: %v", err)
+						} else {
+							log.Infof("Retention: Removed %d jobs from db", cnt)
+						}
+						if err = jobRepo.Optimize(); err != nil {
+							log.Errorf("Error occured in db optimization: %v", err)
+						}
+					}
+				}))
+	}
+}
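A minimal caller-side sketch of RegisterRetentionService. It uses only the schema.Retention fields this file reads (Policy, Age, Location, IncludeDB); the concrete values, the target path and the standalone main() wrapper are illustrative, not taken from the commit.

package main

import (
	"github.com/ClusterCockpit/cc-backend/internal/taskManager"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

func main() {
	// Illustrative values; in cc-backend they come from the "retention" block
	// of the archive configuration unmarshalled in main().
	retention := schema.Retention{
		Policy:    "move",          // or "delete"
		Age:       180,             // days; jobs older than this are retired
		Location:  "/data/retired", // hypothetical target for the "move" policy
		IncludeDB: true,            // also remove the retired jobs from the database
	}
	taskManager.RegisterRetentionService(retention)
}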

internal/taskManager/stopJobsExceedTime.go  (new file, 27 lines added)

@@ -0,0 +1,27 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package taskManager
+
+import (
+	"runtime"
+
+	"github.com/ClusterCockpit/cc-backend/internal/config"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/go-co-op/gocron/v2"
+)
+
+func RegisterStopJobsExceedTime() {
+	log.Info("Register undead jobs service")
+
+	s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(03, 0, 0))),
+		gocron.NewTask(
+			func() {
+				err := jobRepo.StopJobsExceedingWalltimeBy(config.Keys.StopJobsExceedingWalltime)
+				if err != nil {
+					log.Warnf("Error while looking for jobs exceeding their walltime: %s", err.Error())
+				}
+				runtime.GC()
+			}))
+}

internal/taskManager/taskManager.go  (new file, 29 lines added)

@@ -0,0 +1,29 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package taskManager
+
+import (
+	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/go-co-op/gocron/v2"
+)
+
+var (
+	s       gocron.Scheduler
+	jobRepo *repository.JobRepository
+)
+
+func init() {
+	var err error
+	jobRepo = repository.GetJobRepository()
+	s, err = gocron.NewScheduler()
+	if err != nil {
+		log.Fatalf("Error while creating gocron scheduler: %s", err.Error())
+	}
+}
+
+func Shutdown() {
+	s.Shutdown()
+}
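Both new services register their work on the package-level gocron/v2 scheduler s created in init() above. The sketch below shows that registration pattern in isolation; registerExampleTask, its schedule and its body are placeholders. Note also that a gocron v2 scheduler only executes jobs once its Start() method has been called, which this work-in-progress commit does not yet show.

package taskManager

import (
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/go-co-op/gocron/v2"
)

// registerExampleTask mirrors the pattern used by RegisterStopJobsExceedTime
// and RegisterRetentionService: one daily job at a fixed time, added to the
// shared scheduler s. The task itself is a placeholder.
func registerExampleTask() {
	_, err := s.NewJob(
		gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(5, 0, 0))),
		gocron.NewTask(func() {
			log.Info("example task ran")
		}),
	)
	if err != nil {
		log.Warnf("Error while registering example task: %s", err.Error())
	}
}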