mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-07-23 12:51:40 +02:00
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
@@ -212,7 +213,7 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
|
||||
}
|
||||
|
||||
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
|
||||
return nil
|
||||
return archive.UpdateMetadata(job, job.MetaData)
|
||||
}
|
||||
|
||||
// Find executes a SQL query to find a specific batch job.
|
||||
@@ -223,8 +224,8 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
|
||||
func (r *JobRepository) Find(
|
||||
jobId *int64,
|
||||
cluster *string,
|
||||
startTime *int64) (*schema.Job, error) {
|
||||
|
||||
startTime *int64,
|
||||
) (*schema.Job, error) {
|
||||
start := time.Now()
|
||||
q := sq.Select(jobColumns...).From("job").
|
||||
Where("job.job_id = ?", *jobId)
|
||||
@@ -248,8 +249,8 @@ func (r *JobRepository) Find(
|
||||
func (r *JobRepository) FindAll(
|
||||
jobId *int64,
|
||||
cluster *string,
|
||||
startTime *int64) ([]*schema.Job, error) {
|
||||
|
||||
startTime *int64,
|
||||
) ([]*schema.Job, error) {
|
||||
start := time.Now()
|
||||
q := sq.Select(jobColumns...).From("job").
|
||||
Where("job.job_id = ?", *jobId)
|
||||
@@ -292,7 +293,8 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
|
||||
|
||||
func (r *JobRepository) FindConcurrentJobs(
|
||||
ctx context.Context,
|
||||
job *schema.Job) (*model.JobLinkResultList, error) {
|
||||
job *schema.Job,
|
||||
) (*model.JobLinkResultList, error) {
|
||||
if job == nil {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -420,8 +422,8 @@ func (r *JobRepository) Stop(
|
||||
jobId int64,
|
||||
duration int32,
|
||||
state schema.JobState,
|
||||
monitoringStatus int32) (err error) {
|
||||
|
||||
monitoringStatus int32,
|
||||
) (err error) {
|
||||
stmt := sq.Update("job").
|
||||
Set("job_state", state).
|
||||
Set("duration", duration).
|
||||
@@ -435,7 +437,7 @@ func (r *JobRepository) Stop(
|
||||
func (r *JobRepository) DeleteJobsBefore(startTime int64) (int, error) {
|
||||
var cnt int
|
||||
qs := fmt.Sprintf("SELECT count(*) FROM job WHERE job.start_time < %d", startTime)
|
||||
err := r.DB.Get(&cnt, qs) //ignore error as it will also occur in delete statement
|
||||
err := r.DB.Get(&cnt, qs) // ignore error as it will also occur in delete statement
|
||||
_, err = r.DB.Exec(`DELETE FROM job WHERE job.start_time < ?`, startTime)
|
||||
if err != nil {
|
||||
log.Errorf(" DeleteJobsBefore(%d): error %#v", startTime, err)
|
||||
@@ -468,8 +470,8 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
|
||||
func (r *JobRepository) MarkArchived(
|
||||
jobId int64,
|
||||
monitoringStatus int32,
|
||||
metricStats map[string]schema.JobStatistics) error {
|
||||
|
||||
metricStats map[string]schema.JobStatistics,
|
||||
) error {
|
||||
stmt := sq.Update("job").
|
||||
Set("monitoring_status", monitoringStatus).
|
||||
Where("job.id = ?", jobId)
|
||||
@@ -578,8 +580,10 @@ func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm
|
||||
}
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("no such jobname, project or user")
|
||||
var ErrForbidden = errors.New("not authorized")
|
||||
var (
|
||||
ErrNotFound = errors.New("no such jobname, project or user")
|
||||
ErrForbidden = errors.New("not authorized")
|
||||
)
|
||||
|
||||
func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) {
|
||||
compareStr := " = ?"
|
||||
@@ -663,7 +667,6 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
|
||||
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
|
||||
// Hosts with zero jobs running on them will not show up!
|
||||
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
|
||||
|
||||
start := time.Now()
|
||||
subclusters := make(map[string]map[string]int)
|
||||
rows, err := sq.Select("resources", "subcluster").From("job").
|
||||
@@ -706,7 +709,6 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
|
||||
}
|
||||
|
||||
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
||||
|
||||
start := time.Now()
|
||||
res, err := sq.Update("job").
|
||||
Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
|
||||
@@ -735,7 +737,6 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
||||
}
|
||||
|
||||
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {
|
||||
|
||||
var query sq.SelectBuilder
|
||||
|
||||
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
|
||||
|
Reference in New Issue
Block a user