mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-07-23 21:01:40 +02:00
Intermediate Save commit
This commit is contained in:
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/units"
|
||||
)
|
||||
|
||||
// `AUTO_INCREMENT` is in a comment because of this hack:
|
||||
@@ -103,11 +104,11 @@ func HandleImportFlag(flag string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil {
|
||||
return fmt.Errorf("validate job meta: %v", err)
|
||||
}
|
||||
// if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil {
|
||||
return fmt.Errorf("validate job meta: %v", err)
|
||||
}
|
||||
// }
|
||||
dec := json.NewDecoder(bytes.NewReader(raw))
|
||||
dec.DisallowUnknownFields()
|
||||
jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
|
||||
@@ -132,6 +133,7 @@ func HandleImportFlag(flag string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
checkJobData(&jobData)
|
||||
SanityChecks(&jobMeta.BaseJob)
|
||||
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
|
||||
if job, err := GetJobRepository().Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
|
||||
@@ -368,14 +370,31 @@ func loadJobStat(job *schema.JobMeta, metric string) float64 {
|
||||
}
|
||||
|
||||
func checkJobData(d *schema.JobData) error {
|
||||
// for name, scopes := range *d {
|
||||
for _, scopes := range *d {
|
||||
var newUnit string
|
||||
// Add node scope if missing
|
||||
for _, metric := range scopes {
|
||||
if strings.Contains(metric.Unit.Base, "B/s") ||
|
||||
strings.Contains(metric.Unit.Base, "F/s") ||
|
||||
strings.Contains(metric.Unit.Base, "B") {
|
||||
|
||||
// for scope, metric := range scopes {
|
||||
// // 1. Unit normalisation
|
||||
// First get overall avg
|
||||
sum := 0.0
|
||||
for _, s := range metric.Series {
|
||||
sum += s.Statistics.Avg
|
||||
}
|
||||
|
||||
// }
|
||||
// // 2. Add node scope if missing
|
||||
avg := sum / float64(len(metric.Series))
|
||||
|
||||
// }
|
||||
for _, s := range metric.Series {
|
||||
fp := schema.ConvertFloatToFloat64(s.Data)
|
||||
// Normalize values with new unit prefix
|
||||
units.NormalizeSeries(fp, avg, metric.Unit, &newUnit)
|
||||
s.Data = schema.GetFloat64ToFloat(fp)
|
||||
}
|
||||
metric.Unit = newUnit
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@@ -211,7 +211,13 @@ func (r *JobRepository) Stop(
|
||||
}
|
||||
|
||||
// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
|
||||
func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, weight *model.Weights, limit *int) (map[string]int, error) {
|
||||
func (r *JobRepository) CountGroupedJobs(
|
||||
ctx context.Context,
|
||||
aggreg model.Aggregate,
|
||||
filters []*model.JobFilter,
|
||||
weight *model.Weights,
|
||||
limit *int) (map[string]int, error) {
|
||||
|
||||
if !aggreg.IsValid() {
|
||||
return nil, errors.New("invalid aggregate")
|
||||
}
|
||||
@@ -301,10 +307,14 @@ func (r *JobRepository) Archive(
|
||||
|
||||
var ErrNotFound = errors.New("no such job or user")
|
||||
|
||||
// FindJobOrUser returns a job database ID or a username if a job or user machtes the search term.
|
||||
// As 0 is a valid job id, check if username is "" instead in order to check what machted.
|
||||
// If nothing matches the search, `ErrNotFound` is returned.
|
||||
func (r *JobRepository) FindJobOrUser(ctx context.Context, searchterm string) (job int64, username string, err error) {
|
||||
// FindJobOrUser returns a job database ID or a username if a job or user
|
||||
// machtes the search term. As 0 is a valid job id, check if username is ""
|
||||
// instead in order to check what matched. If nothing matches the search,
|
||||
// `ErrNotFound` is returned.
|
||||
func (r *JobRepository) FindJobOrUser(
|
||||
ctx context.Context,
|
||||
searchterm string) (job int64, username string, err error) {
|
||||
|
||||
user := auth.GetUser(ctx)
|
||||
if id, err := strconv.Atoi(searchterm); err == nil {
|
||||
qb := sq.Select("job.id").From("job").Where("job.job_id = ?", id)
|
||||
@@ -353,6 +363,7 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
|
||||
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
|
||||
// Hosts with zero jobs running on them will not show up!
|
||||
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
|
||||
|
||||
subclusters := make(map[string]map[string]int)
|
||||
rows, err := sq.Select("resources", "subcluster").From("job").
|
||||
Where("job.job_state = 'running'").
|
||||
@@ -390,6 +401,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
|
||||
}
|
||||
|
||||
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
||||
|
||||
res, err := sq.Update("job").
|
||||
Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
|
||||
Set("duration", 0).
|
||||
|
Reference in New Issue
Block a user