From fa7727c6ca838fbd3f7ae4da96c0adab4c927623 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 6 Feb 2026 14:06:56 +0100 Subject: [PATCH] Print job db id instead of its address --- internal/api/job.go | 14 +++---- internal/archiver/archiveWorker.go | 12 +++--- internal/graph/util.go | 4 +- internal/importer/initDB.go | 4 +- internal/metricdispatch/dataLoader.go | 2 +- internal/repository/job.go | 42 +++++++++---------- internal/tagger/tagger.go | 2 +- .../taskmanager/updateFootprintService.go | 2 +- 8 files changed, 41 insertions(+), 41 deletions(-) diff --git a/internal/api/job.go b/internal/api/job.go index 64f6a92c..c3d1fbbf 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -691,7 +691,7 @@ func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) { for _, job := range jobs { // Check if jobs are within the same day (prevent duplicates) if (req.StartTime - job.StartTime) < secondsPerDay { - handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw) + handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", *job.ID, job.JobID), http.StatusUnprocessableEntity, rw) return } } @@ -860,7 +860,7 @@ func (api *RestAPI) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusOK) if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{ - Message: fmt.Sprintf("Successfully deleted job %d", job.ID), + Message: fmt.Sprintf("Successfully deleted job %d", *job.ID), }); err != nil { cclog.Errorf("Failed to encode response: %v", err) } @@ -926,17 +926,17 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) { // Sanity checks if job.State != schema.JobStateRunning { - handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw) + handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, *job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw) return } if job.StartTime > req.StopTime { - handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw) + handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, *job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw) return } if req.State != "" && !req.State.Valid() { - handleError(fmt.Errorf("jobId %d (id %d) on %s : invalid requested job state: %#v", job.JobID, job.ID, job.Cluster, req.State), http.StatusBadRequest, rw) + handleError(fmt.Errorf("jobId %d (id %d) on %s : invalid requested job state: %#v", job.JobID, *job.ID, job.Cluster, req.State), http.StatusBadRequest, rw) return } else if req.State == "" { req.State = schema.JobStateCompleted @@ -950,12 +950,12 @@ func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { - handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) + handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, *job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) return } } - cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) + cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", *job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) // Send a response (with status OK). This means that errors that happen from here on forward // can *NOT* be communicated to the client. If reading from a MetricDataRepository or diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index ecdd1756..0639757d 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -126,7 +126,7 @@ func archivingWorker() { // not using meta data, called to load JobMeta into Cache? // will fail if job meta not in repository if _, err := jobRepo.FetchMetadata(job); err != nil { - cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error()) + cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", *job.ID, err.Error()) jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed) archivePending.Done() continue @@ -136,7 +136,7 @@ func archivingWorker() { // Use shutdown context to allow cancellation jobMeta, err := ArchiveJob(job, shutdownCtx) if err != nil { - cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) + cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", *job.ID, err.Error()) jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed) archivePending.Done() continue @@ -145,24 +145,24 @@ func archivingWorker() { stmt := sq.Update("job").Where("job.id = ?", job.ID) if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil { - cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) + cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", *job.ID, err.Error()) archivePending.Done() continue } if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil { - cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) + cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", *job.ID, err.Error()) archivePending.Done() continue } // Update the jobs database entry one last time: stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful) if err := jobRepo.Execute(stmt); err != nil { - cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) + cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", *job.ID, err.Error()) archivePending.Done() continue } cclog.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) - cclog.Infof("archiving job (dbid: %d) successful", job.ID) + cclog.Infof("archiving job (dbid: %d) successful", *job.ID) repository.CallJobStopHooks(job) archivePending.Done() diff --git a/internal/graph/util.go b/internal/graph/util.go index dd5e388f..5458d0ff 100644 --- a/internal/graph/util.go +++ b/internal/graph/util.go @@ -57,13 +57,13 @@ func (r *queryResolver) rooflineHeatmap( jobdata, err := metricdispatch.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0) if err != nil { - cclog.Warnf("Error while loading roofline metrics for job %d", job.ID) + cclog.Warnf("Error while loading roofline metrics for job %d", *job.ID) return nil, err } flops_, membw_ := jobdata["flops_any"], jobdata["mem_bw"] if flops_ == nil && membw_ == nil { - cclog.Warnf("rooflineHeatmap(): 'flops_any' or 'mem_bw' missing for job %d", job.ID) + cclog.Warnf("rooflineHeatmap(): 'flops_any' or 'mem_bw' missing for job %d", *job.ID) continue // return nil, fmt.Errorf("GRAPH/UTIL > 'flops_any' or 'mem_bw' missing for job %d", job.ID) } diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index d88be7c7..87d92cd3 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -216,7 +216,7 @@ func enrichJobMetadata(job *schema.Job) error { metricEnergy = math.Round(rawEnergy*100.0) / 100.0 } } else { - cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, job.ID) + cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, *job.ID) } job.EnergyFootprint[fp] = metricEnergy @@ -225,7 +225,7 @@ func enrichJobMetadata(job *schema.Job) error { job.Energy = (math.Round(totalEnergy*100.0) / 100.0) if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil { - cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID) + cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", *job.ID) return err } diff --git a/internal/metricdispatch/dataLoader.go b/internal/metricdispatch/dataLoader.go index 09a8ac09..78808a74 100644 --- a/internal/metricdispatch/dataLoader.go +++ b/internal/metricdispatch/dataLoader.go @@ -64,7 +64,7 @@ func cacheKey( resolution int, ) string { return fmt.Sprintf("%d(%s):[%v],[%v]-%d", - job.ID, job.State, metrics, scopes, resolution) + *job.ID, job.State, metrics, scopes, resolution) } // LoadData retrieves metric data for a job from the appropriate backend (memory store for running jobs, diff --git a/internal/repository/job.go b/internal/repository/job.go index 434b8252..6b0b2b12 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -229,7 +229,7 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error } start := time.Now() - cachekey := fmt.Sprintf("metadata:%d", job.ID) + cachekey := fmt.Sprintf("metadata:%d", *job.ID) if cached := r.cache.Get(cachekey, nil); cached != nil { job.MetaData = cached.(map[string]string) return job.MetaData, nil @@ -237,8 +237,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID). RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil { - cclog.Warnf("Error while scanning for job metadata (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to fetch metadata for job %d: %w", job.ID, err) + cclog.Warnf("Error while scanning for job metadata (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to fetch metadata for job %d: %w", *job.ID, err) } if len(job.RawMetaData) == 0 { @@ -246,8 +246,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error } if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil { - cclog.Warnf("Error while unmarshaling raw metadata json (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to unmarshal metadata for job %d: %w", job.ID, err) + cclog.Warnf("Error while unmarshaling raw metadata json (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to unmarshal metadata for job %d: %w", *job.ID, err) } r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) @@ -270,12 +270,12 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er return fmt.Errorf("job cannot be nil") } - cachekey := fmt.Sprintf("metadata:%d", job.ID) + cachekey := fmt.Sprintf("metadata:%d", *job.ID) r.cache.Del(cachekey) if job.MetaData == nil { if _, err = r.FetchMetadata(job); err != nil { - cclog.Warnf("Error while fetching metadata for job, DB ID '%v'", job.ID) - return fmt.Errorf("failed to fetch metadata for job %d: %w", job.ID, err) + cclog.Warnf("Error while fetching metadata for job, DB ID '%v'", *job.ID) + return fmt.Errorf("failed to fetch metadata for job %d: %w", *job.ID, err) } } @@ -289,16 +289,16 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er } if job.RawMetaData, err = json.Marshal(job.MetaData); err != nil { - cclog.Warnf("Error while marshaling metadata for job, DB ID '%v'", job.ID) - return fmt.Errorf("failed to marshal metadata for job %d: %w", job.ID, err) + cclog.Warnf("Error while marshaling metadata for job, DB ID '%v'", *job.ID) + return fmt.Errorf("failed to marshal metadata for job %d: %w", *job.ID, err) } if _, err = sq.Update("job"). Set("meta_data", job.RawMetaData). Where("job.id = ?", job.ID). RunWith(r.stmtCache).Exec(); err != nil { - cclog.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID) - return fmt.Errorf("failed to update metadata in database for job %d: %w", job.ID, err) + cclog.Warnf("Error while updating metadata for job, DB ID '%v'", *job.ID) + return fmt.Errorf("failed to update metadata in database for job %d: %w", *job.ID, err) } r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) @@ -324,8 +324,8 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err if err := sq.Select("job.footprint").From("job").Where("job.id = ?", job.ID). RunWith(r.stmtCache).QueryRow().Scan(&job.RawFootprint); err != nil { - cclog.Warnf("Error while scanning for job footprint (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to fetch footprint for job %d: %w", job.ID, err) + cclog.Warnf("Error while scanning for job footprint (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to fetch footprint for job %d: %w", *job.ID, err) } if len(job.RawFootprint) == 0 { @@ -333,8 +333,8 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err } if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil { - cclog.Warnf("Error while unmarshaling raw footprint json (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to unmarshal footprint for job %d: %w", job.ID, err) + cclog.Warnf("Error while unmarshaling raw footprint json (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to unmarshal footprint for job %d: %w", *job.ID, err) } cclog.Debugf("Timer FetchFootprint %s", time.Since(start)) @@ -357,7 +357,7 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 } start := time.Now() - cachekey := fmt.Sprintf("energyFootprint:%d", job.ID) + cachekey := fmt.Sprintf("energyFootprint:%d", *job.ID) if cached := r.cache.Get(cachekey, nil); cached != nil { job.EnergyFootprint = cached.(map[string]float64) return job.EnergyFootprint, nil @@ -365,8 +365,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 if err := sq.Select("job.energy_footprint").From("job").Where("job.id = ?", job.ID). RunWith(r.stmtCache).QueryRow().Scan(&job.RawEnergyFootprint); err != nil { - cclog.Warnf("Error while scanning for job energy_footprint (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to fetch energy footprint for job %d: %w", job.ID, err) + cclog.Warnf("Error while scanning for job energy_footprint (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to fetch energy footprint for job %d: %w", *job.ID, err) } if len(job.RawEnergyFootprint) == 0 { @@ -374,8 +374,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 } if err := json.Unmarshal(job.RawEnergyFootprint, &job.EnergyFootprint); err != nil { - cclog.Warnf("Error while unmarshaling raw energy footprint json (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to unmarshal energy footprint for job %d: %w", job.ID, err) + cclog.Warnf("Error while unmarshaling raw energy footprint json (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to unmarshal energy footprint for job %d: %w", *job.ID, err) } r.cache.Put(cachekey, job.EnergyFootprint, len(job.EnergyFootprint), 24*time.Hour) diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go index 2a5a0a7d..067f16a9 100644 --- a/internal/tagger/tagger.go +++ b/internal/tagger/tagger.go @@ -107,7 +107,7 @@ func RunTaggers() error { tagger.Match(job) } for _, tagger := range jobTagger.stopTaggers { - cclog.Infof("Run stop tagger for job %d", job.ID) + cclog.Infof("Run stop tagger for job %d", *job.ID) tagger.Match(job) } } diff --git a/internal/taskmanager/updateFootprintService.go b/internal/taskmanager/updateFootprintService.go index 65f4c229..2524d837 100644 --- a/internal/taskmanager/updateFootprintService.go +++ b/internal/taskmanager/updateFootprintService.go @@ -113,7 +113,7 @@ func RegisterFootprintWorker() { stmt := sq.Update("job") stmt, err = jobRepo.UpdateFootprint(stmt, job) if err != nil { - cclog.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error()) + cclog.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", *job.ID, err.Error()) ce++ continue }