From 56556393203ddd81c10306cd360994d3f9016e87 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 6 Feb 2026 11:10:08 +0100 Subject: [PATCH 1/5] add subCluster jobFilter for statusDetail queries --- api/schema.graphqls | 3 ++- go.sum | 2 -- internal/graph/generated/generated.go | 20 +++++++++++++------ internal/graph/model/models_gen.go | 3 ++- .../cc-metric-store-queries.go | 2 +- internal/repository/jobQuery.go | 3 +++ internal/repository/node.go | 18 ++++++++--------- internal/repository/stats.go | 4 ++-- .../taskmanager/updateFootprintService.go | 2 +- .../src/status/dashdetails/UsageDash.svelte | 2 +- 10 files changed, 35 insertions(+), 24 deletions(-) diff --git a/api/schema.graphqls b/api/schema.graphqls index 7be43f73..24071752 100644 --- a/api/schema.graphqls +++ b/api/schema.graphqls @@ -429,7 +429,7 @@ type TimeRangeOutput { input NodeFilter { hostname: StringInput cluster: StringInput - subcluster: StringInput + subCluster: StringInput schedulerState: SchedulerState healthState: MonitoringState timeStart: Int @@ -444,6 +444,7 @@ input JobFilter { project: StringInput jobName: StringInput cluster: StringInput + subCluster: StringInput partition: StringInput duration: IntRange energy: FloatRange diff --git a/go.sum b/go.sum index 898520b5..43331fce 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,6 @@ github.com/99designs/gqlgen v0.17.85 h1:EkGx3U2FDcxQm8YDLQSpXIAVmpDyZ3IcBMOJi2nH github.com/99designs/gqlgen v0.17.85/go.mod h1:yvs8s0bkQlRfqg03YXr3eR4OQUowVhODT/tHzCXnbOU= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= -github.com/ClusterCockpit/cc-lib/v2 v2.2.0 h1:gqMsh7zsJMUhaXviXzaZ3gqXcLVgerjRJHzIcwX4FmQ= -github.com/ClusterCockpit/cc-lib/v2 v2.2.0/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw= github.com/ClusterCockpit/cc-lib/v2 v2.2.1 h1:iCVas+Jc61zFH5S2VG3H1sc7tsn+U4lOJwUYjYZEims= github.com/ClusterCockpit/cc-lib/v2 v2.2.1/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index c218c0af..e1e5ea71 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -2712,7 +2712,7 @@ type TimeRangeOutput { input NodeFilter { hostname: StringInput cluster: StringInput - subcluster: StringInput + subCluster: StringInput schedulerState: SchedulerState healthState: MonitoringState timeStart: Int @@ -2727,6 +2727,7 @@ input JobFilter { project: StringInput jobName: StringInput cluster: StringInput + subCluster: StringInput partition: StringInput duration: IntRange energy: FloatRange @@ -13199,7 +13200,7 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj any asMap[k] = v } - fieldsInOrder := [...]string{"tags", "dbId", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "energy", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "metricStats", "shared", "schedule", "node"} + fieldsInOrder := [...]string{"tags", "dbId", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "subCluster", "partition", "duration", "energy", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "metricStats", "shared", "schedule", "node"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -13262,6 +13263,13 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj any return it, err } it.Cluster = data + case "subCluster": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("subCluster")) + data, err := ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v) + if err != nil { + return it, err + } + it.SubCluster = data case "partition": ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("partition")) data, err := ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v) @@ -13400,7 +13408,7 @@ func (ec *executionContext) unmarshalInputNodeFilter(ctx context.Context, obj an asMap[k] = v } - fieldsInOrder := [...]string{"hostname", "cluster", "subcluster", "schedulerState", "healthState", "timeStart"} + fieldsInOrder := [...]string{"hostname", "cluster", "subCluster", "schedulerState", "healthState", "timeStart"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -13421,13 +13429,13 @@ func (ec *executionContext) unmarshalInputNodeFilter(ctx context.Context, obj an return it, err } it.Cluster = data - case "subcluster": - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("subcluster")) + case "subCluster": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("subCluster")) data, err := ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v) if err != nil { return it, err } - it.Subcluster = data + it.SubCluster = data case "schedulerState": ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("schedulerState")) data, err := ec.unmarshalOSchedulerState2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑlibᚋv2ᚋschemaᚐSchedulerState(ctx, v) diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 06f0ffcf..24b33847 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -71,6 +71,7 @@ type JobFilter struct { Project *StringInput `json:"project,omitempty"` JobName *StringInput `json:"jobName,omitempty"` Cluster *StringInput `json:"cluster,omitempty"` + SubCluster *StringInput `json:"subCluster,omitempty"` Partition *StringInput `json:"partition,omitempty"` Duration *config.IntRange `json:"duration,omitempty"` Energy *FloatRange `json:"energy,omitempty"` @@ -186,7 +187,7 @@ type NamedStatsWithScope struct { type NodeFilter struct { Hostname *StringInput `json:"hostname,omitempty"` Cluster *StringInput `json:"cluster,omitempty"` - Subcluster *StringInput `json:"subcluster,omitempty"` + SubCluster *StringInput `json:"subCluster,omitempty"` SchedulerState *schema.SchedulerState `json:"schedulerState,omitempty"` HealthState *string `json:"healthState,omitempty"` TimeStart *int `json:"timeStart,omitempty"` diff --git a/internal/metricstoreclient/cc-metric-store-queries.go b/internal/metricstoreclient/cc-metric-store-queries.go index 338d7028..acd8c979 100644 --- a/internal/metricstoreclient/cc-metric-store-queries.go +++ b/internal/metricstoreclient/cc-metric-store-queries.go @@ -149,7 +149,7 @@ func (ccms *CCMetricStore) buildQueries( // Similar to buildQueries but uses full node topology instead of job-allocated resources. // // The function handles: -// - Subcluster topology resolution (either pre-loaded or per-node lookup) +// - SubCluster topology resolution (either pre-loaded or per-node lookup) // - Full node hardware thread lists (not job-specific subsets) // - All accelerators on each node // - Metric configuration validation with subcluster filtering diff --git a/internal/repository/jobQuery.go b/internal/repository/jobQuery.go index cf7010ee..658413e8 100644 --- a/internal/repository/jobQuery.go +++ b/internal/repository/jobQuery.go @@ -190,6 +190,9 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select if filter.Cluster != nil { query = buildStringCondition("job.cluster", filter.Cluster, query) } + if filter.SubCluster != nil { + query = buildStringCondition("job.subcluster", filter.SubCluster, query) + } if filter.Partition != nil { query = buildStringCondition("job.cluster_partition", filter.Partition, query) } diff --git a/internal/repository/node.go b/internal/repository/node.go index 2a22e8c4..df3aec8b 100644 --- a/internal/repository/node.go +++ b/internal/repository/node.go @@ -254,8 +254,8 @@ func (r *NodeRepository) QueryNodes( if f.Cluster != nil { query = buildStringCondition("cluster", f.Cluster, query) } - if f.Subcluster != nil { - query = buildStringCondition("subcluster", f.Subcluster, query) + if f.SubCluster != nil { + query = buildStringCondition("subcluster", f.SubCluster, query) } if f.Hostname != nil { query = buildStringCondition("hostname", f.Hostname, query) @@ -322,8 +322,8 @@ func (r *NodeRepository) CountNodes( if f.Cluster != nil { query = buildStringCondition("cluster", f.Cluster, query) } - if f.Subcluster != nil { - query = buildStringCondition("subcluster", f.Subcluster, query) + if f.SubCluster != nil { + query = buildStringCondition("subcluster", f.SubCluster, query) } if f.Hostname != nil { query = buildStringCondition("hostname", f.Hostname, query) @@ -440,8 +440,8 @@ func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeF if f.Cluster != nil { query = buildStringCondition("cluster", f.Cluster, query) } - if f.Subcluster != nil { - query = buildStringCondition("subcluster", f.Subcluster, query) + if f.SubCluster != nil { + query = buildStringCondition("subcluster", f.SubCluster, query) } if f.SchedulerState != nil { query = query.Where("node_state = ?", f.SchedulerState) @@ -504,8 +504,8 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model. if f.Cluster != nil { query = buildStringCondition("cluster", f.Cluster, query) } - if f.Subcluster != nil { - query = buildStringCondition("subcluster", f.Subcluster, query) + if f.SubCluster != nil { + query = buildStringCondition("subcluster", f.SubCluster, query) } if f.SchedulerState != nil { query = query.Where("node_state = ?", f.SchedulerState) @@ -573,7 +573,7 @@ func (r *NodeRepository) GetNodesForList( queryFilters = append(queryFilters, &model.NodeFilter{Cluster: &model.StringInput{Eq: &cluster}}) } if subCluster != "" { - queryFilters = append(queryFilters, &model.NodeFilter{Subcluster: &model.StringInput{Eq: &subCluster}}) + queryFilters = append(queryFilters, &model.NodeFilter{SubCluster: &model.StringInput{Eq: &subCluster}}) } if nodeFilter != "" && stateFilter != "notindb" { queryFilters = append(queryFilters, &model.NodeFilter{Hostname: &model.StringInput{Contains: &nodeFilter}}) diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 942d6037..0ceb92f2 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -196,7 +196,7 @@ func (r *JobRepository) buildStatsQuery( // - filter: Filters to apply (time range, cluster, job state, etc.) // - page: Optional pagination (ItemsPerPage: -1 disables pagination) // - sortBy: Optional sort column (totalJobs, totalWalltime, totalCoreHours, etc.) -// - groupBy: Required grouping dimension (User, Project, Cluster, or Subcluster) +// - groupBy: Required grouping dimension (User, Project, Cluster, or SubCluster) // // Returns a slice of JobsStatistics, one per group, with: // - ID: The group identifier (username, project name, cluster name, etc.) @@ -420,7 +420,7 @@ func LoadJobStat(job *schema.Job, metric string, statType string) float64 { // Parameters: // - ctx: Context for security checks // - filter: Filters to apply -// - groupBy: Grouping dimension (User, Project, Cluster, or Subcluster) +// - groupBy: Grouping dimension (User, Project, Cluster, or SubCluster) // // Returns JobsStatistics with only ID and TotalJobs populated for each group. func (r *JobRepository) JobCountGrouped( diff --git a/internal/taskmanager/updateFootprintService.go b/internal/taskmanager/updateFootprintService.go index 65f4c229..c2584f13 100644 --- a/internal/taskmanager/updateFootprintService.go +++ b/internal/taskmanager/updateFootprintService.go @@ -49,7 +49,7 @@ func RegisterFootprintWorker() { if err != nil { continue } - // NOTE: Additional Subcluster Loop Could Allow For Limited List Of Footprint-Metrics Only. + // NOTE: Additional SubCluster Loop Could Allow For Limited List Of Footprint-Metrics Only. // - Chunk-Size Would Then Be 'SubCluster' (Running Jobs, Transactions) as Lists Can Change Within SCs // - Would Require Review of 'updateFootprint' Usage (Logic Could Possibly Be Included Here Completely) allMetrics := make([]string, 0) diff --git a/web/frontend/src/status/dashdetails/UsageDash.svelte b/web/frontend/src/status/dashdetails/UsageDash.svelte index 79adedc3..3fa197ae 100644 --- a/web/frontend/src/status/dashdetails/UsageDash.svelte +++ b/web/frontend/src/status/dashdetails/UsageDash.svelte @@ -59,7 +59,7 @@ const canvasPrefix = $derived(`${presetCluster}-${presetSubCluster ? presetSubCluster : ''}`) const statusFilter = $derived(presetSubCluster - ? [{ state: ["running"] }, { cluster: { eq: presetCluster} }, { partition: { eq: presetSubCluster } }] + ? [{ state: ["running"] }, { cluster: { eq: presetCluster} }, { subCluster: { eq: presetSubCluster } }] : [{ state: ["running"] }, { cluster: { eq: presetCluster} }] ); const topJobsQuery = $derived(queryStore({ From fa7727c6ca838fbd3f7ae4da96c0adab4c927623 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 6 Feb 2026 14:06:56 +0100 Subject: [PATCH 2/5] Print job db id instead of its address --- internal/api/job.go | 14 +++---- internal/archiver/archiveWorker.go | 12 +++--- internal/graph/util.go | 4 +- internal/importer/initDB.go | 4 +- internal/metricdispatch/dataLoader.go | 2 +- internal/repository/job.go | 42 +++++++++---------- internal/tagger/tagger.go | 2 +- .../taskmanager/updateFootprintService.go | 2 +- 8 files changed, 41 insertions(+), 41 deletions(-) diff --git a/internal/api/job.go b/internal/api/job.go index 64f6a92c..c3d1fbbf 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -691,7 +691,7 @@ func (api *RestAPI) startJob(rw http.ResponseWriter, r *http.Request) { for _, job := range jobs { // Check if jobs are within the same day (prevent duplicates) if (req.StartTime - job.StartTime) < secondsPerDay { - handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw) + handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", *job.ID, job.JobID), http.StatusUnprocessableEntity, rw) return } } @@ -860,7 +860,7 @@ func (api *RestAPI) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusOK) if err := json.NewEncoder(rw).Encode(DefaultAPIResponse{ - Message: fmt.Sprintf("Successfully deleted job %d", job.ID), + Message: fmt.Sprintf("Successfully deleted job %d", *job.ID), }); err != nil { cclog.Errorf("Failed to encode response: %v", err) } @@ -926,17 +926,17 @@ func (api *RestAPI) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Job, req StopJobAPIRequest) { // Sanity checks if job.State != schema.JobStateRunning { - handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw) + handleError(fmt.Errorf("jobId %d (id %d) on %s : job has already been stopped (state is: %s)", job.JobID, *job.ID, job.Cluster, job.State), http.StatusUnprocessableEntity, rw) return } if job.StartTime > req.StopTime { - handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw) + handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, *job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw) return } if req.State != "" && !req.State.Valid() { - handleError(fmt.Errorf("jobId %d (id %d) on %s : invalid requested job state: %#v", job.JobID, job.ID, job.Cluster, req.State), http.StatusBadRequest, rw) + handleError(fmt.Errorf("jobId %d (id %d) on %s : invalid requested job state: %#v", job.JobID, *job.ID, job.Cluster, req.State), http.StatusBadRequest, rw) return } else if req.State == "" { req.State = schema.JobStateCompleted @@ -950,12 +950,12 @@ func (api *RestAPI) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { - handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) + handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, *job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) return } } - cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) + cclog.Infof("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", *job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) // Send a response (with status OK). This means that errors that happen from here on forward // can *NOT* be communicated to the client. If reading from a MetricDataRepository or diff --git a/internal/archiver/archiveWorker.go b/internal/archiver/archiveWorker.go index ecdd1756..0639757d 100644 --- a/internal/archiver/archiveWorker.go +++ b/internal/archiver/archiveWorker.go @@ -126,7 +126,7 @@ func archivingWorker() { // not using meta data, called to load JobMeta into Cache? // will fail if job meta not in repository if _, err := jobRepo.FetchMetadata(job); err != nil { - cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error()) + cclog.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", *job.ID, err.Error()) jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed) archivePending.Done() continue @@ -136,7 +136,7 @@ func archivingWorker() { // Use shutdown context to allow cancellation jobMeta, err := ArchiveJob(job, shutdownCtx) if err != nil { - cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) + cclog.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", *job.ID, err.Error()) jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed) archivePending.Done() continue @@ -145,24 +145,24 @@ func archivingWorker() { stmt := sq.Update("job").Where("job.id = ?", job.ID) if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil { - cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) + cclog.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", *job.ID, err.Error()) archivePending.Done() continue } if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil { - cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) + cclog.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", *job.ID, err.Error()) archivePending.Done() continue } // Update the jobs database entry one last time: stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful) if err := jobRepo.Execute(stmt); err != nil { - cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) + cclog.Errorf("archiving job (dbid: %d) failed at db execute: %s", *job.ID, err.Error()) archivePending.Done() continue } cclog.Debugf("archiving job %d took %s", job.JobID, time.Since(start)) - cclog.Infof("archiving job (dbid: %d) successful", job.ID) + cclog.Infof("archiving job (dbid: %d) successful", *job.ID) repository.CallJobStopHooks(job) archivePending.Done() diff --git a/internal/graph/util.go b/internal/graph/util.go index dd5e388f..5458d0ff 100644 --- a/internal/graph/util.go +++ b/internal/graph/util.go @@ -57,13 +57,13 @@ func (r *queryResolver) rooflineHeatmap( jobdata, err := metricdispatch.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0) if err != nil { - cclog.Warnf("Error while loading roofline metrics for job %d", job.ID) + cclog.Warnf("Error while loading roofline metrics for job %d", *job.ID) return nil, err } flops_, membw_ := jobdata["flops_any"], jobdata["mem_bw"] if flops_ == nil && membw_ == nil { - cclog.Warnf("rooflineHeatmap(): 'flops_any' or 'mem_bw' missing for job %d", job.ID) + cclog.Warnf("rooflineHeatmap(): 'flops_any' or 'mem_bw' missing for job %d", *job.ID) continue // return nil, fmt.Errorf("GRAPH/UTIL > 'flops_any' or 'mem_bw' missing for job %d", job.ID) } diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index d88be7c7..87d92cd3 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -216,7 +216,7 @@ func enrichJobMetadata(job *schema.Job) error { metricEnergy = math.Round(rawEnergy*100.0) / 100.0 } } else { - cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, job.ID) + cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, *job.ID) } job.EnergyFootprint[fp] = metricEnergy @@ -225,7 +225,7 @@ func enrichJobMetadata(job *schema.Job) error { job.Energy = (math.Round(totalEnergy*100.0) / 100.0) if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil { - cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID) + cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", *job.ID) return err } diff --git a/internal/metricdispatch/dataLoader.go b/internal/metricdispatch/dataLoader.go index 09a8ac09..78808a74 100644 --- a/internal/metricdispatch/dataLoader.go +++ b/internal/metricdispatch/dataLoader.go @@ -64,7 +64,7 @@ func cacheKey( resolution int, ) string { return fmt.Sprintf("%d(%s):[%v],[%v]-%d", - job.ID, job.State, metrics, scopes, resolution) + *job.ID, job.State, metrics, scopes, resolution) } // LoadData retrieves metric data for a job from the appropriate backend (memory store for running jobs, diff --git a/internal/repository/job.go b/internal/repository/job.go index 434b8252..6b0b2b12 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -229,7 +229,7 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error } start := time.Now() - cachekey := fmt.Sprintf("metadata:%d", job.ID) + cachekey := fmt.Sprintf("metadata:%d", *job.ID) if cached := r.cache.Get(cachekey, nil); cached != nil { job.MetaData = cached.(map[string]string) return job.MetaData, nil @@ -237,8 +237,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error if err := sq.Select("job.meta_data").From("job").Where("job.id = ?", job.ID). RunWith(r.stmtCache).QueryRow().Scan(&job.RawMetaData); err != nil { - cclog.Warnf("Error while scanning for job metadata (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to fetch metadata for job %d: %w", job.ID, err) + cclog.Warnf("Error while scanning for job metadata (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to fetch metadata for job %d: %w", *job.ID, err) } if len(job.RawMetaData) == 0 { @@ -246,8 +246,8 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error } if err := json.Unmarshal(job.RawMetaData, &job.MetaData); err != nil { - cclog.Warnf("Error while unmarshaling raw metadata json (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to unmarshal metadata for job %d: %w", job.ID, err) + cclog.Warnf("Error while unmarshaling raw metadata json (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to unmarshal metadata for job %d: %w", *job.ID, err) } r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) @@ -270,12 +270,12 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er return fmt.Errorf("job cannot be nil") } - cachekey := fmt.Sprintf("metadata:%d", job.ID) + cachekey := fmt.Sprintf("metadata:%d", *job.ID) r.cache.Del(cachekey) if job.MetaData == nil { if _, err = r.FetchMetadata(job); err != nil { - cclog.Warnf("Error while fetching metadata for job, DB ID '%v'", job.ID) - return fmt.Errorf("failed to fetch metadata for job %d: %w", job.ID, err) + cclog.Warnf("Error while fetching metadata for job, DB ID '%v'", *job.ID) + return fmt.Errorf("failed to fetch metadata for job %d: %w", *job.ID, err) } } @@ -289,16 +289,16 @@ func (r *JobRepository) UpdateMetadata(job *schema.Job, key, val string) (err er } if job.RawMetaData, err = json.Marshal(job.MetaData); err != nil { - cclog.Warnf("Error while marshaling metadata for job, DB ID '%v'", job.ID) - return fmt.Errorf("failed to marshal metadata for job %d: %w", job.ID, err) + cclog.Warnf("Error while marshaling metadata for job, DB ID '%v'", *job.ID) + return fmt.Errorf("failed to marshal metadata for job %d: %w", *job.ID, err) } if _, err = sq.Update("job"). Set("meta_data", job.RawMetaData). Where("job.id = ?", job.ID). RunWith(r.stmtCache).Exec(); err != nil { - cclog.Warnf("Error while updating metadata for job, DB ID '%v'", job.ID) - return fmt.Errorf("failed to update metadata in database for job %d: %w", job.ID, err) + cclog.Warnf("Error while updating metadata for job, DB ID '%v'", *job.ID) + return fmt.Errorf("failed to update metadata in database for job %d: %w", *job.ID, err) } r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) @@ -324,8 +324,8 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err if err := sq.Select("job.footprint").From("job").Where("job.id = ?", job.ID). RunWith(r.stmtCache).QueryRow().Scan(&job.RawFootprint); err != nil { - cclog.Warnf("Error while scanning for job footprint (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to fetch footprint for job %d: %w", job.ID, err) + cclog.Warnf("Error while scanning for job footprint (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to fetch footprint for job %d: %w", *job.ID, err) } if len(job.RawFootprint) == 0 { @@ -333,8 +333,8 @@ func (r *JobRepository) FetchFootprint(job *schema.Job) (map[string]float64, err } if err := json.Unmarshal(job.RawFootprint, &job.Footprint); err != nil { - cclog.Warnf("Error while unmarshaling raw footprint json (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to unmarshal footprint for job %d: %w", job.ID, err) + cclog.Warnf("Error while unmarshaling raw footprint json (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to unmarshal footprint for job %d: %w", *job.ID, err) } cclog.Debugf("Timer FetchFootprint %s", time.Since(start)) @@ -357,7 +357,7 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 } start := time.Now() - cachekey := fmt.Sprintf("energyFootprint:%d", job.ID) + cachekey := fmt.Sprintf("energyFootprint:%d", *job.ID) if cached := r.cache.Get(cachekey, nil); cached != nil { job.EnergyFootprint = cached.(map[string]float64) return job.EnergyFootprint, nil @@ -365,8 +365,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 if err := sq.Select("job.energy_footprint").From("job").Where("job.id = ?", job.ID). RunWith(r.stmtCache).QueryRow().Scan(&job.RawEnergyFootprint); err != nil { - cclog.Warnf("Error while scanning for job energy_footprint (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to fetch energy footprint for job %d: %w", job.ID, err) + cclog.Warnf("Error while scanning for job energy_footprint (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to fetch energy footprint for job %d: %w", *job.ID, err) } if len(job.RawEnergyFootprint) == 0 { @@ -374,8 +374,8 @@ func (r *JobRepository) FetchEnergyFootprint(job *schema.Job) (map[string]float6 } if err := json.Unmarshal(job.RawEnergyFootprint, &job.EnergyFootprint); err != nil { - cclog.Warnf("Error while unmarshaling raw energy footprint json (ID=%d): %v", job.ID, err) - return nil, fmt.Errorf("failed to unmarshal energy footprint for job %d: %w", job.ID, err) + cclog.Warnf("Error while unmarshaling raw energy footprint json (ID=%d): %v", *job.ID, err) + return nil, fmt.Errorf("failed to unmarshal energy footprint for job %d: %w", *job.ID, err) } r.cache.Put(cachekey, job.EnergyFootprint, len(job.EnergyFootprint), 24*time.Hour) diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go index 2a5a0a7d..067f16a9 100644 --- a/internal/tagger/tagger.go +++ b/internal/tagger/tagger.go @@ -107,7 +107,7 @@ func RunTaggers() error { tagger.Match(job) } for _, tagger := range jobTagger.stopTaggers { - cclog.Infof("Run stop tagger for job %d", job.ID) + cclog.Infof("Run stop tagger for job %d", *job.ID) tagger.Match(job) } } diff --git a/internal/taskmanager/updateFootprintService.go b/internal/taskmanager/updateFootprintService.go index 65f4c229..2524d837 100644 --- a/internal/taskmanager/updateFootprintService.go +++ b/internal/taskmanager/updateFootprintService.go @@ -113,7 +113,7 @@ func RegisterFootprintWorker() { stmt := sq.Update("job") stmt, err = jobRepo.UpdateFootprint(stmt, job) if err != nil { - cclog.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error()) + cclog.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", *job.ID, err.Error()) ce++ continue } From a85f72fccd24b94103b9fcc8080101b706253f7f Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 6 Feb 2026 14:30:04 +0100 Subject: [PATCH 3/5] Change log level to debug for nodestate API endpoint --- internal/api/node.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/api/node.go b/internal/api/node.go index 930deb50..27cde7f0 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -100,8 +100,8 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { } } - cclog.Infof("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs)) - startDb := time.Now() + cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs)) + startDB := time.Now() for _, node := range req.Nodes { state := determineState(node.States) @@ -122,5 +122,5 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState) } - cclog.Infof("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDb)) + cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB)) } From 0adf2bad9274b4b36cb0cb89f1a0ccba16111d31 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 6 Feb 2026 14:50:13 +0100 Subject: [PATCH 4/5] Add info log about applied tag --- internal/repository/tags.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/repository/tags.go b/internal/repository/tags.go index 861cbb76..612666da 100644 --- a/internal/repository/tags.go +++ b/internal/repository/tags.go @@ -455,6 +455,8 @@ func (r *JobRepository) AddTagOrCreateDirect(jobID int64, tagType string, tagNam } } + cclog.Infof("Adding tag %s:%s:%s (direct)", tagType, tagName, tagScope) + if _, err := r.AddTagDirect(jobID, tagID); err != nil { return 0, err } From 6294f8e26357fa7255b7e86a9d34c404bae8ccd6 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 6 Feb 2026 14:53:05 +0100 Subject: [PATCH 5/5] Review and improve detectApp implementation --- internal/tagger/detectApp.go | 52 ++++++++++++++++++++----------- internal/tagger/detectApp_test.go | 4 +-- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/internal/tagger/detectApp.go b/internal/tagger/detectApp.go index 2a89ea21..5519cbf0 100644 --- a/internal/tagger/detectApp.go +++ b/internal/tagger/detectApp.go @@ -29,8 +29,8 @@ const ( ) type appInfo struct { - tag string - strings []string + tag string + patterns []*regexp.Regexp } // AppTagger detects applications by matching patterns in job scripts. @@ -38,8 +38,8 @@ type appInfo struct { // configuration when files change. When a job script matches a pattern, // the corresponding application tag is automatically applied. type AppTagger struct { - // apps maps application tags to their matching patterns - apps map[string]appInfo + // apps holds application patterns in deterministic order + apps []appInfo // tagType is the type of tag ("app") tagType string // cfgPath is the path to watch for configuration changes @@ -48,13 +48,27 @@ type AppTagger struct { func (t *AppTagger) scanApp(f *os.File, fns string) { scanner := bufio.NewScanner(f) - ai := appInfo{tag: strings.TrimSuffix(fns, filepath.Ext(fns)), strings: make([]string, 0)} + tag := strings.TrimSuffix(fns, filepath.Ext(fns)) + ai := appInfo{tag: tag, patterns: make([]*regexp.Regexp, 0)} for scanner.Scan() { - ai.strings = append(ai.strings, scanner.Text()) + line := scanner.Text() + re, err := regexp.Compile(line) + if err != nil { + cclog.Errorf("invalid regex pattern '%s' in %s: %v", line, fns, err) + continue + } + ai.patterns = append(ai.patterns, re) } - delete(t.apps, ai.tag) - t.apps[ai.tag] = ai + + // Remove existing entry for this tag if present + for i, a := range t.apps { + if a.tag == tag { + t.apps = append(t.apps[:i], t.apps[i+1:]...) + break + } + } + t.apps = append(t.apps, ai) } // EventMatch checks if a filesystem event should trigger configuration reload. @@ -65,7 +79,6 @@ func (t *AppTagger) EventMatch(s string) bool { // EventCallback is triggered when the configuration directory changes. // It reloads all application pattern files from the watched directory. -// FIXME: Only process the file that caused the event func (t *AppTagger) EventCallback() { files, err := os.ReadDir(t.cfgPath) if err != nil { @@ -81,7 +94,9 @@ func (t *AppTagger) EventCallback() { continue } t.scanApp(f, fns) - f.Close() + if err := f.Close(); err != nil { + cclog.Errorf("error closing app file %s: %#v", fns, err) + } } } @@ -94,7 +109,7 @@ func (t *AppTagger) Register() error { t.cfgPath = defaultConfigPath } t.tagType = tagTypeApp - t.apps = make(map[string]appInfo, 0) + t.apps = make([]appInfo, 0) if !util.CheckFileExists(t.cfgPath) { return fmt.Errorf("configuration path does not exist: %s", t.cfgPath) @@ -114,7 +129,9 @@ func (t *AppTagger) Register() error { continue } t.scanApp(f, fns) - f.Close() + if err := f.Close(); err != nil { + cclog.Errorf("error closing app file %s: %#v", fns, err) + } } cclog.Infof("Setup file watch for %s", t.cfgPath) @@ -139,15 +156,14 @@ func (t *AppTagger) Match(job *schema.Job) { jobscript, ok := metadata["jobScript"] if ok { id := *job.ID + jobscriptLower := strings.ToLower(jobscript) out: for _, a := range t.apps { - tag := a.tag - for _, s := range a.strings { - matched, _ := regexp.MatchString(s, strings.ToLower(jobscript)) - if matched { - if !r.HasTag(id, t.tagType, tag) { - r.AddTagOrCreateDirect(id, t.tagType, tag) + for _, re := range a.patterns { + if re.MatchString(jobscriptLower) { + if !r.HasTag(id, t.tagType, a.tag) { + r.AddTagOrCreateDirect(id, t.tagType, a.tag) break out } } diff --git a/internal/tagger/detectApp_test.go b/internal/tagger/detectApp_test.go index 93cbbdb7..7cd05a08 100644 --- a/internal/tagger/detectApp_test.go +++ b/internal/tagger/detectApp_test.go @@ -66,7 +66,7 @@ func TestRegister(t *testing.T) { var tagger AppTagger tagger.cfgPath = appsDir tagger.tagType = tagTypeApp - tagger.apps = make(map[string]appInfo, 0) + tagger.apps = make([]appInfo, 0) files, err := os.ReadDir(appsDir) noErr(t, err) @@ -97,7 +97,7 @@ func TestMatch(t *testing.T) { var tagger AppTagger tagger.cfgPath = appsDir tagger.tagType = tagTypeApp - tagger.apps = make(map[string]appInfo, 0) + tagger.apps = make([]appInfo, 0) files, err := os.ReadDir(appsDir) noErr(t, err)