From 71b75eea0e53b99b27b18ed78f5600ce4e199b6f Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Wed, 14 Jan 2026 08:49:55 +0100
Subject: [PATCH] Improve GetUsedNodes function

---
 internal/repository/job.go | 41 ++++++++++++++++++++++++++++----------
 1 file changed, 31 insertions(+), 10 deletions(-)

diff --git a/internal/repository/job.go b/internal/repository/job.go
index 1ff8047e..78e1f3fe 100644
--- a/internal/repository/job.go
+++ b/internal/repository/job.go
@@ -66,6 +66,7 @@ import (
 	"fmt"
 	"maps"
 	"math"
+	"sort"
 	"strconv"
 	"sync"
 	"time"
@@ -76,7 +77,6 @@ import (
 	"github.com/ClusterCockpit/cc-lib/v2/schema"
 	sq "github.com/Masterminds/squirrel"
 	"github.com/jmoiron/sqlx"
-	"github.com/xtgo/set"
 )
 
 var (
@@ -774,7 +774,16 @@ func (r *JobRepository) UpdateFootprint(
 	return stmt.Set("footprint", string(rawFootprint)), nil
 }
 
-func (r *JobRepository) GetUsedNodes(ts uint64) map[string][]string {
+// GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames
+// that are currently in use by jobs that started before the given timestamp and
+// are still in running state.
+//
+// The timestamp parameter (ts) is compared against job.start_time to find
+// relevant jobs. Returns an error if the database query fails or row iteration
+// encounters errors. Individual row parsing errors are logged but don't fail
+// the entire operation.
+func (r *JobRepository) GetUsedNodes(ts uint64) (map[string][]string, error) {
+	// Note: Query expects index on (job_state, start_time) for optimal performance
 	q := sq.Select("job.cluster", "job.resources").From("job").
 		Where("job.start_time < ?", ts).
 		Where(sq.Eq{"job.job_state": "running"})
@@ -782,8 +791,7 @@
 	rows, err := q.RunWith(r.stmtCache).Query()
 	if err != nil {
 		queryString, queryVars, _ := q.ToSql()
-		cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
-		return nil
+		return nil, fmt.Errorf("query failed [%s] %v: %w", queryString, queryVars, err)
 	}
 	defer rows.Close()
 
@@ -794,16 +802,25 @@
 	var (
 		cluster      string
 		rawResources []byte
 		resources    []*schema.Resource
+		skippedRows  int
 	)
 
 	for rows.Next() {
 		if err := rows.Scan(&cluster, &rawResources); err != nil {
 			cclog.Warnf("Error scanning job row in GetUsedNodes: %v", err)
+			skippedRows++
 			continue
 		}
 
+		resources = resources[:0] // Clear slice, keep capacity
 		if err := json.Unmarshal(rawResources, &resources); err != nil {
 			cclog.Warnf("Error unmarshaling resources for cluster %s: %v", cluster, err)
+			skippedRows++
+			continue
+		}
+
+		if len(resources) == 0 {
+			cclog.Debugf("Job in cluster %s has no resources", cluster)
 			continue
 		}
@@ -817,19 +834,23 @@
 	if err := rows.Err(); err != nil {
-		cclog.Errorf("Error iterating rows in GetUsedNodes: %v", err)
+		return nil, fmt.Errorf("error iterating rows: %w", err)
 	}
 
-	nodeList := make(map[string][]string)
+	if skippedRows > 0 {
+		cclog.Warnf("GetUsedNodes: Skipped %d rows due to parsing errors", skippedRows)
+	}
+
+	// Convert sets to sorted slices
+	nodeList := make(map[string][]string, len(nodeSet))
 	for cluster, nodes := range nodeSet {
-		// Convert map keys to slice
 		list := make([]string, 0, len(nodes))
 		for node := range nodes {
 			list = append(list, node)
 		}
-		// set.Strings sorts the slice and ensures uniqueness
-		nodeList[cluster] = set.Strings(list)
+		sort.Strings(list)
+		nodeList[cluster] = list
 	}
-	return nodeList
+	return nodeList, nil
 }
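
For call sites, the change from map[string][]string to (map[string][]string, error) means
errors are now checked explicitly instead of testing for a nil map. A minimal caller
sketch follows; the function name logUsedNodes, the repo variable, and the ts argument
are hypothetical names chosen only for illustration and are not part of this patch:

	// logUsedNodes is a hypothetical caller illustrating the new signature:
	// it fetches the in-use nodes per cluster and logs a summary per cluster.
	func logUsedNodes(repo *repository.JobRepository, ts uint64) error {
		usedNodes, err := repo.GetUsedNodes(ts)
		if err != nil {
			// Query or iteration failure is now surfaced to the caller.
			return fmt.Errorf("resolving used nodes: %w", err)
		}
		for cluster, hosts := range usedNodes {
			cclog.Debugf("cluster %s: %d nodes in use", cluster, len(hosts))
		}
		return nil
	}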