mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-12-17 04:36:17 +01:00
Refactor variabel naming and update doc comments
This commit is contained in:
@@ -253,7 +253,7 @@ func (api *RestAPI) getCompleteJobByID(rw http.ResponseWriter, r *http.Request)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
job, err = api.JobRepository.FindById(r.Context(), id) // Get Job from Repo by ID
|
job, err = api.JobRepository.FindByID(r.Context(), id) // Get Job from Repo by ID
|
||||||
} else {
|
} else {
|
||||||
handleError(fmt.Errorf("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
handleError(fmt.Errorf("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
@@ -346,7 +346,7 @@ func (api *RestAPI) getJobByID(rw http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
job, err = api.JobRepository.FindById(r.Context(), id)
|
job, err = api.JobRepository.FindByID(r.Context(), id)
|
||||||
} else {
|
} else {
|
||||||
handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
@@ -445,7 +445,7 @@ func (api *RestAPI) editMeta(rw http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
job, err := api.JobRepository.FindById(r.Context(), id)
|
job, err := api.JobRepository.FindByID(r.Context(), id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
|
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
|
||||||
return
|
return
|
||||||
@@ -493,7 +493,7 @@ func (api *RestAPI) tagJob(rw http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
job, err := api.JobRepository.FindById(r.Context(), id)
|
job, err := api.JobRepository.FindByID(r.Context(), id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
|
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
|
||||||
return
|
return
|
||||||
@@ -557,7 +557,7 @@ func (api *RestAPI) removeTagJob(rw http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
job, err := api.JobRepository.FindById(r.Context(), id)
|
job, err := api.JobRepository.FindByID(r.Context(), id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
|
handleError(fmt.Errorf("finding job failed: %w", err), http.StatusNotFound, rw)
|
||||||
return
|
return
|
||||||
@@ -796,7 +796,7 @@ func (api *RestAPI) deleteJobByID(rw http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = api.JobRepository.DeleteJobById(id)
|
err = api.JobRepository.DeleteJobByID(id)
|
||||||
} else {
|
} else {
|
||||||
handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
handleError(errors.New("the parameter 'id' is required"), http.StatusBadRequest, rw)
|
||||||
return
|
return
|
||||||
@@ -852,7 +852,7 @@ func (api *RestAPI) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = api.JobRepository.DeleteJobById(*job.ID)
|
err = api.JobRepository.DeleteJobByID(*job.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -376,7 +376,7 @@ func (r *queryResolver) Node(ctx context.Context, id string) (*schema.Node, erro
|
|||||||
cclog.Warn("Error while parsing job id")
|
cclog.Warn("Error while parsing job id")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return repo.GetNodeById(numericId, false)
|
return repo.GetNodeByID(numericId, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Nodes is the resolver for the nodes field.
|
// Nodes is the resolver for the nodes field.
|
||||||
@@ -442,7 +442,7 @@ func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
job, err := r.Repo.FindById(ctx, numericId)
|
job, err := r.Repo.FindByID(ctx, numericId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Warn("Error while finding job by id")
|
cclog.Warn("Error while finding job by id")
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -1003,10 +1003,12 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
|
|||||||
// SubCluster returns generated.SubClusterResolver implementation.
|
// SubCluster returns generated.SubClusterResolver implementation.
|
||||||
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
|
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
|
||||||
|
|
||||||
type clusterResolver struct{ *Resolver }
|
type (
|
||||||
type jobResolver struct{ *Resolver }
|
clusterResolver struct{ *Resolver }
|
||||||
type metricValueResolver struct{ *Resolver }
|
jobResolver struct{ *Resolver }
|
||||||
type mutationResolver struct{ *Resolver }
|
metricValueResolver struct{ *Resolver }
|
||||||
type nodeResolver struct{ *Resolver }
|
mutationResolver struct{ *Resolver }
|
||||||
type queryResolver struct{ *Resolver }
|
nodeResolver struct{ *Resolver }
|
||||||
type subClusterResolver struct{ *Resolver }
|
queryResolver struct{ *Resolver }
|
||||||
|
subClusterResolver struct{ *Resolver }
|
||||||
|
)
|
||||||
|
|||||||
@@ -2,12 +2,14 @@
|
|||||||
// All rights reserved. This file is part of cc-backend.
|
// All rights reserved. This file is part of cc-backend.
|
||||||
// Use of this source code is governed by a MIT-style
|
// Use of this source code is governed by a MIT-style
|
||||||
// license that can be found in the LICENSE file.
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
package graph
|
package graph
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"slices"
|
||||||
|
|
||||||
"github.com/99designs/gqlgen/graphql"
|
"github.com/99designs/gqlgen/graphql"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
|
||||||
@@ -185,11 +187,5 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
|
|||||||
func requireField(ctx context.Context, name string) bool {
|
func requireField(ctx context.Context, name string) bool {
|
||||||
fields := graphql.CollectAllFields(ctx)
|
fields := graphql.CollectAllFields(ctx)
|
||||||
|
|
||||||
for _, f := range fields {
|
return slices.Contains(fields, name)
|
||||||
if f == name {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -376,7 +376,7 @@ func (r *JobRepository) DeleteJobsBefore(startTime int64, omitTagged bool) (int,
|
|||||||
return cnt, err
|
return cnt, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *JobRepository) DeleteJobById(id int64) error {
|
func (r *JobRepository) DeleteJobByID(id int64) error {
|
||||||
// Invalidate cache entries before deletion
|
// Invalidate cache entries before deletion
|
||||||
r.cache.Del(fmt.Sprintf("metadata:%d", id))
|
r.cache.Del(fmt.Sprintf("metadata:%d", id))
|
||||||
r.cache.Del(fmt.Sprintf("energyFootprint:%d", id))
|
r.cache.Del(fmt.Sprintf("energyFootprint:%d", id))
|
||||||
@@ -577,10 +577,10 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *JobRepository) FindJobIdsByTag(tagId int64) ([]int64, error) {
|
func (r *JobRepository) FindJobIdsByTag(tagID int64) ([]int64, error) {
|
||||||
query := sq.Select("job.id").From("job").
|
query := sq.Select("job.id").From("job").
|
||||||
Join("jobtag ON jobtag.job_id = job.id").
|
Join("jobtag ON jobtag.job_id = job.id").
|
||||||
Where(sq.Eq{"jobtag.tag_id": tagId}).Distinct()
|
Where(sq.Eq{"jobtag.tag_id": tagID}).Distinct()
|
||||||
rows, err := query.RunWith(r.stmtCache).Query()
|
rows, err := query.RunWith(r.stmtCache).Query()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Error("Error while running query")
|
cclog.Error("Error while running query")
|
||||||
@@ -589,15 +589,15 @@ func (r *JobRepository) FindJobIdsByTag(tagId int64) ([]int64, error) {
|
|||||||
jobIds := make([]int64, 0, 100)
|
jobIds := make([]int64, 0, 100)
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var jobId int64
|
var jobID int64
|
||||||
|
|
||||||
if err := rows.Scan(&jobId); err != nil {
|
if err := rows.Scan(&jobID); err != nil {
|
||||||
rows.Close()
|
rows.Close()
|
||||||
cclog.Warn("Error while scanning rows")
|
cclog.Warn("Error while scanning rows")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
jobIds = append(jobIds, jobId)
|
jobIds = append(jobIds, jobID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return jobIds, nil
|
return jobIds, nil
|
||||||
@@ -731,10 +731,11 @@ func (r *JobRepository) UpdateEnergy(
|
|||||||
metricEnergy := 0.0
|
metricEnergy := 0.0
|
||||||
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
|
if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
|
||||||
// Note: For DB data, calculate and save as kWh
|
// Note: For DB data, calculate and save as kWh
|
||||||
if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules or Wh)
|
switch sc.MetricConfig[i].Energy {
|
||||||
|
case "energy": // this metric has energy as unit (Joules or Wh)
|
||||||
cclog.Warnf("Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0", jobMeta.JobID, jobMeta.Cluster, fp)
|
cclog.Warnf("Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0", jobMeta.JobID, jobMeta.Cluster, fp)
|
||||||
// FIXME: Needs sum as stats type
|
// FIXME: Needs sum as stats type
|
||||||
} else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt)
|
case "power": // this metric has power as unit (Watt)
|
||||||
// Energy: Power (in Watts) * Time (in Seconds)
|
// Energy: Power (in Watts) * Time (in Seconds)
|
||||||
// Unit: (W * (s / 3600)) / 1000 = kWh
|
// Unit: (W * (s / 3600)) / 1000 = kWh
|
||||||
// Round 2 Digits: round(Energy * 100) / 100
|
// Round 2 Digits: round(Energy * 100) / 100
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
// All rights reserved. This file is part of cc-backend.
|
// All rights reserved. This file is part of cc-backend.
|
||||||
// Use of this source code is governed by a MIT-style
|
// Use of this source code is governed by a MIT-style
|
||||||
// license that can be found in the LICENSE file.
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
package repository
|
package repository
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -109,27 +110,27 @@ func (r *JobRepository) Start(job *schema.Job) (id int64, err error) {
|
|||||||
|
|
||||||
// Stop updates the job with the database id jobId using the provided arguments.
|
// Stop updates the job with the database id jobId using the provided arguments.
|
||||||
func (r *JobRepository) Stop(
|
func (r *JobRepository) Stop(
|
||||||
jobId int64,
|
jobID int64,
|
||||||
duration int32,
|
duration int32,
|
||||||
state schema.JobState,
|
state schema.JobState,
|
||||||
monitoringStatus int32,
|
monitoringStatus int32,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
// Invalidate cache entries as job state is changing
|
// Invalidate cache entries as job state is changing
|
||||||
r.cache.Del(fmt.Sprintf("metadata:%d", jobId))
|
r.cache.Del(fmt.Sprintf("metadata:%d", jobID))
|
||||||
r.cache.Del(fmt.Sprintf("energyFootprint:%d", jobId))
|
r.cache.Del(fmt.Sprintf("energyFootprint:%d", jobID))
|
||||||
|
|
||||||
stmt := sq.Update("job").
|
stmt := sq.Update("job").
|
||||||
Set("job_state", state).
|
Set("job_state", state).
|
||||||
Set("duration", duration).
|
Set("duration", duration).
|
||||||
Set("monitoring_status", monitoringStatus).
|
Set("monitoring_status", monitoringStatus).
|
||||||
Where("job.id = ?", jobId)
|
Where("job.id = ?", jobID)
|
||||||
|
|
||||||
_, err = stmt.RunWith(r.stmtCache).Exec()
|
_, err = stmt.RunWith(r.stmtCache).Exec()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *JobRepository) StopCached(
|
func (r *JobRepository) StopCached(
|
||||||
jobId int64,
|
jobID int64,
|
||||||
duration int32,
|
duration int32,
|
||||||
state schema.JobState,
|
state schema.JobState,
|
||||||
monitoringStatus int32,
|
monitoringStatus int32,
|
||||||
@@ -140,7 +141,7 @@ func (r *JobRepository) StopCached(
|
|||||||
Set("job_state", state).
|
Set("job_state", state).
|
||||||
Set("duration", duration).
|
Set("duration", duration).
|
||||||
Set("monitoring_status", monitoringStatus).
|
Set("monitoring_status", monitoringStatus).
|
||||||
Where("job_cache.id = ?", jobId)
|
Where("job_cache.id = ?", jobID)
|
||||||
|
|
||||||
_, err = stmt.RunWith(r.stmtCache).Exec()
|
_, err = stmt.RunWith(r.stmtCache).Exec()
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
// All rights reserved. This file is part of cc-backend.
|
// All rights reserved. This file is part of cc-backend.
|
||||||
// Use of this source code is governed by a MIT-style
|
// Use of this source code is governed by a MIT-style
|
||||||
// license that can be found in the LICENSE file.
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
package repository
|
package repository
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -22,13 +23,13 @@ import (
|
|||||||
// It returns a pointer to a schema.Job data structure and an error variable.
|
// It returns a pointer to a schema.Job data structure and an error variable.
|
||||||
// To check if no job was found test err == sql.ErrNoRows
|
// To check if no job was found test err == sql.ErrNoRows
|
||||||
func (r *JobRepository) Find(
|
func (r *JobRepository) Find(
|
||||||
jobId *int64,
|
jobID *int64,
|
||||||
cluster *string,
|
cluster *string,
|
||||||
startTime *int64,
|
startTime *int64,
|
||||||
) (*schema.Job, error) {
|
) (*schema.Job, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
q := sq.Select(jobColumns...).From("job").
|
q := sq.Select(jobColumns...).From("job").
|
||||||
Where("job.job_id = ?", *jobId)
|
Where("job.job_id = ?", *jobID)
|
||||||
|
|
||||||
if cluster != nil {
|
if cluster != nil {
|
||||||
q = q.Where("job.cluster = ?", *cluster)
|
q = q.Where("job.cluster = ?", *cluster)
|
||||||
@@ -44,12 +45,12 @@ func (r *JobRepository) Find(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *JobRepository) FindCached(
|
func (r *JobRepository) FindCached(
|
||||||
jobId *int64,
|
jobID *int64,
|
||||||
cluster *string,
|
cluster *string,
|
||||||
startTime *int64,
|
startTime *int64,
|
||||||
) (*schema.Job, error) {
|
) (*schema.Job, error) {
|
||||||
q := sq.Select(jobCacheColumns...).From("job_cache").
|
q := sq.Select(jobCacheColumns...).From("job_cache").
|
||||||
Where("job_cache.job_id = ?", *jobId)
|
Where("job_cache.job_id = ?", *jobID)
|
||||||
|
|
||||||
if cluster != nil {
|
if cluster != nil {
|
||||||
q = q.Where("job_cache.cluster = ?", *cluster)
|
q = q.Where("job_cache.cluster = ?", *cluster)
|
||||||
@@ -63,19 +64,19 @@ func (r *JobRepository) FindCached(
|
|||||||
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find executes a SQL query to find a specific batch job.
|
// FindAll executes a SQL query to find all batch jobs matching the given criteria.
|
||||||
// The job is queried using the batch job id, the cluster name,
|
// Jobs are queried using the batch job id, and optionally filtered by cluster name
|
||||||
// and the start time of the job in UNIX epoch time seconds.
|
// and start time (UNIX epoch time seconds).
|
||||||
// It returns a pointer to a schema.Job data structure and an error variable.
|
// It returns a slice of pointers to schema.Job data structures and an error variable.
|
||||||
// To check if no job was found test err == sql.ErrNoRows
|
// An empty slice is returned if no matching jobs are found.
|
||||||
func (r *JobRepository) FindAll(
|
func (r *JobRepository) FindAll(
|
||||||
jobId *int64,
|
jobID *int64,
|
||||||
cluster *string,
|
cluster *string,
|
||||||
startTime *int64,
|
startTime *int64,
|
||||||
) ([]*schema.Job, error) {
|
) ([]*schema.Job, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
q := sq.Select(jobColumns...).From("job").
|
q := sq.Select(jobColumns...).From("job").
|
||||||
Where("job.job_id = ?", *jobId)
|
Where("job.job_id = ?", *jobID)
|
||||||
|
|
||||||
if cluster != nil {
|
if cluster != nil {
|
||||||
q = q.Where("job.cluster = ?", *cluster)
|
q = q.Where("job.cluster = ?", *cluster)
|
||||||
@@ -139,13 +140,13 @@ func (r *JobRepository) GetJobList(limit int, offset int) ([]int64, error) {
|
|||||||
return jl, nil
|
return jl, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindById executes a SQL query to find a specific batch job.
|
// FindByID executes a SQL query to find a specific batch job.
|
||||||
// The job is queried using the database id.
|
// The job is queried using the database id.
|
||||||
// It returns a pointer to a schema.Job data structure and an error variable.
|
// It returns a pointer to a schema.Job data structure and an error variable.
|
||||||
// To check if no job was found test err == sql.ErrNoRows
|
// To check if no job was found test err == sql.ErrNoRows
|
||||||
func (r *JobRepository) FindById(ctx context.Context, jobId int64) (*schema.Job, error) {
|
func (r *JobRepository) FindByID(ctx context.Context, jobID int64) (*schema.Job, error) {
|
||||||
q := sq.Select(jobColumns...).
|
q := sq.Select(jobColumns...).
|
||||||
From("job").Where("job.id = ?", jobId)
|
From("job").Where("job.id = ?", jobID)
|
||||||
|
|
||||||
q, qerr := SecurityCheck(ctx, q)
|
q, qerr := SecurityCheck(ctx, q)
|
||||||
if qerr != nil {
|
if qerr != nil {
|
||||||
@@ -155,14 +156,14 @@ func (r *JobRepository) FindById(ctx context.Context, jobId int64) (*schema.Job,
|
|||||||
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindByIdWithUser executes a SQL query to find a specific batch job.
|
// FindByIDWithUser executes a SQL query to find a specific batch job.
|
||||||
// The job is queried using the database id. The user is passed directly,
|
// The job is queried using the database id. The user is passed directly,
|
||||||
// instead as part of the context.
|
// instead as part of the context.
|
||||||
// It returns a pointer to a schema.Job data structure and an error variable.
|
// It returns a pointer to a schema.Job data structure and an error variable.
|
||||||
// To check if no job was found test err == sql.ErrNoRows
|
// To check if no job was found test err == sql.ErrNoRows
|
||||||
func (r *JobRepository) FindByIdWithUser(user *schema.User, jobId int64) (*schema.Job, error) {
|
func (r *JobRepository) FindByIDWithUser(user *schema.User, jobID int64) (*schema.Job, error) {
|
||||||
q := sq.Select(jobColumns...).
|
q := sq.Select(jobColumns...).
|
||||||
From("job").Where("job.id = ?", jobId)
|
From("job").Where("job.id = ?", jobID)
|
||||||
|
|
||||||
q, qerr := SecurityCheckWithUser(user, q)
|
q, qerr := SecurityCheckWithUser(user, q)
|
||||||
if qerr != nil {
|
if qerr != nil {
|
||||||
@@ -172,24 +173,24 @@ func (r *JobRepository) FindByIdWithUser(user *schema.User, jobId int64) (*schem
|
|||||||
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindByIdDirect executes a SQL query to find a specific batch job.
|
// FindByIDDirect executes a SQL query to find a specific batch job.
|
||||||
// The job is queried using the database id.
|
// The job is queried using the database id.
|
||||||
// It returns a pointer to a schema.Job data structure and an error variable.
|
// It returns a pointer to a schema.Job data structure and an error variable.
|
||||||
// To check if no job was found test err == sql.ErrNoRows
|
// To check if no job was found test err == sql.ErrNoRows
|
||||||
func (r *JobRepository) FindByIdDirect(jobId int64) (*schema.Job, error) {
|
func (r *JobRepository) FindByIDDirect(jobID int64) (*schema.Job, error) {
|
||||||
q := sq.Select(jobColumns...).
|
q := sq.Select(jobColumns...).
|
||||||
From("job").Where("job.id = ?", jobId)
|
From("job").Where("job.id = ?", jobID)
|
||||||
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindByJobId executes a SQL query to find a specific batch job.
|
// FindByJobID executes a SQL query to find a specific batch job.
|
||||||
// The job is queried using the slurm id and the clustername.
|
// The job is queried using the slurm id and the clustername.
|
||||||
// It returns a pointer to a schema.Job data structure and an error variable.
|
// It returns a pointer to a schema.Job data structure and an error variable.
|
||||||
// To check if no job was found test err == sql.ErrNoRows
|
// To check if no job was found test err == sql.ErrNoRows
|
||||||
func (r *JobRepository) FindByJobId(ctx context.Context, jobId int64, startTime int64, cluster string) (*schema.Job, error) {
|
func (r *JobRepository) FindByJobID(ctx context.Context, jobID int64, startTime int64, cluster string) (*schema.Job, error) {
|
||||||
q := sq.Select(jobColumns...).
|
q := sq.Select(jobColumns...).
|
||||||
From("job").
|
From("job").
|
||||||
Where("job.job_id = ?", jobId).
|
Where("job.job_id = ?", jobID).
|
||||||
Where("job.cluster = ?", cluster).
|
Where("job.cluster = ?", cluster).
|
||||||
Where("job.start_time = ?", startTime)
|
Where("job.start_time = ?", startTime)
|
||||||
|
|
||||||
@@ -205,10 +206,10 @@ func (r *JobRepository) FindByJobId(ctx context.Context, jobId int64, startTime
|
|||||||
// The job is queried using the slurm id,a username and the cluster.
|
// The job is queried using the slurm id,a username and the cluster.
|
||||||
// It returns a bool.
|
// It returns a bool.
|
||||||
// If job was found, user is owner: test err != sql.ErrNoRows
|
// If job was found, user is owner: test err != sql.ErrNoRows
|
||||||
func (r *JobRepository) IsJobOwner(jobId int64, startTime int64, user string, cluster string) bool {
|
func (r *JobRepository) IsJobOwner(jobID int64, startTime int64, user string, cluster string) bool {
|
||||||
q := sq.Select("id").
|
q := sq.Select("id").
|
||||||
From("job").
|
From("job").
|
||||||
Where("job.job_id = ?", jobId).
|
Where("job.job_id = ?", jobID).
|
||||||
Where("job.hpc_user = ?", user).
|
Where("job.hpc_user = ?", user).
|
||||||
Where("job.cluster = ?", cluster).
|
Where("job.cluster = ?", cluster).
|
||||||
Where("job.start_time = ?", startTime)
|
Where("job.start_time = ?", startTime)
|
||||||
@@ -269,19 +270,19 @@ func (r *JobRepository) FindConcurrentJobs(
|
|||||||
queryString := fmt.Sprintf("cluster=%s", job.Cluster)
|
queryString := fmt.Sprintf("cluster=%s", job.Cluster)
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var id, jobId, startTime sql.NullInt64
|
var id, jobID, startTime sql.NullInt64
|
||||||
|
|
||||||
if err = rows.Scan(&id, &jobId, &startTime); err != nil {
|
if err = rows.Scan(&id, &jobID, &startTime); err != nil {
|
||||||
cclog.Warn("Error while scanning rows")
|
cclog.Warn("Error while scanning rows")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if id.Valid {
|
if id.Valid {
|
||||||
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
|
queryString += fmt.Sprintf("&jobId=%d", int(jobID.Int64))
|
||||||
items = append(items,
|
items = append(items,
|
||||||
&model.JobLink{
|
&model.JobLink{
|
||||||
ID: fmt.Sprint(id.Int64),
|
ID: fmt.Sprint(id.Int64),
|
||||||
JobID: int(jobId.Int64),
|
JobID: int(jobID.Int64),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -294,19 +295,19 @@ func (r *JobRepository) FindConcurrentJobs(
|
|||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var id, jobId, startTime sql.NullInt64
|
var id, jobID, startTime sql.NullInt64
|
||||||
|
|
||||||
if err := rows.Scan(&id, &jobId, &startTime); err != nil {
|
if err := rows.Scan(&id, &jobID, &startTime); err != nil {
|
||||||
cclog.Warn("Error while scanning rows")
|
cclog.Warn("Error while scanning rows")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if id.Valid {
|
if id.Valid {
|
||||||
queryString += fmt.Sprintf("&jobId=%d", int(jobId.Int64))
|
queryString += fmt.Sprintf("&jobId=%d", int(jobID.Int64))
|
||||||
items = append(items,
|
items = append(items,
|
||||||
&model.JobLink{
|
&model.JobLink{
|
||||||
ID: fmt.Sprint(id.Int64),
|
ID: fmt.Sprint(id.Int64),
|
||||||
JobID: int(jobId.Int64),
|
JobID: int(jobID.Int64),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ func TestFind(t *testing.T) {
|
|||||||
func TestFindById(t *testing.T) {
|
func TestFindById(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
|
|
||||||
job, err := r.FindById(getContext(t), 338)
|
job, err := r.FindByID(getContext(t), 338)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ func (r *NodeRepository) GetNode(hostname string, cluster string, withMeta bool)
|
|||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *NodeRepository) GetNodeById(id int64, withMeta bool) (*schema.Node, error) {
|
func (r *NodeRepository) GetNodeByID(id int64, withMeta bool) (*schema.Node, error) {
|
||||||
node := &schema.Node{}
|
node := &schema.Node{}
|
||||||
var timestamp int
|
var timestamp int
|
||||||
if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
|
if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
|
||||||
@@ -240,7 +240,6 @@ func (r *NodeRepository) QueryNodes(
|
|||||||
page *model.PageRequest,
|
page *model.PageRequest,
|
||||||
order *model.OrderByInput, // Currently unused!
|
order *model.OrderByInput, // Currently unused!
|
||||||
) ([]*schema.Node, error) {
|
) ([]*schema.Node, error) {
|
||||||
|
|
||||||
query, qerr := AccessCheck(ctx,
|
query, qerr := AccessCheck(ctx,
|
||||||
sq.Select("hostname", "cluster", "subcluster", "node_state", "health_state", "MAX(time_stamp) as time").
|
sq.Select("hostname", "cluster", "subcluster", "node_state", "health_state", "MAX(time_stamp) as time").
|
||||||
From("node").
|
From("node").
|
||||||
@@ -309,7 +308,6 @@ func (r *NodeRepository) CountNodes(
|
|||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
filters []*model.NodeFilter,
|
filters []*model.NodeFilter,
|
||||||
) (int, error) {
|
) (int, error) {
|
||||||
|
|
||||||
query, qerr := AccessCheck(ctx,
|
query, qerr := AccessCheck(ctx,
|
||||||
sq.Select("time_stamp", "count(*) as countRes").
|
sq.Select("time_stamp", "count(*) as countRes").
|
||||||
From("node").
|
From("node").
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ func BenchmarkDB_FindJobById(b *testing.B) {
|
|||||||
|
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
_, err := db.FindById(getContext(b), jobId)
|
_, err := db.FindByID(getContext(b), jobId)
|
||||||
noErr(b, err)
|
noErr(b, err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
// All rights reserved. This file is part of cc-backend.
|
// All rights reserved. This file is part of cc-backend.
|
||||||
// Use of this source code is governed by a MIT-style
|
// Use of this source code is governed by a MIT-style
|
||||||
// license that can be found in the LICENSE file.
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
package repository
|
package repository
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -18,7 +19,7 @@ import (
|
|||||||
// AddTag adds the tag with id `tagId` to the job with the database id `jobId`.
|
// AddTag adds the tag with id `tagId` to the job with the database id `jobId`.
|
||||||
// Requires user authentication for security checks.
|
// Requires user authentication for security checks.
|
||||||
func (r *JobRepository) AddTag(user *schema.User, job int64, tag int64) ([]*schema.Tag, error) {
|
func (r *JobRepository) AddTag(user *schema.User, job int64, tag int64) ([]*schema.Tag, error) {
|
||||||
j, err := r.FindByIdWithUser(user, job)
|
j, err := r.FindByIDWithUser(user, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Warnf("Error finding job %d for user %s: %v", job, user.Username, err)
|
cclog.Warnf("Error finding job %d for user %s: %v", job, user.Username, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -32,7 +33,7 @@ func (r *JobRepository) AddTag(user *schema.User, job int64, tag int64) ([]*sche
|
|||||||
// AddTagDirect adds a tag without user security checks.
|
// AddTagDirect adds a tag without user security checks.
|
||||||
// Use only for internal/admin operations.
|
// Use only for internal/admin operations.
|
||||||
func (r *JobRepository) AddTagDirect(job int64, tag int64) ([]*schema.Tag, error) {
|
func (r *JobRepository) AddTagDirect(job int64, tag int64) ([]*schema.Tag, error) {
|
||||||
j, err := r.FindByIdDirect(job)
|
j, err := r.FindByIDDirect(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Warnf("Error finding job %d: %v", job, err)
|
cclog.Warnf("Error finding job %d: %v", job, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -43,10 +44,10 @@ func (r *JobRepository) AddTagDirect(job int64, tag int64) ([]*schema.Tag, error
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Removes a tag from a job by tag id.
|
// RemoveTag removes the tag with the database id `tag` from the job with the database id `job`.
|
||||||
// Used by GraphQL API
|
// Requires user authentication for security checks. Used by GraphQL API.
|
||||||
func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.Tag, error) {
|
func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.Tag, error) {
|
||||||
j, err := r.FindByIdWithUser(user, job)
|
j, err := r.FindByIDWithUser(user, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Warn("Error while finding job by id")
|
cclog.Warn("Error while finding job by id")
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -75,8 +76,8 @@ func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.
|
|||||||
return tags, archive.UpdateTags(j, archiveTags)
|
return tags, archive.UpdateTags(j, archiveTags)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Removes a tag from a job by tag info
|
// RemoveJobTagByRequest removes a tag from the job with the database id `job` by tag type, name, and scope.
|
||||||
// Used by REST API
|
// Requires user authentication for security checks. Used by REST API.
|
||||||
func (r *JobRepository) RemoveJobTagByRequest(user *schema.User, job int64, tagType string, tagName string, tagScope string) ([]*schema.Tag, error) {
|
func (r *JobRepository) RemoveJobTagByRequest(user *schema.User, job int64, tagType string, tagName string, tagScope string) ([]*schema.Tag, error) {
|
||||||
// Get Tag ID to delete
|
// Get Tag ID to delete
|
||||||
tagID, exists := r.TagId(tagType, tagName, tagScope)
|
tagID, exists := r.TagId(tagType, tagName, tagScope)
|
||||||
@@ -86,7 +87,7 @@ func (r *JobRepository) RemoveJobTagByRequest(user *schema.User, job int64, tagT
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get Job
|
// Get Job
|
||||||
j, err := r.FindByIdWithUser(user, job)
|
j, err := r.FindByIDWithUser(user, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Warn("Error while finding job by id")
|
cclog.Warn("Error while finding job by id")
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -124,7 +125,7 @@ func (r *JobRepository) removeTagFromArchiveJobs(jobIds []int64) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
job, err := r.FindByIdDirect(j)
|
job, err := r.FindByIDDirect(j)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Warnf("Error while getting job %d", j)
|
cclog.Warnf("Error while getting job %d", j)
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ func TestRegister(t *testing.T) {
|
|||||||
func TestMatch(t *testing.T) {
|
func TestMatch(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
|
|
||||||
job, err := r.FindByIdDirect(317)
|
job, err := r.FindByIDDirect(317)
|
||||||
noErr(t, err)
|
noErr(t, err)
|
||||||
|
|
||||||
var tagger AppTagger
|
var tagger AppTagger
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ type JobTagger struct {
|
|||||||
// startTaggers are applied when a job starts (e.g., application detection)
|
// startTaggers are applied when a job starts (e.g., application detection)
|
||||||
startTaggers []Tagger
|
startTaggers []Tagger
|
||||||
// stopTaggers are applied when a job completes (e.g., job classification)
|
// stopTaggers are applied when a job completes (e.g., job classification)
|
||||||
stopTaggers []Tagger
|
stopTaggers []Tagger
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTagger() {
|
func newTagger() {
|
||||||
@@ -98,7 +98,7 @@ func RunTaggers() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, id := range jl {
|
for _, id := range jl {
|
||||||
job, err := r.FindByIdDirect(id)
|
job, err := r.FindByIDDirect(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.Errorf("Error while getting job %s", err)
|
cclog.Errorf("Error while getting job %s", err)
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ func TestInit(t *testing.T) {
|
|||||||
func TestJobStartCallback(t *testing.T) {
|
func TestJobStartCallback(t *testing.T) {
|
||||||
Init()
|
Init()
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
job, err := r.FindByIdDirect(525)
|
job, err := r.FindByIDDirect(525)
|
||||||
noErr(t, err)
|
noErr(t, err)
|
||||||
|
|
||||||
jobs := make([]*schema.Job, 0, 1)
|
jobs := make([]*schema.Job, 0, 1)
|
||||||
|
|||||||
Reference in New Issue
Block a user