Mirror of https://github.com/ClusterCockpit/cc-backend (synced 2026-01-15 09:11:45 +01:00)
Improve GetUsedNodes function
This commit is contained in:
@@ -66,6 +66,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"maps"
|
"maps"
|
||||||
"math"
|
"math"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -76,7 +77,6 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
||||||
sq "github.com/Masterminds/squirrel"
|
sq "github.com/Masterminds/squirrel"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
"github.com/xtgo/set"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -774,7 +774,16 @@ func (r *JobRepository) UpdateFootprint(
|
|||||||
return stmt.Set("footprint", string(rawFootprint)), nil
|
return stmt.Set("footprint", string(rawFootprint)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *JobRepository) GetUsedNodes(ts uint64) map[string][]string {
|
// GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames
|
||||||
|
// that are currently in use by jobs that started before the given timestamp and
|
||||||
|
// are still in running state.
|
||||||
|
//
|
||||||
|
// The timestamp parameter (ts) is compared against job.start_time to find
|
||||||
|
// relevant jobs. Returns an error if the database query fails or row iteration
|
||||||
|
// encounters errors. Individual row parsing errors are logged but don't fail
|
||||||
|
// the entire operation.
|
||||||
|
func (r *JobRepository) GetUsedNodes(ts uint64) (map[string][]string, error) {
|
||||||
|
// Note: Query expects index on (job_state, start_time) for optimal performance
|
||||||
q := sq.Select("job.cluster", "job.resources").From("job").
|
q := sq.Select("job.cluster", "job.resources").From("job").
|
||||||
Where("job.start_time < ?", ts).
|
Where("job.start_time < ?", ts).
|
||||||
Where(sq.Eq{"job.job_state": "running"})
|
Where(sq.Eq{"job.job_state": "running"})
|
||||||
@@ -782,8 +791,7 @@ func (r *JobRepository) GetUsedNodes(ts uint64) map[string][]string {
|
|||||||
rows, err := q.RunWith(r.stmtCache).Query()
|
rows, err := q.RunWith(r.stmtCache).Query()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
queryString, queryVars, _ := q.ToSql()
|
queryString, queryVars, _ := q.ToSql()
|
||||||
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
|
return nil, fmt.Errorf("query failed [%s] %v: %w", queryString, queryVars, err)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
@@ -794,16 +802,25 @@ func (r *JobRepository) GetUsedNodes(ts uint64) map[string][]string {
|
|||||||
cluster string
|
cluster string
|
||||||
rawResources []byte
|
rawResources []byte
|
||||||
resources []*schema.Resource
|
resources []*schema.Resource
|
||||||
|
skippedRows int
|
||||||
)
|
)
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
if err := rows.Scan(&cluster, &rawResources); err != nil {
|
if err := rows.Scan(&cluster, &rawResources); err != nil {
|
||||||
cclog.Warnf("Error scanning job row in GetUsedNodes: %v", err)
|
cclog.Warnf("Error scanning job row in GetUsedNodes: %v", err)
|
||||||
|
skippedRows++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resources = resources[:0] // Clear slice, keep capacity
|
||||||
if err := json.Unmarshal(rawResources, &resources); err != nil {
|
if err := json.Unmarshal(rawResources, &resources); err != nil {
|
||||||
cclog.Warnf("Error unmarshaling resources for cluster %s: %v", cluster, err)
|
cclog.Warnf("Error unmarshaling resources for cluster %s: %v", cluster, err)
|
||||||
|
skippedRows++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(resources) == 0 {
|
||||||
|
cclog.Debugf("Job in cluster %s has no resources", cluster)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -817,19 +834,23 @@ func (r *JobRepository) GetUsedNodes(ts uint64) map[string][]string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
cclog.Errorf("Error iterating rows in GetUsedNodes: %v", err)
|
return nil, fmt.Errorf("error iterating rows: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeList := make(map[string][]string)
|
if skippedRows > 0 {
|
||||||
|
cclog.Warnf("GetUsedNodes: Skipped %d rows due to parsing errors", skippedRows)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert sets to sorted slices
|
||||||
|
nodeList := make(map[string][]string, len(nodeSet))
|
||||||
for cluster, nodes := range nodeSet {
|
for cluster, nodes := range nodeSet {
|
||||||
// Convert map keys to slice
|
|
||||||
list := make([]string, 0, len(nodes))
|
list := make([]string, 0, len(nodes))
|
||||||
for node := range nodes {
|
for node := range nodes {
|
||||||
list = append(list, node)
|
list = append(list, node)
|
||||||
}
|
}
|
||||||
// set.Strings sorts the slice and ensures uniqueness
|
sort.Strings(list)
|
||||||
nodeList[cluster] = set.Strings(list)
|
nodeList[cluster] = list
|
||||||
}
|
}
|
||||||
|
|
||||||
return nodeList
|
return nodeList, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user