Mirror of https://github.com/ClusterCockpit/cc-backend
Revert hpc_cluster to cluster. Refactor.

--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ toolchain go1.24.1
 
 require (
 	github.com/99designs/gqlgen v0.17.78
-	github.com/ClusterCockpit/cc-lib v0.10.0
+	github.com/ClusterCockpit/cc-lib v0.10.1
 	github.com/Masterminds/squirrel v1.5.4
 	github.com/coreos/go-oidc/v3 v3.12.0
 	github.com/expr-lang/expr v1.17.6

--- a/go.sum
+++ b/go.sum
@@ -6,8 +6,8 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25
 github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
 github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
 github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
-github.com/ClusterCockpit/cc-lib v0.10.0 h1:Pa8mqVciCOipzXTO18ZL8vwMi2JJh/ZjQbCWXZl2R78=
-github.com/ClusterCockpit/cc-lib v0.10.0/go.mod h1:nvTZuxFCTwlos8I1rL5O1RPab7vRtkU8E/PGiaF6pQA=
+github.com/ClusterCockpit/cc-lib v0.10.1 h1:tjGEH8mFGgznYxO8BKLiiar0eZR1Oytk8x5iIQHZR5s=
+github.com/ClusterCockpit/cc-lib v0.10.1/go.mod h1:nvTZuxFCTwlos8I1rL5O1RPab7vRtkU8E/PGiaF6pQA=
 github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
 github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
 github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=
@@ -124,19 +124,19 @@ func setup(t *testing.T) *api.RestApi {
 	cclog.Init("info", true)
 	tmpdir := t.TempDir()
 	jobarchive := filepath.Join(tmpdir, "job-archive")
-	if err := os.Mkdir(jobarchive, 0777); err != nil {
+	if err := os.Mkdir(jobarchive, 0o777); err != nil {
 		t.Fatal(err)
 	}
 
-	if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), fmt.Appendf(nil, "%d", 2), 0666); err != nil {
+	if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), fmt.Appendf(nil, "%d", 2), 0o666); err != nil {
 		t.Fatal(err)
 	}
 
-	if err := os.Mkdir(filepath.Join(jobarchive, "testcluster"), 0777); err != nil {
+	if err := os.Mkdir(filepath.Join(jobarchive, "testcluster"), 0o777); err != nil {
 		t.Fatal(err)
 	}
 
-	if err := os.WriteFile(filepath.Join(jobarchive, "testcluster", "cluster.json"), []byte(testclusterJson), 0666); err != nil {
+	if err := os.WriteFile(filepath.Join(jobarchive, "testcluster", "cluster.json"), []byte(testclusterJson), 0o666); err != nil {
 		t.Fatal(err)
 	}
 
@@ -147,7 +147,7 @@ func setup(t *testing.T) *api.RestApi {
 	}
 
 	cfgFilePath := filepath.Join(tmpdir, "config.json")
-	if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0666); err != nil {
+	if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0o666); err != nil {
 		t.Fatal(err)
 	}
 
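
Note: the permission-bit edits above are purely stylistic. Go accepts both the legacy bare-zero octal prefix and the explicit 0o form; modern formatters such as gofumpt prefer the latter because the base is visible at a glance. A minimal sketch of the equivalence:

package main

import "fmt"

func main() {
	// 0777 and 0o777 denote the same octal literal; only the
	// 0o spelling makes the base explicit.
	const legacy = 0777
	const modern = 0o777
	fmt.Println(legacy == modern) // true (both are 511 decimal)
}
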
@@ -293,7 +293,7 @@ func TestRestApi(t *testing.T) {
 		job.SubCluster != "sc1" ||
 		job.Partition != "default" ||
 		job.Walltime != 3600 ||
-		job.ArrayJobId != 0 ||
+		job.ArrayJobID != 0 ||
 		job.NumNodes != 1 ||
 		job.NumHWThreads != 8 ||
 		job.NumAcc != 0 ||
@@ -112,7 +112,7 @@ type ComplexityRoot struct {
 	}
 
 	Job struct {
-		ArrayJobId     func(childComplexity int) int
+		ArrayJobID     func(childComplexity int) int
 		Cluster        func(childComplexity int) int
 		ConcurrentJobs func(childComplexity int) int
 		Duration       func(childComplexity int) int
@@ -690,11 +690,11 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
 		return e.complexity.IntRangeOutput.To(childComplexity), true
 
 	case "Job.arrayJobId":
-		if e.complexity.Job.ArrayJobId == nil {
+		if e.complexity.Job.ArrayJobID == nil {
 			break
 		}
 
-		return e.complexity.Job.ArrayJobId(childComplexity), true
+		return e.complexity.Job.ArrayJobID(childComplexity), true
 
 	case "Job.cluster":
 		if e.complexity.Job.Cluster == nil {
@@ -5369,7 +5369,7 @@ func (ec *executionContext) _Job_arrayJobId(ctx context.Context, field graphql.C
 	}()
 	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) {
 		ctx = rctx // use context from middleware stack in children
-		return obj.ArrayJobId, nil
+		return obj.ArrayJobID, nil
 	})
 	if err != nil {
 		ec.Error(ctx, err)
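
Note: the ArrayJobId → ArrayJobID hunks all apply the Go convention that initialisms keep a uniform case (ID, URL, HTTP, ...), which linters such as staticcheck flag. A small sketch of the rule, using a hypothetical struct rather than the repository's own schema.Job:

package main

import "fmt"

// jobIdent is a hypothetical record illustrating the naming rule:
// initialisms are written in one case, so ArrayJobID, not ArrayJobId.
type jobIdent struct {
	JobID      int64 // not JobId
	ArrayJobID int64 // not ArrayJobId
}

func main() {
	fmt.Println(jobIdent{JobID: 42, ArrayJobID: 0})
}
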
@@ -820,12 +820,10 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
 // SubCluster returns generated.SubClusterResolver implementation.
 func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
 
-type (
-	clusterResolver     struct{ *Resolver }
-	jobResolver         struct{ *Resolver }
-	metricValueResolver struct{ *Resolver }
-	mutationResolver    struct{ *Resolver }
-	nodeResolver        struct{ *Resolver }
-	queryResolver       struct{ *Resolver }
-	subClusterResolver  struct{ *Resolver }
-)
+type clusterResolver struct{ *Resolver }
+type jobResolver struct{ *Resolver }
+type metricValueResolver struct{ *Resolver }
+type mutationResolver struct{ *Resolver }
+type nodeResolver struct{ *Resolver }
+type queryResolver struct{ *Resolver }
+type subClusterResolver struct{ *Resolver }
@@ -52,7 +52,7 @@ func GetJobRepository() *JobRepository {
 }
 
 var jobColumns []string = []string{
-	"job.id", "job.job_id", "job.hpc_user", "job.project", "job.hpc_cluster", "job.subcluster",
+	"job.id", "job.job_id", "job.hpc_user", "job.project", "job.cluster", "job.subcluster",
 	"job.start_time", "job.cluster_partition", "job.array_job_id", "job.num_nodes",
 	"job.num_hwthreads", "job.num_acc", "job.shared", "job.monitoring_status",
 	"job.smt", "job.job_state", "job.duration", "job.walltime", "job.resources",
@@ -60,7 +60,7 @@ var jobColumns []string = []string{
 }
 
 var jobCacheColumns []string = []string{
-	"job_cache.id", "job_cache.job_id", "job_cache.hpc_user", "job_cache.project", "job_cache.hpc_cluster",
+	"job_cache.id", "job_cache.job_id", "job_cache.hpc_user", "job_cache.project", "job_cache.cluster",
 	"job_cache.subcluster", "job_cache.start_time", "job_cache.cluster_partition",
 	"job_cache.array_job_id", "job_cache.num_nodes", "job_cache.num_hwthreads",
 	"job_cache.num_acc", "job_cache.shared", "job_cache.monitoring_status", "job_cache.smt",
@@ -73,7 +73,7 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
 
 	if err := row.Scan(
 		&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster,
-		&job.StartTime, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads,
+		&job.StartTime, &job.Partition, &job.ArrayJobID, &job.NumNodes, &job.NumHWThreads,
 		&job.NumAcc, &job.Shared, &job.MonitoringStatus, &job.SMT, &job.State,
 		&job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil {
 		cclog.Warnf("Error while scanning rows (Job): %v", err)
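
Note: scanJob accepts the anonymous interface `interface{ Scan(...any) error }`, which is why later hunks (e.g. IsJobOwner) can pass a squirrel QueryRow result straight in. A self-contained sketch of that shape, with hypothetical stand-in types:

package main

import "fmt"

// rowScanner mirrors the anonymous interface scanJob accepts:
// anything with a Scan method, so *sql.Row and *sql.Rows both fit.
type rowScanner interface{ Scan(dest ...any) error }

// fakeRow is a hypothetical stand-in that yields a fixed value.
type fakeRow struct{}

func (fakeRow) Scan(dest ...any) error {
	if p, ok := dest[0].(*int64); ok {
		*p = 1337
	}
	return nil
}

func scanID(row rowScanner) (int64, error) {
	var id int64
	err := row.Scan(&id)
	return id, err
}

func main() {
	id, _ := scanID(fakeRow{})
	fmt.Println(id) // 1337
}
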
@@ -390,7 +390,7 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
 	start := time.Now()
 	partitions := r.cache.Get("partitions:"+cluster, func() (any, time.Duration, int) {
 		parts := []string{}
-		if err = r.DB.Select(&parts, `SELECT DISTINCT job.cluster_partition FROM job WHERE job.hpc_cluster = ?;`, cluster); err != nil {
+		if err = r.DB.Select(&parts, `SELECT DISTINCT job.cluster_partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
 			return nil, 0, 1000
 		}
 
@@ -410,7 +410,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
 	subclusters := make(map[string]map[string]int)
 	rows, err := sq.Select("resources", "subcluster").From("job").
 		Where("job.job_state = 'running'").
-		Where("job.hpc_cluster = ?", cluster).
+		Where("job.cluster = ?", cluster).
 		RunWith(r.stmtCache).Query()
 	if err != nil {
 		cclog.Error("Error while running query")
@@ -505,7 +505,7 @@ func (r *JobRepository) FindJobIdsByTag(tagId int64) ([]int64, error) {
 // FIXME: Reconsider filtering short jobs with harcoded threshold
 func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
 	query := sq.Select(jobColumns...).From("job").
-		Where(fmt.Sprintf("job.hpc_cluster = '%s'", cluster)).
+		Where(fmt.Sprintf("job.cluster = '%s'", cluster)).
 		Where("job.job_state = 'running'").
 		Where("job.duration > 600")
 
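
Note: the rename hunk leaves one wrinkle untouched: FindRunningJobs interpolates the cluster name with fmt.Sprintf, while the surrounding methods bind it as a ? placeholder. If the value ever came from user input, the placeholder form already used elsewhere in this file would be the safer equivalent; a sketch with a sample value:

package main

import (
	"fmt"

	sq "github.com/Masterminds/squirrel"
)

func main() {
	// Placeholder binding (as Find/FindAll already do) instead of
	// fmt.Sprintf interpolation; the driver escapes the value.
	query := sq.Select("job.id").From("job").
		Where("job.cluster = ?", "testcluster").
		Where("job.job_state = 'running'").
		Where("job.duration > 600")

	sqlStr, args, _ := query.ToSql()
	fmt.Println(sqlStr, args)
}
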
@@ -587,7 +587,7 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
 		Where("job.id = ?", job)
 
 	_, err = stmt.RunWith(r.stmtCache).Exec()
-	return
+	return err
 }
 
 func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error {
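
Note: the return → return err edits (here and in Stop/StopCached below) replace naked returns. With named result parameters, a bare return implicitly returns the current values; return err compiles to the same thing but keeps the data flow visible at the return site. A sketch of the two forms, assuming a named error result like the repository methods use:

package main

import (
	"errors"
	"fmt"
)

// naked relies on the named result; correct, but the reader must
// remember that err was assigned above.
func naked() (err error) {
	err = errors.New("boom")
	return
}

// explicit returns the same value, with the source spelled out.
func explicit() (err error) {
	err = errors.New("boom")
	return err
}

func main() {
	fmt.Println(naked(), explicit())
}
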
@@ -14,18 +14,18 @@ import (
 )
 
 const NamedJobCacheInsert string = `INSERT INTO job_cache (
-	job_id, hpc_user, project, hpc_cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
+	job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
 	shared, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
 ) VALUES (
-	:job_id, :hpc_user, :project, :hpc_cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
+	:job_id, :hpc_user, :project, :cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
 	:shared, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
 );`
 
 const NamedJobInsert string = `INSERT INTO job (
-	job_id, hpc_user, project, hpc_cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
+	job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
 	shared, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
 ) VALUES (
-	:job_id, :hpc_user, :project, :hpc_cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
+	:job_id, :hpc_user, :project, :cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
 	:shared, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
 );`
 
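
Note: the :name placeholders in these INSERT constants are sqlx-style named parameters, resolved against struct db tags (or map keys) at execution time — which is why only the column list had to change in this hunk. A self-contained sketch of the mechanism against an in-memory table, with a hypothetical cut-down record type and assuming the jmoiron/sqlx and mattn/go-sqlite3 packages:

package main

import (
	"fmt"

	"github.com/jmoiron/sqlx"
	_ "github.com/mattn/go-sqlite3"
)

// miniJob is a hypothetical reduced job record; the db tags are what
// the :cluster-style placeholders bind against.
type miniJob struct {
	JobID   int64  `db:"job_id"`
	Cluster string `db:"cluster"`
}

func main() {
	db := sqlx.MustConnect("sqlite3", ":memory:")
	db.MustExec(`CREATE TABLE job (job_id BIGINT, cluster TEXT)`)

	// NamedExec substitutes :job_id and :cluster from the struct fields.
	_, err := db.NamedExec(
		`INSERT INTO job (job_id, cluster) VALUES (:job_id, :cluster)`,
		miniJob{JobID: 1001, Cluster: "testcluster"},
	)
	fmt.Println(err) // <nil>
}
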
@@ -70,7 +70,7 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
 	}
 
 	_, err = r.DB.Exec(
-		"INSERT INTO job (job_id, hpc_cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, hpc_cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
+		"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
 	if err != nil {
 		cclog.Warnf("Error while Job sync: %v", err)
 		return nil, err
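
Note: SyncJobs moves cached rows into the main table with a single INSERT INTO ... SELECT, so the copy happens entirely inside the database instead of round-tripping each row through Go. A sketch of the idiom on a hypothetical simplified schema, paired with a cache-cleanup step in one transaction (whether the real code clears job_cache the same way is not shown in this diff); errors are ignored only to keep the sketch short:

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	db, _ := sql.Open("sqlite3", ":memory:")
	defer db.Close()

	db.Exec(`CREATE TABLE job (job_id BIGINT, cluster TEXT)`)
	db.Exec(`CREATE TABLE job_cache (job_id BIGINT, cluster TEXT)`)
	db.Exec(`INSERT INTO job_cache VALUES (1, 'testcluster'), (2, 'testcluster')`)

	// One server-side statement copies all cached rows; no row data
	// ever crosses into Go. The transaction keeps copy+cleanup atomic.
	tx, _ := db.Begin()
	tx.Exec(`INSERT INTO job (job_id, cluster) SELECT job_id, cluster FROM job_cache`)
	tx.Exec(`DELETE FROM job_cache`)
	tx.Commit()

	var n int
	db.QueryRow(`SELECT COUNT(*) FROM job`).Scan(&n)
	fmt.Println(n) // 2
}
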
@@ -120,7 +120,7 @@ func (r *JobRepository) Stop(
 		Where("job.id = ?", jobId)
 
 	_, err = stmt.RunWith(r.stmtCache).Exec()
-	return
+	return err
 }
 
 func (r *JobRepository) StopCached(
@@ -136,5 +136,5 @@ func (r *JobRepository) StopCached(
 		Where("job.id = ?", jobId)
 
 	_, err = stmt.RunWith(r.stmtCache).Exec()
-	return
+	return err
 }
@@ -31,7 +31,7 @@ func (r *JobRepository) Find(
 		Where("job.job_id = ?", *jobId)
 
 	if cluster != nil {
-		q = q.Where("job.hpc_cluster = ?", *cluster)
+		q = q.Where("job.cluster = ?", *cluster)
 	}
 	if startTime != nil {
 		q = q.Where("job.start_time = ?", *startTime)
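
Note: the Find/FindCached/FindAll hunks all show the same squirrel pattern: start from a base SELECT and append a Where clause only for each non-nil filter. Because every Where returns a new builder, the reassignment q = q.Where(...) is required. A compact sketch with a hypothetical helper:

package main

import (
	"fmt"

	sq "github.com/Masterminds/squirrel"
)

// findJobQuery mimics the optional-filter style of Find/FindAll:
// nil pointers simply contribute no WHERE clause.
func findJobQuery(jobID int64, cluster *string, startTime *int64) sq.SelectBuilder {
	q := sq.Select("job.id").From("job").Where("job.job_id = ?", jobID)
	if cluster != nil {
		q = q.Where("job.cluster = ?", *cluster) // builders are immutable: reassign
	}
	if startTime != nil {
		q = q.Where("job.start_time = ?", *startTime)
	}
	return q
}

func main() {
	cluster := "testcluster"
	sqlStr, args, _ := findJobQuery(42, &cluster, nil).ToSql()
	fmt.Println(sqlStr, args)
}
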
@@ -52,7 +52,7 @@ func (r *JobRepository) FindCached(
 		Where("job_cache.job_id = ?", *jobId)
 
 	if cluster != nil {
-		q = q.Where("job_cache.hpc_cluster = ?", *cluster)
+		q = q.Where("job_cache.cluster = ?", *cluster)
 	}
 	if startTime != nil {
 		q = q.Where("job_cache.start_time = ?", *startTime)
@@ -78,7 +78,7 @@ func (r *JobRepository) FindAll(
 		Where("job.job_id = ?", *jobId)
 
 	if cluster != nil {
-		q = q.Where("job.hpc_cluster = ?", *cluster)
+		q = q.Where("job.cluster = ?", *cluster)
 	}
 	if startTime != nil {
 		q = q.Where("job.start_time = ?", *startTime)
@@ -183,7 +183,7 @@ func (r *JobRepository) FindByJobId(ctx context.Context, jobId int64, startTime
 	q := sq.Select(jobColumns...).
 		From("job").
 		Where("job.job_id = ?", jobId).
-		Where("job.hpc_cluster = ?", cluster).
+		Where("job.cluster = ?", cluster).
 		Where("job.start_time = ?", startTime)
 
 	q, qerr := SecurityCheck(ctx, q)
@@ -203,7 +203,7 @@ func (r *JobRepository) IsJobOwner(jobId int64, startTime int64, user string, cl
 		From("job").
 		Where("job.job_id = ?", jobId).
 		Where("job.hpc_user = ?", user).
-		Where("job.hpc_cluster = ?", cluster).
+		Where("job.cluster = ?", cluster).
 		Where("job.start_time = ?", startTime)
 
 	_, err := scanJob(q.RunWith(r.stmtCache).QueryRow())
@@ -168,7 +168,7 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
 		query = buildMetaJsonCondition("jobName", filter.JobName, query)
 	}
 	if filter.Cluster != nil {
-		query = buildStringCondition("job.hpc_cluster", filter.Cluster, query)
+		query = buildStringCondition("job.cluster", filter.Cluster, query)
 	}
 	if filter.Partition != nil {
 		query = buildStringCondition("job.cluster_partition", filter.Partition, query)
@@ -1,7 +1,7 @@
 CREATE TABLE "job_cache" (
     id INTEGER PRIMARY KEY,
     job_id BIGINT NOT NULL,
-    hpc_cluster VARCHAR(255) NOT NULL,
+    cluster VARCHAR(255) NOT NULL,
     subcluster VARCHAR(255) NOT NULL,
     submit_time BIGINT NOT NULL DEFAULT 0, -- Unix timestamp
     start_time BIGINT NOT NULL DEFAULT 0, -- Unix timestamp
@@ -30,13 +30,13 @@ CREATE TABLE "job_cache" (
     energy REAL NOT NULL DEFAULT 0.0,
     energy_footprint TEXT DEFAULT NULL,
     footprint TEXT DEFAULT NULL,
-    UNIQUE (job_id, hpc_cluster, start_time)
+    UNIQUE (job_id, cluster, start_time)
 );
 
 CREATE TABLE "job_new" (
     id INTEGER PRIMARY KEY,
     job_id BIGINT NOT NULL,
-    hpc_cluster TEXT NOT NULL,
+    cluster TEXT NOT NULL,
     subcluster TEXT NOT NULL,
     submit_time BIGINT NOT NULL DEFAULT 0, -- Unix timestamp
     start_time BIGINT NOT NULL DEFAULT 0, -- Unix timestamp
@@ -65,10 +65,9 @@ CREATE TABLE "job_new" (
     energy REAL NOT NULL DEFAULT 0.0,
     energy_footprint TEXT DEFAULT NULL,
     footprint TEXT DEFAULT NULL,
-    UNIQUE (job_id, hpc_cluster, start_time)
+    UNIQUE (job_id, cluster, start_time)
 );
 
-ALTER TABLE job RENAME COLUMN cluster TO hpc_cluster;
 
 CREATE TABLE IF NOT EXISTS lookup_exclusive (
     id INTEGER PRIMARY KEY,
@@ -76,20 +75,43 @@ CREATE TABLE IF NOT EXISTS lookup_exclusive (
 );
 
 INSERT INTO lookup_exclusive (id, name) VALUES
     (0, 'multi_user'),
     (1, 'none'),
     (2, 'single_user');
 
 INSERT INTO job_new (
-    id, job_id, hpc_cluster, subcluster, submit_time, start_time, hpc_user, project,
+    id, job_id, cluster, subcluster, submit_time, start_time, hpc_user, project,
     cluster_partition, array_job_id, duration, walltime, job_state, meta_data, resources,
     num_nodes, num_hwthreads, num_acc, smt, shared, monitoring_status, energy,
     energy_footprint, footprint
 ) SELECT
-    id, job_id, hpc_cluster, subcluster, 0, start_time, hpc_user, project,
-    cluster_partition, array_job_id, duration, walltime, job_state, meta_data, resources,
-    num_nodes, num_hwthreads, num_acc, smt, (SELECT name FROM lookup_exclusive WHERE id=job.exclusive), monitoring_status, energy,
-    energy_footprint, footprint
+    id,
+    job_id,
+    cluster,
+    subcluster,
+    0,
+    start_time,
+    hpc_user,
+    project,
+    cluster_partition,
+    array_job_id,
+    duration,
+    walltime,
+    job_state,
+    meta_data,
+    resources,
+    num_nodes,
+    num_hwthreads,
+    num_acc,
+    smt,
+    (
+        SELECT name FROM lookup_exclusive
+        WHERE id = job.exclusive
+    ),
+    monitoring_status,
+    energy,
+    energy_footprint,
+    footprint
 FROM job;
 
 DROP TABLE lookup_exclusive;
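
Note: this migration uses the classic SQLite table-rebuild recipe: because SQLite cannot alter column constraints in place, it creates job_new with the desired shape, copies everything across with INSERT INTO ... SELECT (translating the old integer exclusive column through a temporary lookup_exclusive table), and drops the scaffolding. A hedged Go sketch of driving such a script atomically — the script below is a tiny stand-in, not the repository's migration, and assumes the mattn/go-sqlite3 driver:

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3"
)

// migrate runs a multi-statement migration script in one transaction:
// either the whole rebuild lands, or none of it does.
func migrate(db *sql.DB, script string) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	if _, err := tx.Exec(script); err != nil {
		tx.Rollback()
		return err
	}
	return tx.Commit()
}

func main() {
	db, _ := sql.Open("sqlite3", ":memory:")
	defer db.Close()

	// Minimal stand-in for the real rebuild-and-swap migration.
	script := `
CREATE TABLE job (id INTEGER PRIMARY KEY, cluster TEXT);
CREATE TABLE job_new (id INTEGER PRIMARY KEY, cluster TEXT NOT NULL);
INSERT INTO job_new (id, cluster) SELECT id, cluster FROM job;
DROP TABLE job;
ALTER TABLE job_new RENAME TO job;`
	fmt.Println(migrate(db, script)) // <nil>
}
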
@@ -27,104 +27,23 @@ CREATE TABLE "node_state" (
     FOREIGN KEY (node_id) REFERENCES node (id)
 );
 
--- DROP indices using old column name "cluster"
-DROP INDEX IF EXISTS jobs_cluster;
-DROP INDEX IF EXISTS jobs_cluster_user;
-DROP INDEX IF EXISTS jobs_cluster_project;
-DROP INDEX IF EXISTS jobs_cluster_subcluster;
-DROP INDEX IF EXISTS jobs_cluster_starttime;
-DROP INDEX IF EXISTS jobs_cluster_duration;
-DROP INDEX IF EXISTS jobs_cluster_numnodes;
-DROP INDEX IF EXISTS jobs_cluster_numhwthreads;
-DROP INDEX IF EXISTS jobs_cluster_numacc;
-DROP INDEX IF EXISTS jobs_cluster_energy;
-DROP INDEX IF EXISTS jobs_cluster_partition;
-DROP INDEX IF EXISTS jobs_cluster_partition_starttime;
-DROP INDEX IF EXISTS jobs_cluster_partition_duration;
-DROP INDEX IF EXISTS jobs_cluster_partition_numnodes;
-DROP INDEX IF EXISTS jobs_cluster_partition_numhwthreads;
-DROP INDEX IF EXISTS jobs_cluster_partition_numacc;
-DROP INDEX IF EXISTS jobs_cluster_partition_energy;
-DROP INDEX IF EXISTS jobs_cluster_partition_jobstate;
-DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_user;
-DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_project;
-DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_starttime;
-DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_duration;
-DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_numnodes;
-DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_numhwthreads;
-DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_numacc;
-DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_energy;
-DROP INDEX IF EXISTS jobs_cluster_jobstate;
-DROP INDEX IF EXISTS jobs_cluster_jobstate_user;
-DROP INDEX IF EXISTS jobs_cluster_jobstate_project;
-DROP INDEX IF EXISTS jobs_cluster_jobstate_starttime;
-DROP INDEX IF EXISTS jobs_cluster_jobstate_duration;
-DROP INDEX IF EXISTS jobs_cluster_jobstate_numnodes;
-DROP INDEX IF EXISTS jobs_cluster_jobstate_numhwthreads;
-DROP INDEX IF EXISTS jobs_cluster_jobstate_numacc;
-DROP INDEX IF EXISTS jobs_cluster_jobstate_energy;
-
--- -- CREATE UPDATED indices with new column names
--- Cluster Filter
-CREATE INDEX IF NOT EXISTS jobs_cluster ON job (hpc_cluster);
-CREATE INDEX IF NOT EXISTS jobs_cluster_user ON job (hpc_cluster, hpc_user);
-CREATE INDEX IF NOT EXISTS jobs_cluster_project ON job (hpc_cluster, project);
-CREATE INDEX IF NOT EXISTS jobs_cluster_subcluster ON job (hpc_cluster, subcluster);
--- Cluster Filter Sorting
-CREATE INDEX IF NOT EXISTS jobs_cluster_starttime ON job (hpc_cluster, start_time);
-CREATE INDEX IF NOT EXISTS jobs_cluster_duration ON job (hpc_cluster, duration);
-CREATE INDEX IF NOT EXISTS jobs_cluster_numnodes ON job (hpc_cluster, num_nodes);
-CREATE INDEX IF NOT EXISTS jobs_cluster_numhwthreads ON job (hpc_cluster, num_hwthreads);
-CREATE INDEX IF NOT EXISTS jobs_cluster_numacc ON job (hpc_cluster, num_acc);
-CREATE INDEX IF NOT EXISTS jobs_cluster_energy ON job (hpc_cluster, energy);
--- Cluster+Partition Filter
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition ON job (hpc_cluster, cluster_partition);
--- Cluster+Partition Filter Sorting
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime ON job (hpc_cluster, cluster_partition, start_time);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_duration ON job (hpc_cluster, cluster_partition, duration);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numnodes ON job (hpc_cluster, cluster_partition, num_nodes);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numhwthreads ON job (hpc_cluster, cluster_partition, num_hwthreads);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numacc ON job (hpc_cluster, cluster_partition, num_acc);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_energy ON job (hpc_cluster, cluster_partition, energy);
--- Cluster+Partition+Jobstate Filter
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (hpc_cluster, cluster_partition, job_state);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_user ON job (hpc_cluster, cluster_partition, job_state, hpc_user);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_project ON job (hpc_cluster, cluster_partition, job_state, project);
--- Cluster+Partition+Jobstate Filter Sorting
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_starttime ON job (hpc_cluster, cluster_partition, job_state, start_time);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_duration ON job (hpc_cluster, cluster_partition, job_state, duration);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_numnodes ON job (hpc_cluster, cluster_partition, job_state, num_nodes);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_numhwthreads ON job (hpc_cluster, cluster_partition, job_state, num_hwthreads);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_numacc ON job (hpc_cluster, cluster_partition, job_state, num_acc);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_energy ON job (hpc_cluster, cluster_partition, job_state, energy);
--- Cluster+JobState Filter
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate ON job (hpc_cluster, job_state);
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (hpc_cluster, job_state, hpc_user);
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (hpc_cluster, job_state, project);
--- Cluster+JobState Filter Sorting
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime ON job (hpc_cluster, job_state, start_time);
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_duration ON job (hpc_cluster, job_state, duration);
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numnodes ON job (hpc_cluster, job_state, num_nodes);
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numhwthreads ON job (hpc_cluster, job_state, num_hwthreads);
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numacc ON job (hpc_cluster, job_state, num_acc);
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_energy ON job (hpc_cluster, job_state, energy);
---- --- END UPDATE existing indices
 
 -- Add NEW Indices For New Job Table Columns
-CREATE INDEX IF NOT EXISTS jobs_cluster_submittime ON job (hpc_cluster, submit_time);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_submittime ON job (hpc_cluster, cluster_partition, submit_time);
-CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_submittime ON job (hpc_cluster, cluster_partition, job_state, submit_time);
-CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_submittime ON job (hpc_cluster, job_state, submit_time);
+CREATE INDEX IF NOT EXISTS jobs_cluster_submittime ON job (cluster, submit_time);
+CREATE INDEX IF NOT EXISTS jobs_cluster_partition_submittime ON job (cluster, cluster_partition, submit_time);
+CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_submittime ON job (
+    cluster, cluster_partition, job_state, submit_time
+);
+CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_submittime ON job (cluster, job_state, submit_time);
 
 -- Add NEW Indices For New Node Table VARCHAR Fields
 CREATE INDEX IF NOT EXISTS nodes_cluster ON node (cluster);
 CREATE INDEX IF NOT EXISTS nodes_cluster_subcluster ON node (cluster, subcluster);
 
 -- Add NEW Indices For New Node_State Table Fields
-CREATE INDEX IF NOT EXISTS nodeStates_state ON node_state (node_state);
-CREATE INDEX IF NOT EXISTS nodeStates_health ON node_state (health_state);
-CREATE INDEX IF NOT EXISTS nodeStates_nodeid_state ON node (node_id, node_state);
-CREATE INDEX IF NOT EXISTS nodeStates_nodeid_health ON node (node_id, health_state);
+CREATE INDEX IF NOT EXISTS nodestates_state ON node_state (node_state);
+CREATE INDEX IF NOT EXISTS nodestates_health ON node_state (health_state);
+CREATE INDEX IF NOT EXISTS nodestates_nodeid_state ON node_state (node_id, node_state);
+CREATE INDEX IF NOT EXISTS nodestates_nodeid_health ON node_state (node_id, health_state);
 
 -- Add NEW Indices For Increased Amounts of Tags
 CREATE INDEX IF NOT EXISTS tags_jobid ON jobtag (job_id);
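
Note: a composite index can serve any query that filters on a leading prefix of its columns, so jobs_cluster_partition_jobstate_submittime on (cluster, cluster_partition, job_state, submit_time) also covers lookups by cluster alone or by cluster and partition. One way to check which index SQLite actually picks is EXPLAIN QUERY PLAN; a sketch (errors ignored for brevity, assuming the mattn/go-sqlite3 driver):

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	db, _ := sql.Open("sqlite3", ":memory:")
	defer db.Close()

	db.Exec(`CREATE TABLE job (cluster TEXT, cluster_partition TEXT, job_state TEXT, submit_time BIGINT)`)
	db.Exec(`CREATE INDEX jobs_cluster_partition_jobstate_submittime
	         ON job (cluster, cluster_partition, job_state, submit_time)`)

	// (cluster, cluster_partition) matches the index's leading
	// columns, so the planner can use it without a full scan.
	rows, _ := db.Query(`EXPLAIN QUERY PLAN
		SELECT * FROM job WHERE cluster = ? AND cluster_partition = ?`,
		"testcluster", "default")
	defer rows.Close()
	for rows.Next() {
		var id, parent, notused int
		var detail string
		rows.Scan(&id, &parent, &notused, &detail)
		fmt.Println(detail) // e.g. "SEARCH job USING INDEX jobs_cluster_..."
	}
}
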
@@ -23,7 +23,7 @@ import (
 var groupBy2column = map[model.Aggregate]string{
 	model.AggregateUser:       "job.hpc_user",
 	model.AggregateProject:    "job.project",
-	model.AggregateCluster:    "job.hpc_cluster",
+	model.AggregateCluster:    "job.cluster",
 	model.AggregateSubcluster: "job.subcluster",
 }
 
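
Note: groupBy2column maps a GraphQL aggregate enum onto a concrete SQL column, so statistics queries can be assembled generically. A sketch of how such a lookup typically feeds a squirrel GROUP BY — the model package is stubbed here with a local type:

package main

import (
	"fmt"

	sq "github.com/Masterminds/squirrel"
)

// aggregate stands in for the generated model.Aggregate enum.
type aggregate string

const (
	aggregateUser    aggregate = "USER"
	aggregateCluster aggregate = "CLUSTER"
)

var groupBy2column = map[aggregate]string{
	aggregateUser:    "job.hpc_user",
	aggregateCluster: "job.cluster",
}

func main() {
	col := groupBy2column[aggregateCluster]
	sqlStr, _, _ := sq.Select(col, "COUNT(*)").From("job").GroupBy(col).ToSql()
	fmt.Println(sqlStr)
	// SELECT job.cluster, COUNT(*) FROM job GROUP BY job.cluster
}
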
@@ -79,15 +79,15 @@ func TestGetUIConfig(t *testing.T) {
 		t.Fatal("No config")
 	}
 
-	for key := range cfg {
-		print("%s\n", key)
-	}
-	// t.Fatal("No config")
-
-	// tmp := cfg["plot_list_selectedMetrics"]
-	// metrics := tmp.([]string)
-	// str := metrics[2]
-	// if str != "flops_any" {
-	// 	t.Errorf("wrong config\ngot: %s \nwant: flops_any", str)
-	// }
+	tmp, exists := cfg["metricConfig_jobListMetrics"]
+	if exists {
+		metrics := tmp.([]string)
+		str := metrics[2]
+		if str != "flops_any" {
+			t.Errorf("wrong config\ngot: %s \nwant: flops_any", str)
+		}
+	} else {
+		t.Fatal("Key metricConfig_jobListMetrics is missing")
+	}
 }
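
Note: the rewritten test uses the comma-ok form of a map lookup to turn a commented-out block into a real assertion. The remaining tmp.([]string) is still a single-value type assertion, which panics if the stored value has another type; the comma-ok form extends the same defensive style to the assertion itself. A sketch:

package main

import "fmt"

func main() {
	cfg := map[string]any{
		"metricConfig_jobListMetrics": []string{"cpu_load", "mem_used", "flops_any"},
	}

	// Comma-ok on the map lookup: no panic, no zero-value ambiguity.
	tmp, exists := cfg["metricConfig_jobListMetrics"]
	if !exists {
		fmt.Println("key missing")
		return
	}

	// Comma-ok on the assertion too, so a wrong type fails gracefully
	// instead of panicking mid-test.
	metrics, ok := tmp.([]string)
	if !ok {
		fmt.Println("unexpected type")
		return
	}
	fmt.Println(metrics[2]) // flops_any
}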