Merge branch 'dev' into sample_resolution_select

- Moved resample changes to metricDataDispatcher
- Added res argument to archiver, updateFootprintService
This commit is contained in:
Christoph Kluge
2024-09-05 17:26:43 +02:00
25 changed files with 824 additions and 524 deletions

View File

@@ -1,112 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"context"
"encoding/json"
"time"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)
// Archiving worker thread
func (r *JobRepository) archivingWorker() {
for {
select {
case job, ok := <-r.archiveChannel:
if !ok {
break
}
start := time.Now()
// not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository
if _, err := r.FetchMetadata(job); err != nil {
log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
// metricdata.ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
// TODO: Maybe use context with cancel/timeout here
jobMeta, err := metricdata.ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
r.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}
// Update the jobs database entry one last time:
if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())
continue
}
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
log.Printf("archiving job (dbid: %d) successful", job.ID)
r.archivePending.Done()
}
}
}
// Stop updates the job with the database id jobId using the provided arguments.
func (r *JobRepository) MarkArchived(
jobMeta *schema.JobMeta,
monitoringStatus int32,
) error {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", jobMeta.JobID)
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error())
return err
}
footprint := make(map[string]float64)
for _, fp := range sc.Footprint {
footprint[fp] = LoadJobStat(jobMeta, fp)
}
var rawFootprint []byte
if rawFootprint, err = json.Marshal(footprint); err != nil {
log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
return err
}
stmt = stmt.Set("footprint", rawFootprint)
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
log.Warn("Error while marking job as archived")
return err
}
return nil
}
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", job)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
}
// Trigger async archiving
func (r *JobRepository) TriggerArchiving(job *schema.Job) {
r.archivePending.Add(1)
r.archiveChannel <- job
}
// Wait for background thread to finish pending archiving operations
func (r *JobRepository) WaitForArchiving() {
// close channel and wait for worker to process remaining jobs
r.archivePending.Wait()
}

View File

@@ -28,12 +28,10 @@ var (
)
type JobRepository struct {
DB *sqlx.DB
stmtCache *sq.StmtCache
cache *lrucache.Cache
archiveChannel chan *schema.Job
driver string
archivePending sync.WaitGroup
DB *sqlx.DB
stmtCache *sq.StmtCache
cache *lrucache.Cache
driver string
}
func GetJobRepository() *JobRepository {
@@ -44,12 +42,9 @@ func GetJobRepository() *JobRepository {
DB: db.DB,
driver: db.Driver,
stmtCache: sq.NewStmtCache(db.DB),
cache: lrucache.New(1024 * 1024),
archiveChannel: make(chan *schema.Job, 128),
stmtCache: sq.NewStmtCache(db.DB),
cache: lrucache.New(1024 * 1024),
}
// start archiving worker
go jobRepoInstance.archivingWorker()
})
return jobRepoInstance
}
@@ -210,7 +205,10 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
return err
}
if _, err = sq.Update("job").Set("meta_data", job.RawMetaData).Where("job.id = ?", job.ID).RunWith(r.stmtCache).Exec(); err != nil {
if _, err = sq.Update("job").
Set("meta_data", job.RawMetaData).
Where("job.id = ?", job.ID).
RunWith(r.stmtCache).Exec(); err != nil {
log.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID)
return err
}
@@ -458,6 +456,46 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
return nil
}
func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
query := sq.Select(jobColumns...).From("job").
Where(fmt.Sprintf("job.cluster = '%s'", cluster)).
Where("job.job_state = 'running'").
Where("job.duration>600")
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
jobs := make([]*schema.Job, 0, 50)
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
rows.Close()
log.Warn("Error while scanning rows")
return nil, err
}
jobs = append(jobs, job)
}
log.Infof("Return job count %d", len(jobs))
return jobs, nil
}
func (r *JobRepository) UpdateDuration() error {
stmnt := sq.Update("job").
Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())).
Where("job_state = 'running'")
_, err := stmnt.RunWith(r.stmtCache).Exec()
if err != nil {
return err
}
return nil
}
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {
var query sq.SelectBuilder
@@ -495,3 +533,100 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
log.Infof("Return job count %d", len(jobs))
return jobs, nil
}
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).
Where("job.id = ?", job)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
}
func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error {
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
return err
}
return nil
}
func (r *JobRepository) MarkArchived(
stmt sq.UpdateBuilder,
monitoringStatus int32,
) sq.UpdateBuilder {
return stmt.Set("monitoring_status", monitoringStatus)
}
func (r *JobRepository) UpdateEnergy(
stmt sq.UpdateBuilder,
jobMeta *schema.JobMeta,
) (sq.UpdateBuilder, error) {
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error())
return stmt, err
}
energyFootprint := make(map[string]float64)
var totalEnergy float64
var energy float64
for _, fp := range sc.EnergyFootprint {
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
// FIXME: Check for unit conversions
if sc.MetricConfig[i].Energy == "power" {
energy = LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.Duration)
} else if sc.MetricConfig[i].Energy == "energy" {
// This assumes the metric is of aggregation type sum
}
}
energyFootprint[fp] = energy
totalEnergy += energy
}
var rawFootprint []byte
if rawFootprint, err = json.Marshal(energyFootprint); err != nil {
log.Warnf("Error while marshaling energy footprint for job, DB ID '%v'", jobMeta.ID)
return stmt, err
}
stmt.Set("energy_footprint", rawFootprint).
Set("energy", totalEnergy)
return stmt, nil
}
func (r *JobRepository) UpdateFootprint(
stmt sq.UpdateBuilder,
jobMeta *schema.JobMeta,
) (sq.UpdateBuilder, error) {
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil {
log.Errorf("cannot get subcluster: %s", err.Error())
return stmt, err
}
footprint := make(map[string]float64)
for _, fp := range sc.Footprint {
statType := "avg"
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
statType = sc.MetricConfig[i].Footprint
}
name := fmt.Sprintf("%s_%s", fp, statType)
footprint[fp] = LoadJobStat(jobMeta, name, statType)
}
var rawFootprint []byte
if rawFootprint, err = json.Marshal(footprint); err != nil {
log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
return stmt, err
}
stmt.Set("footprint", rawFootprint)
return stmt, nil
}

View File

@@ -13,7 +13,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -292,13 +292,17 @@ func (r *JobRepository) JobsStats(
return stats, nil
}
// FIXME: Make generic
func LoadJobStat(job *schema.JobMeta, metric string) float64 {
func LoadJobStat(job *schema.JobMeta, metric string, statType string) float64 {
if stats, ok := job.Statistics[metric]; ok {
if metric == "mem_used" {
return stats.Max
} else {
switch statType {
case "avg":
return stats.Avg
case "max":
return stats.Max
case "min":
return stats.Min
default:
log.Errorf("Unknown stat type %s", statType)
}
}
@@ -697,7 +701,7 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
continue
}
if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
log.Errorf("Error while loading averages for histogram: %s", err)
return nil
}

View File

@@ -6,7 +6,6 @@ package repository
import (
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/jmoiron/sqlx"
)
@@ -18,20 +17,12 @@ type Transaction struct {
func (r *JobRepository) TransactionInit() (*Transaction, error) {
var err error
t := new(Transaction)
// Inserts are bundled into transactions because in sqlite,
// that speeds up inserts A LOT.
t.tx, err = r.DB.Beginx()
if err != nil {
log.Warn("Error while bundling transactions")
return nil, err
}
t.stmt, err = t.tx.PrepareNamed(NamedJobInsert)
if err != nil {
log.Warn("Error while preparing namedJobInsert")
return nil, err
}
return t, nil
}
@@ -50,7 +41,6 @@ func (r *JobRepository) TransactionCommit(t *Transaction) error {
return err
}
t.stmt = t.tx.NamedStmt(t.stmt)
return nil
}
@@ -63,10 +53,14 @@ func (r *JobRepository) TransactionEnd(t *Transaction) error {
return nil
}
func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) {
res, err := t.stmt.Exec(job)
func (r *JobRepository) TransactionAddNamed(
t *Transaction,
query string,
args ...interface{},
) (int64, error) {
res, err := t.tx.NamedExec(query, args)
if err != nil {
log.Errorf("repository initDB(): %v", err)
log.Errorf("Named Exec failed: %v", err)
return 0, err
}
@@ -79,26 +73,14 @@ func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, e
return id, nil
}
func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) {
res, err := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
func (r *JobRepository) TransactionAdd(t *Transaction, query string, args ...interface{}) (int64, error) {
res := t.tx.MustExec(query, args)
id, err := res.LastInsertId()
if err != nil {
log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
return 0, err
}
tagId, err := res.LastInsertId()
if err != nil {
log.Warn("Error while getting last insert ID")
log.Errorf("repository initDB(): %v", err)
return 0, err
}
return tagId, nil
}
func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error {
if _, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId); err != nil {
log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId)
return err
}
return nil
return id, nil
}