mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-01-15 17:21:46 +01:00
Review and improve error messages and doc comments
This commit is contained in:
@@ -156,27 +156,41 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
|
||||
return job, nil
|
||||
}
|
||||
|
||||
// Optimize performs database optimization by running VACUUM command.
|
||||
// This reclaims unused space and defragments the database file.
|
||||
// Should be run periodically during maintenance windows.
|
||||
func (r *JobRepository) Optimize() error {
|
||||
if _, err := r.DB.Exec(`VACUUM`); err != nil {
|
||||
return err
|
||||
cclog.Errorf("Error while executing VACUUM: %v", err)
|
||||
return fmt.Errorf("failed to optimize database: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Flush removes all data from job-related tables (jobtag, tag, job).
|
||||
// WARNING: This is a destructive operation that deletes all job data.
|
||||
// Use with extreme caution, typically only for testing or complete resets.
|
||||
func (r *JobRepository) Flush() error {
|
||||
if _, err := r.DB.Exec(`DELETE FROM jobtag`); err != nil {
|
||||
return err
|
||||
cclog.Errorf("Error while deleting from jobtag table: %v", err)
|
||||
return fmt.Errorf("failed to flush jobtag table: %w", err)
|
||||
}
|
||||
if _, err := r.DB.Exec(`DELETE FROM tag`); err != nil {
|
||||
return err
|
||||
cclog.Errorf("Error while deleting from tag table: %v", err)
|
||||
return fmt.Errorf("failed to flush tag table: %w", err)
|
||||
}
|
||||
if _, err := r.DB.Exec(`DELETE FROM job`); err != nil {
|
||||
return err
|
||||
cclog.Errorf("Error while deleting from job table: %v", err)
|
||||
return fmt.Errorf("failed to flush job table: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) {
|
||||
if job == nil {
|
||||
return nil, fmt.Errorf("job cannot be nil")
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
cachekey := fmt.Sprintf("metadata:%d", job.ID)
|
||||
if cached := r.cache.Get(cachekey, nil); cached != nil {
|
||||
@@ -186,8 +200,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
|
||||
|
||||
if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID).
|
||||
RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil {
|
||||
cclog.Warn("Error while scanning for job metadata")
|
||||
return nil, err
|
||||
cclog.Warnf("Error while scanning for job metadata (ID=%d): %v", job.ID, err)
|
||||
return nil, fmt.Errorf("failed to fetch metadata for job %d: %w", job.ID, err)
|
||||
}
|
||||
|
||||
if len(job.RawMetaData) == 0 {
|
||||
@@ -195,8 +209,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
|
||||
cclog.Warn("Error while unmarshaling raw metadata json")
|
||||
return nil, err
|
||||
cclog.Warnf("Error while unmarshaling raw metadata json (ID=%d): %v", job.ID, err)
|
||||
return nil, fmt.Errorf("failed to unmarshal metadata for job %d: %w", job.ID, err)
|
||||
}
|
||||
|
||||
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
|
||||
@@ -205,6 +219,10 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
|
||||
}
|
||||
|
||||
func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err error) {
|
||||
if job == nil {
|
||||
return fmt.Errorf("job cannot be nil")
|
||||
}
|
||||
|
||||
cachekey := fmt.Sprintf("metadata:%d", job.ID)
|
||||
r.cache.Del(cachekey)
|
||||
if job.MetaData == nil {
|
||||
@@ -241,12 +259,16 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
|
||||
}
|
||||
|
||||
func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, error) {
|
||||
if job == nil {
|
||||
return nil, fmt.Errorf("job cannot be nil")
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
if err := sq.Select("job.footprint").From("job").Where("job.id = ?", job.ID).
|
||||
RunWith(r.stmtCache).QueryRow().Scan(&job.RawFootprint); err != nil {
|
||||
cclog.Warn("Error while scanning for job footprint")
|
||||
return nil, err
|
||||
cclog.Warnf("Error while scanning for job footprint (ID=%d): %v", job.ID, err)
|
||||
return nil, fmt.Errorf("failed to fetch footprint for job %d: %w", job.ID, err)
|
||||
}
|
||||
|
||||
if len(job.RawFootprint) == 0 {
|
||||
@@ -254,8 +276,8 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil {
|
||||
cclog.Warn("Error while unmarshaling raw footprint json")
|
||||
return nil, err
|
||||
cclog.Warnf("Error while unmarshaling raw footprint json (ID=%d): %v", job.ID, err)
|
||||
return nil, fmt.Errorf("failed to unmarshal footprint for job %d: %w", job.ID, err)
|
||||
}
|
||||
|
||||
cclog.Debugf("Timer FetchFootprint %s", time.Since(start))
|
||||
@@ -263,6 +285,10 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err
|
||||
}
|
||||
|
||||
func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float64, error) {
|
||||
if job == nil {
|
||||
return nil, fmt.Errorf("job cannot be nil")
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
cachekey := fmt.Sprintf("energyFootprint:%d", job.ID)
|
||||
if cached := r.cache.Get(cachekey, nil); cached != nil {
|
||||
@@ -272,8 +298,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
|
||||
|
||||
if err := sq.Select("job.energy_footprint").From("job").Where("job.id = ?", job.ID).
|
||||
RunWith(r.stmtCache).QueryRow().Scan(&job.RawEnergyFootprint); err != nil {
|
||||
cclog.Warn("Error while scanning for job energy_footprint")
|
||||
return nil, err
|
||||
cclog.Warnf("Error while scanning for job energy_footprint (ID=%d): %v", job.ID, err)
|
||||
return nil, fmt.Errorf("failed to fetch energy footprint for job %d: %w", job.ID, err)
|
||||
}
|
||||
|
||||
if len(job.RawEnergyFootprint) == 0 {
|
||||
@@ -281,8 +307,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(job.RawEnergyFootprint, &job.EnergyFootprint); err != nil {
|
||||
cclog.Warn("Error while unmarshaling raw energy footprint json")
|
||||
return nil, err
|
||||
cclog.Warnf("Error while unmarshaling raw energy footprint json (ID=%d): %v", job.ID, err)
|
||||
return nil, fmt.Errorf("failed to unmarshal energy footprint for job %d: %w", job.ID, err)
|
||||
}
|
||||
|
||||
r.cache.Put(cachekey, job.EnergyFootprint, len(job.EnergyFootprint), 24*time.Hour)
|
||||
@@ -363,6 +389,10 @@ func (r *JobRepository) DeleteJobByID(id int64) error {
|
||||
}
|
||||
|
||||
func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm string) (jobid string, username string, project string, jobname string) {
|
||||
if searchterm == "" {
|
||||
return "", "", "", ""
|
||||
}
|
||||
|
||||
if _, err := strconv.Atoi(searchterm); err == nil { // Return empty on successful conversion: parent method will redirect for integer jobId
|
||||
return searchterm, "", "", ""
|
||||
} else { // Has to have letters and logged-in user for other guesses
|
||||
@@ -394,6 +424,10 @@ var (
|
||||
)
|
||||
|
||||
func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) {
|
||||
if user == nil {
|
||||
return "", fmt.Errorf("user cannot be nil")
|
||||
}
|
||||
|
||||
compareStr := " = ?"
|
||||
query := searchterm
|
||||
if isLike {
|
||||
@@ -404,17 +438,11 @@ func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, ta
|
||||
theQuery := sq.Select(table+"."+selectColumn).Distinct().From(table).
|
||||
Where(table+"."+whereColumn+compareStr, query)
|
||||
|
||||
// theSql, args, theErr := theQuery.ToSql()
|
||||
// if theErr != nil {
|
||||
// cclog.Warn("Error while converting query to sql")
|
||||
// return "", err
|
||||
// }
|
||||
// cclog.Debugf("SQL query (FindColumnValue): `%s`, args: %#v", theSql, args)
|
||||
|
||||
err := theQuery.RunWith(r.stmtCache).QueryRow().Scan(&result)
|
||||
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return "", err
|
||||
cclog.Warnf("Error while querying FindColumnValue (table=%s, column=%s): %v", table, selectColumn, err)
|
||||
return "", fmt.Errorf("failed to find column value: %w", err)
|
||||
} else if err == nil {
|
||||
return result, nil
|
||||
}
|
||||
@@ -426,21 +454,26 @@ func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, ta
|
||||
}
|
||||
|
||||
func (r *JobRepository) FindColumnValues(user *schema.User, query string, table string, selectColumn string, whereColumn string) (results []string, err error) {
|
||||
if user == nil {
|
||||
return nil, fmt.Errorf("user cannot be nil")
|
||||
}
|
||||
|
||||
emptyResult := make([]string, 0)
|
||||
if user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport, schema.RoleManager}) {
|
||||
rows, err := sq.Select(table+"."+selectColumn).Distinct().From(table).
|
||||
Where(table+"."+whereColumn+" LIKE ?", fmt.Sprint("%", query, "%")).
|
||||
RunWith(r.stmtCache).Query()
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return emptyResult, err
|
||||
cclog.Errorf("Error while querying FindColumnValues (table=%s, column=%s): %v", table, selectColumn, err)
|
||||
return emptyResult, fmt.Errorf("failed to find column values: %w", err)
|
||||
} else if err == nil {
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var result string
|
||||
err := rows.Scan(&result)
|
||||
if err != nil {
|
||||
rows.Close()
|
||||
cclog.Warnf("Error while scanning rows: %v", err)
|
||||
return emptyResult, err
|
||||
cclog.Warnf("Error while scanning rows in FindColumnValues: %v", err)
|
||||
return emptyResult, fmt.Errorf("failed to scan column value: %w", err)
|
||||
}
|
||||
results = append(results, result)
|
||||
}
|
||||
@@ -482,8 +515,8 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
|
||||
Where("job.cluster = ?", cluster).
|
||||
RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
cclog.Error("Error while running query")
|
||||
return nil, err
|
||||
cclog.Errorf("Error while running AllocatedNodes query for cluster=%s: %v", cluster, err)
|
||||
return nil, fmt.Errorf("failed to query allocated nodes for cluster %s: %w", cluster, err)
|
||||
}
|
||||
|
||||
var raw []byte
|
||||
@@ -493,12 +526,12 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
|
||||
var resources []*schema.Resource
|
||||
var subcluster string
|
||||
if err := rows.Scan(&raw, &subcluster); err != nil {
|
||||
cclog.Warn("Error while scanning rows")
|
||||
return nil, err
|
||||
cclog.Warnf("Error while scanning rows in AllocatedNodes: %v", err)
|
||||
return nil, fmt.Errorf("failed to scan allocated nodes row: %w", err)
|
||||
}
|
||||
if err := json.Unmarshal(raw, &resources); err != nil {
|
||||
cclog.Warn("Error while unmarshaling raw resources json")
|
||||
return nil, err
|
||||
cclog.Warnf("Error while unmarshaling raw resources json in AllocatedNodes: %v", err)
|
||||
return nil, fmt.Errorf("failed to unmarshal resources in AllocatedNodes: %w", err)
|
||||
}
|
||||
|
||||
hosts, ok := subclusters[subcluster]
|
||||
@@ -529,14 +562,14 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
|
||||
Where("(? - job.start_time) > (job.walltime + ?)", currentTime, seconds).
|
||||
RunWith(r.DB).Exec()
|
||||
if err != nil {
|
||||
cclog.Warn("Error while stopping jobs exceeding walltime")
|
||||
return err
|
||||
cclog.Warnf("Error while stopping jobs exceeding walltime: %v", err)
|
||||
return fmt.Errorf("failed to stop jobs exceeding walltime: %w", err)
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
cclog.Warn("Error while fetching affected rows after stopping due to exceeded walltime")
|
||||
return err
|
||||
cclog.Warnf("Error while fetching affected rows after stopping due to exceeded walltime: %v", err)
|
||||
return fmt.Errorf("failed to get rows affected count: %w", err)
|
||||
}
|
||||
|
||||
if rowsAffected > 0 {
|
||||
@@ -552,18 +585,19 @@ func (r *JobRepository) FindJobIdsByTag(tagID int64) ([]int64, error) {
|
||||
Where(sq.Eq{"jobtag.tag_id": tagID}).Distinct()
|
||||
rows, err := query.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
cclog.Error("Error while running query")
|
||||
return nil, err
|
||||
cclog.Errorf("Error while running FindJobIdsByTag query for tagID=%d: %v", tagID, err)
|
||||
return nil, fmt.Errorf("failed to find job IDs by tag %d: %w", tagID, err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
jobIds := make([]int64, 0, 100)
|
||||
|
||||
for rows.Next() {
|
||||
var jobID int64
|
||||
|
||||
if err := rows.Scan(&jobID); err != nil {
|
||||
rows.Close()
|
||||
cclog.Warn("Error while scanning rows")
|
||||
return nil, err
|
||||
cclog.Warnf("Error while scanning rows in FindJobIdsByTag: %v", err)
|
||||
return nil, fmt.Errorf("failed to scan job ID in FindJobIdsByTag: %w", err)
|
||||
}
|
||||
|
||||
jobIds = append(jobIds, jobID)
|
||||
@@ -581,8 +615,8 @@ func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
|
||||
|
||||
rows, err := query.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
cclog.Error("Error while running query")
|
||||
return nil, err
|
||||
cclog.Errorf("Error while running FindRunningJobs query for cluster=%s: %v", cluster, err)
|
||||
return nil, fmt.Errorf("failed to find running jobs for cluster %s: %w", cluster, err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
@@ -590,8 +624,8 @@ func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
|
||||
for rows.Next() {
|
||||
job, err := scanJob(rows)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while scanning rows")
|
||||
return nil, err
|
||||
cclog.Warnf("Error while scanning rows in FindRunningJobs: %v", err)
|
||||
return nil, fmt.Errorf("failed to scan job in FindRunningJobs: %w", err)
|
||||
}
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
@@ -607,7 +641,8 @@ func (r *JobRepository) UpdateDuration() error {
|
||||
|
||||
_, err := stmnt.RunWith(r.stmtCache).Exec()
|
||||
if err != nil {
|
||||
return err
|
||||
cclog.Errorf("Error while updating duration for running jobs: %v", err)
|
||||
return fmt.Errorf("failed to update duration for running jobs: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -634,8 +669,8 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
|
||||
|
||||
rows, err := query.RunWith(r.stmtCache).Query()
|
||||
if err != nil {
|
||||
cclog.Error("Error while running query")
|
||||
return nil, err
|
||||
cclog.Errorf("Error while running FindJobsBetween query: %v", err)
|
||||
return nil, fmt.Errorf("failed to find jobs between %d and %d: %w", startTimeBegin, startTimeEnd, err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
@@ -643,8 +678,8 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64
|
||||
for rows.Next() {
|
||||
job, err := scanJob(rows)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while scanning rows")
|
||||
return nil, err
|
||||
cclog.Warnf("Error while scanning rows in FindJobsBetween: %v", err)
|
||||
return nil, fmt.Errorf("failed to scan job in FindJobsBetween: %w", err)
|
||||
}
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
@@ -662,13 +697,17 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
|
||||
Set("monitoring_status", monitoringStatus).
|
||||
Where("job.id = ?", job)
|
||||
|
||||
_, err = stmt.RunWith(r.stmtCache).Exec()
|
||||
return err
|
||||
if _, err = stmt.RunWith(r.stmtCache).Exec(); err != nil {
|
||||
cclog.Errorf("Error while updating monitoring status for job %d: %v", job, err)
|
||||
return fmt.Errorf("failed to update monitoring status for job %d: %w", job, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error {
|
||||
if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil {
|
||||
return err
|
||||
cclog.Errorf("Error while executing statement: %v", err)
|
||||
return fmt.Errorf("failed to execute update statement: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user