mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-01-13 21:19:06 +01:00
Fix bugs and failed testcases
This commit is contained in:
parent
1b70596735
commit
80c46bea7f
@ -14,6 +14,7 @@ import (
|
|||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/util"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
@ -86,13 +87,22 @@ func HandleImportFlag(flag string) error {
|
|||||||
StartTimeUnix: jobMeta.StartTime,
|
StartTimeUnix: jobMeta.StartTime,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Do loop for new sub structure for stats
|
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
|
||||||
// job.LoadAvg = loadJobStat(&jobMeta, "cpu_load")
|
if err != nil {
|
||||||
// job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any")
|
log.Errorf("cannot get subcluster: %s", err.Error())
|
||||||
// job.MemUsedMax = loadJobStat(&jobMeta, "mem_used")
|
return err
|
||||||
// job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw")
|
}
|
||||||
// job.NetBwAvg = loadJobStat(&jobMeta, "net_bw")
|
|
||||||
// job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
|
job.Footprint = make(map[string]float64)
|
||||||
|
|
||||||
|
for _, fp := range sc.Footprint {
|
||||||
|
job.Footprint[fp] = util.LoadJobStat(&jobMeta, fp)
|
||||||
|
}
|
||||||
|
job.RawFootprint, err = json.Marshal(job.Footprint)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Error while marshaling job footprint")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
job.RawResources, err = json.Marshal(job.Resources)
|
job.RawResources, err = json.Marshal(job.Resources)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/util"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
@ -60,13 +61,22 @@ func InitDB() error {
|
|||||||
StartTimeUnix: jobMeta.StartTime,
|
StartTimeUnix: jobMeta.StartTime,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Convert to loop for new footprint layout
|
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
|
||||||
// job.LoadAvg = loadJobStat(jobMeta, "cpu_load")
|
if err != nil {
|
||||||
// job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any")
|
log.Errorf("cannot get subcluster: %s", err.Error())
|
||||||
// job.MemUsedMax = loadJobStat(jobMeta, "mem_used")
|
return err
|
||||||
// job.MemBwAvg = loadJobStat(jobMeta, "mem_bw")
|
}
|
||||||
// job.NetBwAvg = loadJobStat(jobMeta, "net_bw")
|
job.Footprint = make(map[string]float64)
|
||||||
// job.FileBwAvg = loadJobStat(jobMeta, "file_bw")
|
|
||||||
|
for _, fp := range sc.Footprint {
|
||||||
|
job.Footprint[fp] = util.LoadJobStat(jobMeta, fp)
|
||||||
|
}
|
||||||
|
|
||||||
|
job.RawFootprint, err = json.Marshal(job.Footprint)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Error while marshaling job footprint")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
job.RawResources, err = json.Marshal(job.Resources)
|
job.RawResources, err = json.Marshal(job.Resources)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -150,18 +160,6 @@ func SanityChecks(job *schema.BaseJob) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadJobStat(job *schema.JobMeta, metric string) float64 {
|
|
||||||
if stats, ok := job.Statistics[metric]; ok {
|
|
||||||
if metric == "mem_used" {
|
|
||||||
return stats.Max
|
|
||||||
} else {
|
|
||||||
return stats.Avg
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0.0
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkJobData(d *schema.JobData) error {
|
func checkJobData(d *schema.JobData) error {
|
||||||
for _, scopes := range *d {
|
for _, scopes := range *d {
|
||||||
// var newUnit schema.Unit
|
// var newUnit schema.Unit
|
||||||
|
@ -8,6 +8,7 @@
|
|||||||
},
|
},
|
||||||
"scope": "node",
|
"scope": "node",
|
||||||
"aggregation": "avg",
|
"aggregation": "avg",
|
||||||
|
"footprint": true,
|
||||||
"timestep": 60,
|
"timestep": 60,
|
||||||
"peak": 72,
|
"peak": 72,
|
||||||
"normal": 72,
|
"normal": 72,
|
||||||
@ -35,6 +36,7 @@
|
|||||||
},
|
},
|
||||||
"scope": "node",
|
"scope": "node",
|
||||||
"aggregation": "sum",
|
"aggregation": "sum",
|
||||||
|
"footprint": true,
|
||||||
"timestep": 60,
|
"timestep": 60,
|
||||||
"peak": 256,
|
"peak": 256,
|
||||||
"normal": 128,
|
"normal": 128,
|
||||||
@ -49,6 +51,7 @@
|
|||||||
},
|
},
|
||||||
"scope": "hwthread",
|
"scope": "hwthread",
|
||||||
"aggregation": "sum",
|
"aggregation": "sum",
|
||||||
|
"footprint": true,
|
||||||
"timestep": 60,
|
"timestep": 60,
|
||||||
"peak": 5600,
|
"peak": 5600,
|
||||||
"normal": 1000,
|
"normal": 1000,
|
||||||
@ -91,6 +94,7 @@
|
|||||||
},
|
},
|
||||||
"scope": "socket",
|
"scope": "socket",
|
||||||
"aggregation": "sum",
|
"aggregation": "sum",
|
||||||
|
"footprint": true,
|
||||||
"timestep": 60,
|
"timestep": 60,
|
||||||
"peak": 350,
|
"peak": 350,
|
||||||
"normal": 100,
|
"normal": 100,
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/util"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
|
||||||
@ -64,6 +65,7 @@ var jobColumns []string = []string{
|
|||||||
|
|
||||||
func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
|
func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
|
||||||
job := &schema.Job{}
|
job := &schema.Job{}
|
||||||
|
|
||||||
if err := row.Scan(
|
if err := row.Scan(
|
||||||
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
|
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
|
||||||
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
|
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
|
||||||
@ -79,7 +81,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
|
|||||||
job.RawResources = nil
|
job.RawResources = nil
|
||||||
|
|
||||||
if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil {
|
if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil {
|
||||||
log.Warn("Error while unmarshaling raw footprint json")
|
log.Warnf("Error while unmarshaling raw footprint json: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
job.RawFootprint = nil
|
job.RawFootprint = nil
|
||||||
@ -242,6 +244,7 @@ func (r *JobRepository) Find(
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("Timer Find %s", time.Since(start))
|
log.Debugf("Timer Find %s", time.Since(start))
|
||||||
|
|
||||||
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -397,6 +400,11 @@ func (r *JobRepository) FindConcurrentJobs(
|
|||||||
// Start inserts a new job in the table, returning the unique job ID.
|
// Start inserts a new job in the table, returning the unique job ID.
|
||||||
// Statistics are not transfered!
|
// Statistics are not transfered!
|
||||||
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
|
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
|
||||||
|
job.RawFootprint, err = json.Marshal(job.Footprint)
|
||||||
|
if err != nil {
|
||||||
|
return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
job.RawResources, err = json.Marshal(job.Resources)
|
job.RawResources, err = json.Marshal(job.Resources)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
|
return -1, fmt.Errorf("REPOSITORY/JOB > encoding resources field failed: %w", err)
|
||||||
@ -409,10 +417,10 @@ func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
|
|||||||
|
|
||||||
res, err := r.DB.NamedExec(`INSERT INTO job (
|
res, err := r.DB.NamedExec(`INSERT INTO job (
|
||||||
job_id, user, project, cluster, subcluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc,
|
job_id, user, project, cluster, subcluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc,
|
||||||
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data
|
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, resources, meta_data
|
||||||
) VALUES (
|
) VALUES (
|
||||||
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
|
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
|
||||||
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data
|
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :resources, :meta_data
|
||||||
);`, job)
|
);`, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return -1, err
|
return -1, err
|
||||||
@ -478,35 +486,33 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
|
|||||||
|
|
||||||
// Stop updates the job with the database id jobId using the provided arguments.
|
// Stop updates the job with the database id jobId using the provided arguments.
|
||||||
func (r *JobRepository) MarkArchived(
|
func (r *JobRepository) MarkArchived(
|
||||||
jobId int64,
|
jobMeta *schema.JobMeta,
|
||||||
monitoringStatus int32,
|
monitoringStatus int32,
|
||||||
metricStats map[string]schema.JobStatistics,
|
|
||||||
) error {
|
) error {
|
||||||
stmt := sq.Update("job").
|
stmt := sq.Update("job").
|
||||||
Set("monitoring_status", monitoringStatus).
|
Set("monitoring_status", monitoringStatus).
|
||||||
Where("job.id = ?", jobId)
|
Where("job.id = ?", jobMeta.JobID)
|
||||||
|
|
||||||
for metric, stats := range metricStats {
|
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
|
||||||
switch metric {
|
if err != nil {
|
||||||
case "flops_any":
|
log.Errorf("cannot get subcluster: %s", err.Error())
|
||||||
stmt = stmt.Set("flops_any_avg", stats.Avg)
|
return err
|
||||||
case "mem_used":
|
|
||||||
stmt = stmt.Set("mem_used_max", stats.Max)
|
|
||||||
case "mem_bw":
|
|
||||||
stmt = stmt.Set("mem_bw_avg", stats.Avg)
|
|
||||||
case "load":
|
|
||||||
stmt = stmt.Set("load_avg", stats.Avg)
|
|
||||||
case "cpu_load":
|
|
||||||
stmt = stmt.Set("load_avg", stats.Avg)
|
|
||||||
case "net_bw":
|
|
||||||
stmt = stmt.Set("net_bw_avg", stats.Avg)
|
|
||||||
case "file_bw":
|
|
||||||
stmt = stmt.Set("file_bw_avg", stats.Avg)
|
|
||||||
default:
|
|
||||||
log.Debugf("MarkArchived() Metric '%v' unknown", metric)
|
|
||||||
}
|
}
|
||||||
|
footprint := make(map[string]float64)
|
||||||
|
|
||||||
|
for _, fp := range sc.Footprint {
|
||||||
|
footprint[fp] = util.LoadJobStat(jobMeta, fp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var rawFootprint []byte
|
||||||
|
|
||||||
|
if rawFootprint, err = json.Marshal(footprint); err != nil {
|
||||||
|
log.Warnf("Error while marshaling footprint for job, DB ID '%v'", jobMeta.ID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt = stmt.Set("footprint", rawFootprint)
|
||||||
|
|
||||||
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
|
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
|
||||||
log.Warn("Error while marking job as archived")
|
log.Warn("Error while marking job as archived")
|
||||||
return err
|
return err
|
||||||
@ -541,7 +547,7 @@ func (r *JobRepository) archivingWorker() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update the jobs database entry one last time:
|
// Update the jobs database entry one last time:
|
||||||
if err := r.MarkArchived(job.ID, schema.MonitoringStatusArchivingSuccessful, jobMeta.Statistics); err != nil {
|
if err := r.MarkArchived(jobMeta, schema.MonitoringStatusArchivingSuccessful); err != nil {
|
||||||
log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())
|
log.Errorf("archiving job (dbid: %d) failed at marking archived step: %s", job.ID, err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -786,12 +792,10 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
|
|||||||
|
|
||||||
const NamedJobInsert string = `INSERT INTO job (
|
const NamedJobInsert string = `INSERT INTO job (
|
||||||
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
|
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
|
||||||
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
|
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, resources, meta_data
|
||||||
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
|
|
||||||
) VALUES (
|
) VALUES (
|
||||||
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
|
:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
|
||||||
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data,
|
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :resources, :meta_data
|
||||||
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
|
|
||||||
);`
|
);`
|
||||||
|
|
||||||
func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
|
func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
|
||||||
|
@ -4,7 +4,10 @@
|
|||||||
// license that can be found in the LICENSE file.
|
// license that can be found in the LICENSE file.
|
||||||
package util
|
package util
|
||||||
|
|
||||||
import "golang.org/x/exp/constraints"
|
import (
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
|
"golang.org/x/exp/constraints"
|
||||||
|
)
|
||||||
|
|
||||||
func Min[T constraints.Ordered](a, b T) T {
|
func Min[T constraints.Ordered](a, b T) T {
|
||||||
if a < b {
|
if a < b {
|
||||||
@ -19,3 +22,15 @@ func Max[T constraints.Ordered](a, b T) T {
|
|||||||
}
|
}
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func LoadJobStat(job *schema.JobMeta, metric string) float64 {
|
||||||
|
if stats, ok := job.Statistics[metric]; ok {
|
||||||
|
if metric == "mem_used" {
|
||||||
|
return stats.Max
|
||||||
|
} else {
|
||||||
|
return stats.Avg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0.0
|
||||||
|
}
|
||||||
|
@ -51,7 +51,7 @@ func TestInit(t *testing.T) {
|
|||||||
if version != 1 {
|
if version != 1 {
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
if len(fsa.clusters) != 3 || fsa.clusters[0] != "emmy" {
|
if len(fsa.clusters) != 3 || fsa.clusters[1] != "emmy" {
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,8 +16,6 @@ import (
|
|||||||
// Common subset of Job and JobMeta. Use one of those, not this type directly.
|
// Common subset of Job and JobMeta. Use one of those, not this type directly.
|
||||||
|
|
||||||
type BaseJob struct {
|
type BaseJob struct {
|
||||||
Footprint map[string]float64 `json:"footPrint"`
|
|
||||||
MetaData map[string]string `json:"metaData"`
|
|
||||||
Cluster string `json:"cluster" db:"cluster" example:"fritz"`
|
Cluster string `json:"cluster" db:"cluster" example:"fritz"`
|
||||||
SubCluster string `json:"subCluster" db:"subcluster" example:"main"`
|
SubCluster string `json:"subCluster" db:"subcluster" example:"main"`
|
||||||
Partition string `json:"partition,omitempty" db:"partition" example:"main"`
|
Partition string `json:"partition,omitempty" db:"partition" example:"main"`
|
||||||
@ -27,8 +25,10 @@ type BaseJob struct {
|
|||||||
Tags []*Tag `json:"tags,omitempty"`
|
Tags []*Tag `json:"tags,omitempty"`
|
||||||
RawFootprint []byte `json:"-" db:"footprint"`
|
RawFootprint []byte `json:"-" db:"footprint"`
|
||||||
RawMetaData []byte `json:"-" db:"meta_data"`
|
RawMetaData []byte `json:"-" db:"meta_data"`
|
||||||
Resources []*Resource `json:"resources"`
|
|
||||||
RawResources []byte `json:"-" db:"resources"`
|
RawResources []byte `json:"-" db:"resources"`
|
||||||
|
Resources []*Resource `json:"resources"`
|
||||||
|
Footprint map[string]float64 `json:"footPrint"`
|
||||||
|
MetaData map[string]string `json:"metaData"`
|
||||||
ConcurrentJobs JobLinkResultList `json:"concurrentJobs"`
|
ConcurrentJobs JobLinkResultList `json:"concurrentJobs"`
|
||||||
Energy float64 `json:"energy"`
|
Energy float64 `json:"energy"`
|
||||||
ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"`
|
ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"`
|
||||||
|
Loading…
Reference in New Issue
Block a user