Print the job's database ID (dereferenced *job.ID) instead of the pointer's address

This commit is contained in:
2026-02-06 14:06:56 +01:00
parent f671d8df90
commit fa7727c6ca
8 changed files with 41 additions and 41 deletions

View File

@@ -691,7 +691,7 @@ func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) {
for _, job := range jobs { for _, job := range jobs {
// Check if jobs are within the same day (prevent duplicates) // Check if jobs are within the same day (prevent duplicates)
if (req.StartTime - job.StartTime) < secondsPerDay { if (req.StartTime - job.StartTime) < secondsPerDay {
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw) handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", *job.ID, job.JobID), http.StatusUnprocessableEntity, rw)
return return
} }
} }
@@ -860,7 +860,7 @@ func (api *RestAPI) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
rw.Header().Add("Content-Type", "application/json") rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK) rw.WriteHeader(http.StatusOK)
if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{ if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{
Message: fmt.Sprintf("Successfully deleted job %d", job.ID), Message: fmt.Sprintf("Successfully deleted job %d", *job.ID),
}); err != nil { }); err != nil {
cclog.Errorf("Failed to encode response: %v", err) cclog.Errorf("Failed to encode response: %v", err)
} }
@@ -926,17 +926,17 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) {
func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) { func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) {
// Sanity checks // Sanity checks
if job.State != schema.JobStateRunning { if job.State != schema.JobStateRunning {
handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw) handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, *job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw)
return return
} }
if job.StartTime > req.StopTime { if job.StartTime > req.StopTime {
handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw) handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, *job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw)
return return
} }
if req.State != "" && !req.State.Valid() { if req.State != "" && !req.State.Valid() {
handleError(fmt.Errorf("jobId %d (id %d) on %s : invalid requested job state: %#v", job.JobID, job.ID, job.Cluster, req.State), http.StatusBadRequest, rw) handleError(fmt.Errorf("jobId %d (id %d) on %s : invalid requested job state: %#v", job.JobID, *job.ID, job.Cluster, req.State), http.StatusBadRequest, rw)
return return
} else if req.State == "" { } else if req.State == "" {
req.State = schema.JobStateCompleted req.State = schema.JobStateCompleted
@@ -950,12 +950,12 @@ func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, *job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw)
return return
} }
} }
cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", *job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
// Send a response (with status OK). This means that errors that happen from here on forward // Send a response (with status OK). This means that errors that happen from here on forward
// can *NOT* be communicated to the client. If reading from a MetricDataRepository or // can *NOT* be communicated to the client. If reading from a MetricDataRepository or

View File

@@ -126,7 +126,7 @@ func archivingWorker() {
// not using meta data, called to load JobMeta into Cache? // not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository // will fail if job meta not in repository
if _, err := jobRepo.FetchMetadata(job); err != nil { if _, err := jobRepo.FetchMetadata(job); err != nil {
cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error()) cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", *job.ID, err.Error())
jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed) jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
archivePending.Done() archivePending.Done()
continue continue
@@ -136,7 +136,7 @@ func archivingWorker() {
// Use shutdown context to allow cancellation // Use shutdown context to allow cancellation
jobMeta, err := ArchiveJob(job, shutdownCtx) jobMeta, err := ArchiveJob(job, shutdownCtx)
if err != nil { if err != nil {
cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", *job.ID, err.Error())
jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed) jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
archivePending.Done() archivePending.Done()
continue continue
@@ -145,24 +145,24 @@ func archivingWorker() {
stmt := sq.Update("job").Where("job.id = ?", job.ID) stmt := sq.Update("job").Where("job.id = ?", job.ID)
if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil { if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", *job.ID, err.Error())
archivePending.Done() archivePending.Done()
continue continue
} }
if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil { if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", *job.ID, err.Error())
archivePending.Done() archivePending.Done()
continue continue
} }
// Update the jobs database entry one last time: // Update the jobs database entry one last time:
stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful) stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
if err := jobRepo.Execute(stmt); err != nil { if err := jobRepo.Execute(stmt); err != nil {
cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", *job.ID, err.Error())
archivePending.Done() archivePending.Done()
continue continue
} }
cclog.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) cclog.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
cclog.Infof("archiving job (dbid: %d) successful", job.ID) cclog.Infof("archiving job (dbid: %d) successful", *job.ID)
repository.CallJobStopHooks(job) repository.CallJobStopHooks(job)
archivePending.Done() archivePending.Done()

View File

@@ -57,13 +57,13 @@ func (r *queryResolver) rooflineHeatmap(
jobdata, err := metricdispatch.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0) jobdata, err := metricdispatch.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
if err != nil { if err != nil {
cclog.Warnf("Error while loading roofline metrics for job %d", job.ID) cclog.Warnf("Error while loading roofline metrics for job %d", *job.ID)
return nil, err return nil, err
} }
flops_, membw_ := jobdata["flops_any"], jobdata["mem_bw"] flops_, membw_ := jobdata["flops_any"], jobdata["mem_bw"]
if flops_ == nil && membw_ == nil { if flops_ == nil && membw_ == nil {
cclog.Warnf("rooflineHeatmap(): 'flops_any' or 'mem_bw' missing for job %d", job.ID) cclog.Warnf("rooflineHeatmap(): 'flops_any' or 'mem_bw' missing for job %d", *job.ID)
continue continue
// return nil, fmt.Errorf("GRAPH/UTIL > 'flops_any' or 'mem_bw' missing for job %d", job.ID) // return nil, fmt.Errorf("GRAPH/UTIL > 'flops_any' or 'mem_bw' missing for job %d", job.ID)
} }

View File

@@ -216,7 +216,7 @@ func enrichJobMetadata(job *schema.Job) error {
metricEnergy = math.Round(rawEnergy*100.0) / 100.0 metricEnergy = math.Round(rawEnergy*100.0) / 100.0
} }
} else { } else {
cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, job.ID) cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, *job.ID)
} }
job.EnergyFootprint[fp] = metricEnergy job.EnergyFootprint[fp] = metricEnergy
@@ -225,7 +225,7 @@ func enrichJobMetadata(job *schema.Job) error {
job.Energy = (math.Round(totalEnergy*100.0) / 100.0) job.Energy = (math.Round(totalEnergy*100.0) / 100.0)
if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil { if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil {
cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID) cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", *job.ID)
return err return err
} }

View File

@@ -64,7 +64,7 @@ func cacheKey(
resolution int, resolution int,
) string { ) string {
return fmt.Sprintf("%d(%s):[%v],[%v]-%d", return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
job.ID, job.State, metrics, scopes, resolution) *job.ID, job.State, metrics, scopes, resolution)
} }
// LoadData retrieves metric data for a job from the appropriate backend (memory store for running jobs, // LoadData retrieves metric data for a job from the appropriate backend (memory store for running jobs,

View File

@@ -229,7 +229,7 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
} }
start := time.Now() start := time.Now()
cachekey := fmt.Sprintf("metadata:%d", job.ID) cachekey := fmt.Sprintf("metadata:%d", *job.ID)
if cached := r.cache.Get(cachekey, nil); cached != nil { if cached := r.cache.Get(cachekey, nil); cached != nil {
job.MetaData = cached.(map[string]string) job.MetaData = cached.(map[string]string)
return job.MetaData, nil return job.MetaData, nil
@@ -237,8 +237,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID). if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID).
RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil { RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil {
cclog.Warnf("Error while scanning for job metadata (ID=%d): %v", job.ID, err) cclog.Warnf("Error while scanning for job metadata (ID=%d): %v", *job.ID, err)
return nil, fmt.Errorf("failed to fetch metadata for job %d: %w", job.ID, err) return nil, fmt.Errorf("failed to fetch metadata for job %d: %w", *job.ID, err)
} }
if len(job.RawMetaData) == 0 { if len(job.RawMetaData) == 0 {
@@ -246,8 +246,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
} }
if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil { if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil {
cclog.Warnf("Error while unmarshaling raw metadata json (ID=%d): %v", job.ID, err) cclog.Warnf("Error while unmarshaling raw metadata json (ID=%d): %v", *job.ID, err)
return nil, fmt.Errorf("failed to unmarshal metadata for job %d: %w", job.ID, err) return nil, fmt.Errorf("failed to unmarshal metadata for job %d: %w", *job.ID, err)
} }
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
@@ -270,12 +270,12 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
return fmt.Errorf("job cannot be nil") return fmt.Errorf("job cannot be nil")
} }
cachekey := fmt.Sprintf("metadata:%d", job.ID) cachekey := fmt.Sprintf("metadata:%d", *job.ID)
r.cache.Del(cachekey) r.cache.Del(cachekey)
if job.MetaData == nil { if job.MetaData == nil {
if _, err = r.FetchMetadata(job); err != nil { if _, err = r.FetchMetadata(job); err != nil {
cclog.Warnf("Error while fetching metadata for job, DB ID '%v'", job.ID) cclog.Warnf("Error while fetching metadata for job, DB ID '%v'", *job.ID)
return fmt.Errorf("failed to fetch metadata for job %d: %w", job.ID, err) return fmt.Errorf("failed to fetch metadata for job %d: %w", *job.ID, err)
} }
} }
@@ -289,16 +289,16 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er
} }
if job.RawMetaData, err = json.Marshal(job.MetaData); err != nil { if job.RawMetaData, err = json.Marshal(job.MetaData); err != nil {
cclog.Warnf("Error while marshaling metadata for job, DB ID '%v'", job.ID) cclog.Warnf("Error while marshaling metadata for job, DB ID '%v'", *job.ID)
return fmt.Errorf("failed to marshal metadata for job %d: %w", job.ID, err) return fmt.Errorf("failed to marshal metadata for job %d: %w", *job.ID, err)
} }
if _, err = sq.Update("job"). if _, err = sq.Update("job").
Set("meta_data", job.RawMetaData). Set("meta_data", job.RawMetaData).
Where("job.id = ?", job.ID). Where("job.id = ?", job.ID).
RunWith(r.stmtCache).Exec(); err != nil { RunWith(r.stmtCache).Exec(); err != nil {
cclog.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID) cclog.Warnf("Error while updating metadata for job, DB ID '%v'", *job.ID)
return fmt.Errorf("failed to update metadata in database for job %d: %w", job.ID, err) return fmt.Errorf("failed to update metadata in database for job %d: %w", *job.ID, err)
} }
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
@@ -324,8 +324,8 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err
if err := sq.Select("job.footprint").From("job").Where("job.id = ?", job.ID). if err := sq.Select("job.footprint").From("job").Where("job.id = ?", job.ID).
RunWith(r.stmtCache).QueryRow().Scan(&job.RawFootprint); err != nil { RunWith(r.stmtCache).QueryRow().Scan(&job.RawFootprint); err != nil {
cclog.Warnf("Error while scanning for job footprint (ID=%d): %v", job.ID, err) cclog.Warnf("Error while scanning for job footprint (ID=%d): %v", *job.ID, err)
return nil, fmt.Errorf("failed to fetch footprint for job %d: %w", job.ID, err) return nil, fmt.Errorf("failed to fetch footprint for job %d: %w", *job.ID, err)
} }
if len(job.RawFootprint) == 0 { if len(job.RawFootprint) == 0 {
@@ -333,8 +333,8 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err
} }
if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil { if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil {
cclog.Warnf("Error while unmarshaling raw footprint json (ID=%d): %v", job.ID, err) cclog.Warnf("Error while unmarshaling raw footprint json (ID=%d): %v", *job.ID, err)
return nil, fmt.Errorf("failed to unmarshal footprint for job %d: %w", job.ID, err) return nil, fmt.Errorf("failed to unmarshal footprint for job %d: %w", *job.ID, err)
} }
cclog.Debugf("Timer FetchFootprint %s", time.Since(start)) cclog.Debugf("Timer FetchFootprint %s", time.Since(start))
@@ -357,7 +357,7 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
} }
start := time.Now() start := time.Now()
cachekey := fmt.Sprintf("energyFootprint:%d", job.ID) cachekey := fmt.Sprintf("energyFootprint:%d", *job.ID)
if cached := r.cache.Get(cachekey, nil); cached != nil { if cached := r.cache.Get(cachekey, nil); cached != nil {
job.EnergyFootprint = cached.(map[string]float64) job.EnergyFootprint = cached.(map[string]float64)
return job.EnergyFootprint, nil return job.EnergyFootprint, nil
@@ -365,8 +365,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
if err := sq.Select("job.energy_footprint").From("job").Where("job.id = ?", job.ID). if err := sq.Select("job.energy_footprint").From("job").Where("job.id = ?", job.ID).
RunWith(r.stmtCache).QueryRow().Scan(&job.RawEnergyFootprint); err != nil { RunWith(r.stmtCache).QueryRow().Scan(&job.RawEnergyFootprint); err != nil {
cclog.Warnf("Error while scanning for job energy_footprint (ID=%d): %v", job.ID, err) cclog.Warnf("Error while scanning for job energy_footprint (ID=%d): %v", *job.ID, err)
return nil, fmt.Errorf("failed to fetch energy footprint for job %d: %w", job.ID, err) return nil, fmt.Errorf("failed to fetch energy footprint for job %d: %w", *job.ID, err)
} }
if len(job.RawEnergyFootprint) == 0 { if len(job.RawEnergyFootprint) == 0 {
@@ -374,8 +374,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6
} }
if err := json.Unmarshal(job.RawEnergyFootprint, &job.EnergyFootprint); err != nil { if err := json.Unmarshal(job.RawEnergyFootprint, &job.EnergyFootprint); err != nil {
cclog.Warnf("Error while unmarshaling raw energy footprint json (ID=%d): %v", job.ID, err) cclog.Warnf("Error while unmarshaling raw energy footprint json (ID=%d): %v", *job.ID, err)
return nil, fmt.Errorf("failed to unmarshal energy footprint for job %d: %w", job.ID, err) return nil, fmt.Errorf("failed to unmarshal energy footprint for job %d: %w", *job.ID, err)
} }
r.cache.Put(cachekey, job.EnergyFootprint, len(job.EnergyFootprint), 24*time.Hour) r.cache.Put(cachekey, job.EnergyFootprint, len(job.EnergyFootprint), 24*time.Hour)

View File

@@ -107,7 +107,7 @@ func RunTaggers() error {
tagger.Match(job) tagger.Match(job)
} }
for _, tagger := range jobTagger.stopTaggers { for _, tagger := range jobTagger.stopTaggers {
cclog.Infof("Run stop tagger for job %d", job.ID) cclog.Infof("Run stop tagger for job %d", *job.ID)
tagger.Match(job) tagger.Match(job)
} }
} }

View File

@@ -113,7 +113,7 @@ func RegisterFootprintWorker() {
stmt := sq.Update("job") stmt := sq.Update("job")
stmt, err = jobRepo.UpdateFootprint(stmt, job) stmt, err = jobRepo.UpdateFootprint(stmt, job)
if err != nil { if err != nil {
cclog.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error()) cclog.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", *job.ID, err.Error())
ce++ ce++
continue continue
} }