From 19402d30af2c984e335099933206ecb7c8b6e8bc Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 14 Jan 2026 10:09:19 +0100 Subject: [PATCH] Review and improve error messages and doc comments --- internal/repository/job.go | 149 +++++++++++++++++++++++-------------- 1 file changed, 94 insertions(+), 55 deletions(-) diff --git a/internal/repository/job.go b/internal/repository/job.go index 78e1f3fe..293c28d4 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -156,27 +156,41 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) { return job, nil } +// Optimize performs database optimization by running VACUUM command. +// This reclaims unused space and defragments the database file. +// Should be run periodically during maintenance windows. func (r *JobRepository) Optimize() error { if _, err := r.DB.Exec(`VACUUM`); err != nil { - return err + cclog.Errorf("Error while executing VACUUM: %v", err) + return fmt.Errorf("failed to optimize database: %w", err) } return nil } +// Flush removes all data from job-related tables (jobtag, tag, job). +// WARNING: This is a destructive operation that deletes all job data. +// Use with extreme caution, typically only for testing or complete resets. func (r *JobRepository) Flush() error { if _, err := r.DB.Exec(`DELETE FROM jobtag`); err != nil { - return err + cclog.Errorf("Error while deleting from jobtag table: %v", err) + return fmt.Errorf("failed to flush jobtag table: %w", err) } if _, err := r.DB.Exec(`DELETE FROM tag`); err != nil { - return err + cclog.Errorf("Error while deleting from tag table: %v", err) + return fmt.Errorf("failed to flush tag table: %w", err) } if _, err := r.DB.Exec(`DELETE FROM job`); err != nil { - return err + cclog.Errorf("Error while deleting from job table: %v", err) + return fmt.Errorf("failed to flush job table: %w", err) } return nil } func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) { + if job == nil { + return nil, fmt.Errorf("job cannot be nil") + } + start := time.Now() cachekey := fmt.Sprintf("metadata:%d", job.ID) if cached := r.cache.Get(cachekey, nil); cached != nil { @@ -186,8 +200,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID). RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil { - cclog.Warn("Error while scanning for job metadata") - return nil, err + cclog.Warnf("Error while scanning for job metadata (ID=%d): %v", job.ID, err) + return nil, fmt.Errorf("failed to fetch metadata for job %d: %w", job.ID, err) } if len(job.RawMetaData) == 0 { @@ -195,8 +209,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error } if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil { - cclog.Warn("Error while unmarshaling raw metadata json") - return nil, err + cclog.Warnf("Error while unmarshaling raw metadata json (ID=%d): %v", job.ID, err) + return nil, fmt.Errorf("failed to unmarshal metadata for job %d: %w", job.ID, err) } r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) @@ -205,6 +219,10 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error } func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err error) { + if job == nil { + return fmt.Errorf("job cannot be nil") + } + cachekey := fmt.Sprintf("metadata:%d", job.ID) r.cache.Del(cachekey) if job.MetaData == nil { @@ -241,12 +259,16 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er } func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, error) { + if job == nil { + return nil, fmt.Errorf("job cannot be nil") + } + start := time.Now() if err := sq.Select("job.footprint").From("job").Where("job.id = ?", job.ID). RunWith(r.stmtCache).QueryRow().Scan(&job.RawFootprint); err != nil { - cclog.Warn("Error while scanning for job footprint") - return nil, err + cclog.Warnf("Error while scanning for job footprint (ID=%d): %v", job.ID, err) + return nil, fmt.Errorf("failed to fetch footprint for job %d: %w", job.ID, err) } if len(job.RawFootprint) == 0 { @@ -254,8 +276,8 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err } if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil { - cclog.Warn("Error while unmarshaling raw footprint json") - return nil, err + cclog.Warnf("Error while unmarshaling raw footprint json (ID=%d): %v", job.ID, err) + return nil, fmt.Errorf("failed to unmarshal footprint for job %d: %w", job.ID, err) } cclog.Debugf("Timer FetchFootprint %s", time.Since(start)) @@ -263,6 +285,10 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err } func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float64, error) { + if job == nil { + return nil, fmt.Errorf("job cannot be nil") + } + start := time.Now() cachekey := fmt.Sprintf("energyFootprint:%d", job.ID) if cached := r.cache.Get(cachekey, nil); cached != nil { @@ -272,8 +298,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 if err := sq.Select("job.energy_footprint").From("job").Where("job.id = ?", job.ID). RunWith(r.stmtCache).QueryRow().Scan(&job.RawEnergyFootprint); err != nil { - cclog.Warn("Error while scanning for job energy_footprint") - return nil, err + cclog.Warnf("Error while scanning for job energy_footprint (ID=%d): %v", job.ID, err) + return nil, fmt.Errorf("failed to fetch energy footprint for job %d: %w", job.ID, err) } if len(job.RawEnergyFootprint) == 0 { @@ -281,8 +307,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 } if err := json.Unmarshal(job.RawEnergyFootprint, &job.EnergyFootprint); err != nil { - cclog.Warn("Error while unmarshaling raw energy footprint json") - return nil, err + cclog.Warnf("Error while unmarshaling raw energy footprint json (ID=%d): %v", job.ID, err) + return nil, fmt.Errorf("failed to unmarshal energy footprint for job %d: %w", job.ID, err) } r.cache.Put(cachekey, job.EnergyFootprint, len(job.EnergyFootprint), 24*time.Hour) @@ -363,6 +389,10 @@ func (r *JobRepository) DeleteJobByID(id int64) error { } func (r *JobRepository) FindUserOrProjectOrJobname(user *schema.User, searchterm string) (jobid string, username string, project string, jobname string) { + if searchterm == "" { + return "", "", "", "" + } + if _, err := strconv.Atoi(searchterm); err == nil { // Return empty on successful conversion: parent method will redirect for integer jobId return searchterm, "", "", "" } else { // Has to have letters and logged-in user for other guesses @@ -394,6 +424,10 @@ var ( ) func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, table string, selectColumn string, whereColumn string, isLike bool) (result string, err error) { + if user == nil { + return "", fmt.Errorf("user cannot be nil") + } + compareStr := " = ?" query := searchterm if isLike { @@ -404,17 +438,11 @@ func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, ta theQuery := sq.Select(table+"."+selectColumn).Distinct().From(table). Where(table+"."+whereColumn+compareStr, query) - // theSql, args, theErr := theQuery.ToSql() - // if theErr != nil { - // cclog.Warn("Error while converting query to sql") - // return "", err - // } - // cclog.Debugf("SQL query (FindColumnValue): `%s`, args: %#v", theSql, args) - err := theQuery.RunWith(r.stmtCache).QueryRow().Scan(&result) if err != nil && err != sql.ErrNoRows { - return "", err + cclog.Warnf("Error while querying FindColumnValue (table=%s, column=%s): %v", table, selectColumn, err) + return "", fmt.Errorf("failed to find column value: %w", err) } else if err == nil { return result, nil } @@ -426,21 +454,26 @@ func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, ta } func (r *JobRepository) FindColumnValues(user *schema.User, query string, table string, selectColumn string, whereColumn string) (results []string, err error) { + if user == nil { + return nil, fmt.Errorf("user cannot be nil") + } + emptyResult := make([]string, 0) if user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport, schema.RoleManager}) { rows, err := sq.Select(table+"."+selectColumn).Distinct().From(table). Where(table+"."+whereColumn+" LIKE ?", fmt.Sprint("%", query, "%")). RunWith(r.stmtCache).Query() if err != nil && err != sql.ErrNoRows { - return emptyResult, err + cclog.Errorf("Error while querying FindColumnValues (table=%s, column=%s): %v", table, selectColumn, err) + return emptyResult, fmt.Errorf("failed to find column values: %w", err) } else if err == nil { + defer rows.Close() for rows.Next() { var result string err := rows.Scan(&result) if err != nil { - rows.Close() - cclog.Warnf("Error while scanning rows: %v", err) - return emptyResult, err + cclog.Warnf("Error while scanning rows in FindColumnValues: %v", err) + return emptyResult, fmt.Errorf("failed to scan column value: %w", err) } results = append(results, result) } @@ -482,8 +515,8 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in Where("job.cluster = ?", cluster). RunWith(r.stmtCache).Query() if err != nil { - cclog.Error("Error while running query") - return nil, err + cclog.Errorf("Error while running AllocatedNodes query for cluster=%s: %v", cluster, err) + return nil, fmt.Errorf("failed to query allocated nodes for cluster %s: %w", cluster, err) } var raw []byte @@ -493,12 +526,12 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in var resources []*schema.Resource var subcluster string if err := rows.Scan(&raw, &subcluster); err != nil { - cclog.Warn("Error while scanning rows") - return nil, err + cclog.Warnf("Error while scanning rows in AllocatedNodes: %v", err) + return nil, fmt.Errorf("failed to scan allocated nodes row: %w", err) } if err := json.Unmarshal(raw, &resources); err != nil { - cclog.Warn("Error while unmarshaling raw resources json") - return nil, err + cclog.Warnf("Error while unmarshaling raw resources json in AllocatedNodes: %v", err) + return nil, fmt.Errorf("failed to unmarshal resources in AllocatedNodes: %w", err) } hosts, ok := subclusters[subcluster] @@ -529,14 +562,14 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { Where("(? - job.start_time) > (job.walltime + ?)", currentTime, seconds). RunWith(r.DB).Exec() if err != nil { - cclog.Warn("Error while stopping jobs exceeding walltime") - return err + cclog.Warnf("Error while stopping jobs exceeding walltime: %v", err) + return fmt.Errorf("failed to stop jobs exceeding walltime: %w", err) } rowsAffected, err := res.RowsAffected() if err != nil { - cclog.Warn("Error while fetching affected rows after stopping due to exceeded walltime") - return err + cclog.Warnf("Error while fetching affected rows after stopping due to exceeded walltime: %v", err) + return fmt.Errorf("failed to get rows affected count: %w", err) } if rowsAffected > 0 { @@ -552,18 +585,19 @@ func (r *JobRepository) FindJobIdsByTag(tagID int64) ([]int64, error) { Where(sq.Eq{"jobtag.tag_id": tagID}).Distinct() rows, err := query.RunWith(r.stmtCache).Query() if err != nil { - cclog.Error("Error while running query") - return nil, err + cclog.Errorf("Error while running FindJobIdsByTag query for tagID=%d: %v", tagID, err) + return nil, fmt.Errorf("failed to find job IDs by tag %d: %w", tagID, err) } + defer rows.Close() + jobIds := make([]int64, 0, 100) for rows.Next() { var jobID int64 if err := rows.Scan(&jobID); err != nil { - rows.Close() - cclog.Warn("Error while scanning rows") - return nil, err + cclog.Warnf("Error while scanning rows in FindJobIdsByTag: %v", err) + return nil, fmt.Errorf("failed to scan job ID in FindJobIdsByTag: %w", err) } jobIds = append(jobIds, jobID) @@ -581,8 +615,8 @@ func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) { rows, err := query.RunWith(r.stmtCache).Query() if err != nil { - cclog.Error("Error while running query") - return nil, err + cclog.Errorf("Error while running FindRunningJobs query for cluster=%s: %v", cluster, err) + return nil, fmt.Errorf("failed to find running jobs for cluster %s: %w", cluster, err) } defer rows.Close() @@ -590,8 +624,8 @@ func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) { for rows.Next() { job, err := scanJob(rows) if err != nil { - cclog.Warn("Error while scanning rows") - return nil, err + cclog.Warnf("Error while scanning rows in FindRunningJobs: %v", err) + return nil, fmt.Errorf("failed to scan job in FindRunningJobs: %w", err) } jobs = append(jobs, job) } @@ -607,7 +641,8 @@ func (r *JobRepository) UpdateDuration() error { _, err := stmnt.RunWith(r.stmtCache).Exec() if err != nil { - return err + cclog.Errorf("Error while updating duration for running jobs: %v", err) + return fmt.Errorf("failed to update duration for running jobs: %w", err) } return nil @@ -634,8 +669,8 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64 rows, err := query.RunWith(r.stmtCache).Query() if err != nil { - cclog.Error("Error while running query") - return nil, err + cclog.Errorf("Error while running FindJobsBetween query: %v", err) + return nil, fmt.Errorf("failed to find jobs between %d and %d: %w", startTimeBegin, startTimeEnd, err) } defer rows.Close() @@ -643,8 +678,8 @@ func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64 for rows.Next() { job, err := scanJob(rows) if err != nil { - cclog.Warn("Error while scanning rows") - return nil, err + cclog.Warnf("Error while scanning rows in FindJobsBetween: %v", err) + return nil, fmt.Errorf("failed to scan job in FindJobsBetween: %w", err) } jobs = append(jobs, job) } @@ -662,13 +697,17 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32 Set("monitoring_status", monitoringStatus). Where("job.id = ?", job) - _, err = stmt.RunWith(r.stmtCache).Exec() - return err + if _, err = stmt.RunWith(r.stmtCache).Exec(); err != nil { + cclog.Errorf("Error while updating monitoring status for job %d: %v", job, err) + return fmt.Errorf("failed to update monitoring status for job %d: %w", job, err) + } + return nil } func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error { if _, err := stmt.RunWith(r.stmtCache).Exec(); err != nil { - return err + cclog.Errorf("Error while executing statement: %v", err) + return fmt.Errorf("failed to execute update statement: %w", err) } return nil