diff --git a/go.mod b/go.mod index 9cb82fbc..808b2e7a 100644 --- a/go.mod +++ b/go.mod @@ -109,6 +109,7 @@ require ( github.com/urfave/cli/v2 v2.27.7 // indirect github.com/urfave/cli/v3 v3.6.1 // indirect github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 // indirect + github.com/xtgo/set v1.0.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect diff --git a/go.sum b/go.sum index 8d3904ae..39571309 100644 --- a/go.sum +++ b/go.sum @@ -318,6 +318,8 @@ github.com/vektah/gqlparser/v2 v2.5.31 h1:YhWGA1mfTjID7qJhd1+Vxhpk5HTgydrGU9IgkW github.com/vektah/gqlparser/v2 v2.5.31/go.mod h1:c1I28gSOVNzlfc4WuDlqU7voQnsqI6OG2amkBAFmgts= github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 h1:FnBeRrxr7OU4VvAzt5X7s6266i6cSVkkFPS0TuXWbIg= github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= +github.com/xtgo/set v1.0.0 h1:6BCNBRv3ORNDQ7fyoJXRv+tstJz3m1JVFQErfeZz2pY= +github.com/xtgo/set v1.0.0/go.mod h1:d3NHzGzSa0NmB2NhFyECA+QdRp29oEn2xbT+TpeFoM8= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/internal/repository/job.go b/internal/repository/job.go index 99970ce1..1ff8047e 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -76,6 +76,7 @@ import ( "github.com/ClusterCockpit/cc-lib/v2/schema" sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" + "github.com/xtgo/set" ) var ( @@ -772,3 +773,63 @@ func (r *JobRepository) UpdateFootprint( return stmt.Set("footprint", string(rawFootprint)), nil } + +func (r *JobRepository) GetUsedNodes(ts uint64) map[string][]string { + q := sq.Select("job.cluster", "job.resources").From("job"). + Where("job.start_time < ?", ts). + Where(sq.Eq{"job.job_state": "running"}) + + rows, err := q.RunWith(r.stmtCache).Query() + if err != nil { + queryString, queryVars, _ := q.ToSql() + cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err) + return nil + } + defer rows.Close() + + // Use a map of sets for efficient deduplication + nodeSet := make(map[string]map[string]struct{}) + + var ( + cluster string + rawResources []byte + resources []*schema.Resource + ) + + for rows.Next() { + if err := rows.Scan(&cluster, &rawResources); err != nil { + cclog.Warnf("Error scanning job row in GetUsedNodes: %v", err) + continue + } + + if err := json.Unmarshal(rawResources, &resources); err != nil { + cclog.Warnf("Error unmarshaling resources for cluster %s: %v", cluster, err) + continue + } + + if _, ok := nodeSet[cluster]; !ok { + nodeSet[cluster] = make(map[string]struct{}) + } + + for _, res := range resources { + nodeSet[cluster][res.Hostname] = struct{}{} + } + } + + if err := rows.Err(); err != nil { + cclog.Errorf("Error iterating rows in GetUsedNodes: %v", err) + } + + nodeList := make(map[string][]string) + for cluster, nodes := range nodeSet { + // Convert map keys to slice + list := make([]string, 0, len(nodes)) + for node := range nodes { + list = append(list, node) + } + // set.Strings sorts the slice and ensures uniqueness + nodeList[cluster] = set.Strings(list) + } + + return nodeList +}