Also update job archive on tag deletion

This commit is contained in:
Jan Eitzinger 2025-06-27 11:20:22 +02:00
parent e2e67e3977
commit 484c52d813
2 changed files with 59 additions and 20 deletions

View File

@ -476,6 +476,32 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
return nil return nil
} }
func (r *JobRepository) FindJobIdsByTag(tagId int64) ([]int64, error) {
query := sq.Select("job.id").From("job").
Join("jobtag ON jobtag.job_id = job.id").
Where(sq.Eq{"jobtag.tag_id": tagId}).Distinct()
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
jobIds := make([]int64, 0, 100)
for rows.Next() {
var jobId int64
if err := rows.Scan(&jobId); err != nil {
rows.Close()
log.Warn("Error while scanning rows")
return nil, err
}
jobIds = append(jobIds, jobId)
}
return jobIds, nil
}
// FIXME: Reconsider filtering short jobs with harcoded threshold // FIXME: Reconsider filtering short jobs with harcoded threshold
func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) { func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
query := sq.Select(jobColumns...).From("job"). query := sq.Select(jobColumns...).From("job").

View File

@ -75,7 +75,8 @@ func (r *JobRepository) AddTagDirect(job int64, tag int64) ([]*schema.Tag, error
return tags, archive.UpdateTags(j, archiveTags) return tags, archive.UpdateTags(j, archiveTags)
} }
// Removes a tag from a job by tag id // Removes a tag from a job by tag id.
// Used by GraphQL API
func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.Tag, error) { func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.Tag, error) {
j, err := r.FindByIdWithUser(user, job) j, err := r.FindByIdWithUser(user, job)
if err != nil { if err != nil {
@ -107,6 +108,7 @@ func (r *JobRepository) RemoveTag(user *schema.User, job, tag int64) ([]*schema.
} }
// Removes a tag from a job by tag info // Removes a tag from a job by tag info
// Used by REST API
func (r *JobRepository) RemoveJobTagByRequest(user *schema.User, job int64, tagType string, tagName string, tagScope string) ([]*schema.Tag, error) { func (r *JobRepository) RemoveJobTagByRequest(user *schema.User, job int64, tagType string, tagName string, tagScope string) ([]*schema.Tag, error) {
// Get Tag ID to delete // Get Tag ID to delete
tagID, exists := r.TagId(tagType, tagName, tagScope) tagID, exists := r.TagId(tagType, tagName, tagScope)
@ -146,7 +148,26 @@ func (r *JobRepository) RemoveJobTagByRequest(user *schema.User, job int64, tagT
return tags, archive.UpdateTags(j, archiveTags) return tags, archive.UpdateTags(j, archiveTags)
} }
func (r *JobRepository) removeTagFromArchiveJobs(jobIds []int64) {
for _, j := range jobIds {
tags, err := r.getArchiveTags(&j)
if err != nil {
log.Warn("Error while getting tags for job")
continue
}
job, err := r.FindByIdDirect(j)
if err != nil {
log.Warn("Error while getting job")
continue
}
archive.UpdateTags(job, tags)
}
}
// Removes a tag from db by tag info // Removes a tag from db by tag info
// Used by REST API. Does not update tagged jobs in Job archive.
func (r *JobRepository) RemoveTagByRequest(tagType string, tagName string, tagScope string) error { func (r *JobRepository) RemoveTagByRequest(tagType string, tagName string, tagScope string) error {
// Get Tag ID to delete // Get Tag ID to delete
tagID, exists := r.TagId(tagType, tagName, tagScope) tagID, exists := r.TagId(tagType, tagName, tagScope)
@ -155,29 +176,17 @@ func (r *JobRepository) RemoveTagByRequest(tagType string, tagName string, tagSc
return fmt.Errorf("tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope) return fmt.Errorf("tag does not exist (name, type, scope): %s, %s, %s", tagName, tagType, tagScope)
} }
// Handle Delete JobTagTable return r.RemoveTagById(tagID)
qJobTag := sq.Delete("jobtag").Where("jobtag.tag_id = ?", tagID)
if _, err := qJobTag.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := qJobTag.ToSql()
log.Errorf("Error removing tag from table 'jobTag' with %s: %v", s, err)
return err
}
// Handle Delete TagTable
qTag := sq.Delete("tag").Where("tag.id = ?", tagID)
if _, err := qTag.RunWith(r.stmtCache).Exec(); err != nil {
s, _, _ := qTag.ToSql()
log.Errorf("Error removing tag from table 'tag' with %s: %v", s, err)
return err
}
return nil
} }
// Removes a tag from db by tag id // Removes a tag from db by tag id
// Used by GraphQL API.
func (r *JobRepository) RemoveTagById(tagID int64) error { func (r *JobRepository) RemoveTagById(tagID int64) error {
jobIds, err := r.FindJobIdsByTag(tagID)
if err != nil {
return err
}
// Handle Delete JobTagTable // Handle Delete JobTagTable
qJobTag := sq.Delete("jobtag").Where("jobtag.tag_id = ?", tagID) qJobTag := sq.Delete("jobtag").Where("jobtag.tag_id = ?", tagID)
@ -196,6 +205,9 @@ func (r *JobRepository) RemoveTagById(tagID int64) error {
return err return err
} }
// asynchronously update archive jobs
go r.removeTagFromArchiveJobs(jobIds)
return nil return nil
} }
@ -321,6 +333,7 @@ func (r *JobRepository) AddTagOrCreate(user *schema.User, jobId int64, tagType s
return tagId, nil return tagId, nil
} }
// used in auto tagger plugins
func (r *JobRepository) AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error) { func (r *JobRepository) AddTagOrCreateDirect(jobId int64, tagType string, tagName string) (tagId int64, err error) {
tagScope := "global" tagScope := "global"