mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-01-13 21:19:06 +01:00
subclusters instead of slurm partitions
This commit is contained in:
parent
2651b96499
commit
85ad6d9543
16
api_test.go
16
api_test.go
@ -30,9 +30,14 @@ func setup(t *testing.T) *api.RestApi {
|
|||||||
|
|
||||||
const testclusterJson = `{
|
const testclusterJson = `{
|
||||||
"name": "testcluster",
|
"name": "testcluster",
|
||||||
"partitions": [
|
"subClusters": [
|
||||||
{
|
{
|
||||||
"name": "default",
|
"name": "sc0",
|
||||||
|
"nodes": "host120,host121,host122"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "sc1",
|
||||||
|
"nodes": "host123,host124,host125",
|
||||||
"processorType": "Intel Core i7-4770",
|
"processorType": "Intel Core i7-4770",
|
||||||
"socketsPerNode": 1,
|
"socketsPerNode": 1,
|
||||||
"coresPerSocket": 4,
|
"coresPerSocket": 4,
|
||||||
@ -141,7 +146,7 @@ func TestRestApi(t *testing.T) {
|
|||||||
Timestep: 60,
|
Timestep: 60,
|
||||||
Series: []schema.Series{
|
Series: []schema.Series{
|
||||||
{
|
{
|
||||||
Hostname: "testhost",
|
Hostname: "host123",
|
||||||
Statistics: &schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
|
Statistics: &schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3},
|
||||||
Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
|
Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3},
|
||||||
},
|
},
|
||||||
@ -173,7 +178,7 @@ func TestRestApi(t *testing.T) {
|
|||||||
"tags": [{ "type": "testTagType", "name": "testTagName" }],
|
"tags": [{ "type": "testTagType", "name": "testTagName" }],
|
||||||
"resources": [
|
"resources": [
|
||||||
{
|
{
|
||||||
"hostname": "testhost",
|
"hostname": "host123",
|
||||||
"hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]
|
"hwthreads": [0, 1, 2, 3, 4, 5, 6, 7]
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
@ -211,6 +216,7 @@ func TestRestApi(t *testing.T) {
|
|||||||
job.User != "testuser" ||
|
job.User != "testuser" ||
|
||||||
job.Project != "testproj" ||
|
job.Project != "testproj" ||
|
||||||
job.Cluster != "testcluster" ||
|
job.Cluster != "testcluster" ||
|
||||||
|
job.SubCluster != "sc1" ||
|
||||||
job.Partition != "default" ||
|
job.Partition != "default" ||
|
||||||
job.ArrayJobId != 0 ||
|
job.ArrayJobId != 0 ||
|
||||||
job.NumNodes != 1 ||
|
job.NumNodes != 1 ||
|
||||||
@ -219,7 +225,7 @@ func TestRestApi(t *testing.T) {
|
|||||||
job.Exclusive != 1 ||
|
job.Exclusive != 1 ||
|
||||||
job.MonitoringStatus != 1 ||
|
job.MonitoringStatus != 1 ||
|
||||||
job.SMT != 1 ||
|
job.SMT != 1 ||
|
||||||
!reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "testhost", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) ||
|
!reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "host123", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) ||
|
||||||
job.StartTime.Unix() != 123456789 {
|
job.StartTime.Unix() != 123456789 {
|
||||||
t.Fatalf("unexpected job properties: %#v", job)
|
t.Fatalf("unexpected job properties: %#v", job)
|
||||||
}
|
}
|
||||||
|
@ -20,10 +20,14 @@ import (
|
|||||||
|
|
||||||
var db *sqlx.DB
|
var db *sqlx.DB
|
||||||
var lookupConfigStmt *sqlx.Stmt
|
var lookupConfigStmt *sqlx.Stmt
|
||||||
|
|
||||||
var lock sync.RWMutex
|
var lock sync.RWMutex
|
||||||
var uiDefaults map[string]interface{}
|
var uiDefaults map[string]interface{}
|
||||||
|
|
||||||
var cache *lrucache.Cache = lrucache.New(1024)
|
var cache *lrucache.Cache = lrucache.New(1024)
|
||||||
|
|
||||||
var Clusters []*model.Cluster
|
var Clusters []*model.Cluster
|
||||||
|
var nodeLists map[string]map[string]NodeList
|
||||||
|
|
||||||
func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, jobArchive string) error {
|
func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, jobArchive string) error {
|
||||||
db = usersdb
|
db = usersdb
|
||||||
@ -34,6 +38,7 @@ func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, j
|
|||||||
}
|
}
|
||||||
|
|
||||||
Clusters = []*model.Cluster{}
|
Clusters = []*model.Cluster{}
|
||||||
|
nodeLists = map[string]map[string]NodeList{}
|
||||||
for _, de := range entries {
|
for _, de := range entries {
|
||||||
raw, err := os.ReadFile(filepath.Join(jobArchive, de.Name(), "cluster.json"))
|
raw, err := os.ReadFile(filepath.Join(jobArchive, de.Name(), "cluster.json"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -53,8 +58,8 @@ func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, j
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(cluster.Name) == 0 || len(cluster.MetricConfig) == 0 || len(cluster.Partitions) == 0 {
|
if len(cluster.Name) == 0 || len(cluster.MetricConfig) == 0 || len(cluster.SubClusters) == 0 {
|
||||||
return errors.New("cluster.name, cluster.metricConfig and cluster.Partitions should not be empty")
|
return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, mc := range cluster.MetricConfig {
|
for _, mc := range cluster.MetricConfig {
|
||||||
@ -83,6 +88,19 @@ func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, j
|
|||||||
}
|
}
|
||||||
|
|
||||||
Clusters = append(Clusters, &cluster)
|
Clusters = append(Clusters, &cluster)
|
||||||
|
|
||||||
|
nodeLists[cluster.Name] = make(map[string]NodeList)
|
||||||
|
for _, sc := range cluster.SubClusters {
|
||||||
|
if sc.Nodes == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
nl, err := ParseNodeList(sc.Nodes)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err)
|
||||||
|
}
|
||||||
|
nodeLists[cluster.Name][sc.Name] = nl
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if authEnabled {
|
if authEnabled {
|
||||||
@ -188,7 +206,7 @@ func UpdateConfig(key, value string, ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetClusterConfig(cluster string) *model.Cluster {
|
func GetCluster(cluster string) *model.Cluster {
|
||||||
for _, c := range Clusters {
|
for _, c := range Clusters {
|
||||||
if c.Name == cluster {
|
if c.Name == cluster {
|
||||||
return c
|
return c
|
||||||
@ -197,11 +215,11 @@ func GetClusterConfig(cluster string) *model.Cluster {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetPartition(cluster, partition string) *model.Partition {
|
func GetSubCluster(cluster, subcluster string) *model.SubCluster {
|
||||||
for _, c := range Clusters {
|
for _, c := range Clusters {
|
||||||
if c.Name == cluster {
|
if c.Name == cluster {
|
||||||
for _, p := range c.Partitions {
|
for _, p := range c.SubClusters {
|
||||||
if p.Name == partition {
|
if p.Name == subcluster {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -222,3 +240,40 @@ func GetMetricConfig(cluster, metric string) *model.MetricConfig {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AssignSubCluster sets the `job.subcluster` property of the job based
|
||||||
|
// on its cluster and resources.
|
||||||
|
func AssignSubCluster(job *schema.BaseJob) error {
|
||||||
|
cluster := GetCluster(job.Cluster)
|
||||||
|
if cluster == nil {
|
||||||
|
return fmt.Errorf("unkown cluster: %#v", job.Cluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
if job.SubCluster != "" {
|
||||||
|
for _, sc := range cluster.SubClusters {
|
||||||
|
if sc.Name == job.SubCluster {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(job.Resources) == 0 {
|
||||||
|
return fmt.Errorf("job without any resources/hosts")
|
||||||
|
}
|
||||||
|
|
||||||
|
host0 := job.Resources[0].Hostname
|
||||||
|
for sc, nl := range nodeLists[job.Cluster] {
|
||||||
|
if nl != nil && nl.Contains(host0) {
|
||||||
|
job.SubCluster = sc
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if cluster.SubClusters[0].Nodes == "" {
|
||||||
|
job.SubCluster = cluster.SubClusters[0].Name
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0)
|
||||||
|
}
|
||||||
|
136
config/nodelist.go
Normal file
136
config/nodelist.go
Normal file
@ -0,0 +1,136 @@
|
|||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NLExprString is a node list expression that matches one literal piece of
// a host name.
type NLExprString string

// consume checks whether input begins with the literal. On a match it returns
// the remainder of input after the literal together with true; otherwise it
// returns "" and false.
func (nle NLExprString) consume(input string) (next string, ok bool) {
	literal := string(nle)
	if !strings.HasPrefix(input, literal) {
		return "", false
	}

	return input[len(literal):], true
}
|
||||||
|
|
||||||
|
type NLExprIntRange struct {
|
||||||
|
start, end int64
|
||||||
|
zeroPadded bool
|
||||||
|
digits int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nle NLExprIntRange) consume(input string) (next string, ok bool) {
|
||||||
|
if !nle.zeroPadded || nle.digits < 1 {
|
||||||
|
log.Error("node list: only zero-padded ranges are allowed")
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(input) < nle.digits {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
||||||
|
numerals, rest := input[:nle.digits], input[nle.digits:]
|
||||||
|
for len(numerals) > 1 && numerals[0] == '0' {
|
||||||
|
numerals = numerals[1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
x, err := strconv.ParseInt(numerals, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
||||||
|
if nle.start <= x && x <= nle.end {
|
||||||
|
return rest, true
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodeList is a parsed node list: a list of terms, each of which is a sequence
// of expressions. A host name matches a term if the expressions, applied one
// after another, consume the complete name.
type NodeList [][]interface {
	consume(input string) (next string, ok bool)
}

// Contains reports whether name is matched by at least one term of the node
// list. A nil receiver and empty terms never match anything.
func (nl *NodeList) Contains(name string) bool {
	if nl == nil {
		return false
	}

	for _, term := range *nl {
		// An empty term (e.g. caused by a trailing comma) has nothing to match
		// with and must not inherit the outcome of the previous term.
		if len(term) == 0 {
			continue
		}

		// The match state is local to the term so that no result can leak
		// from one term into the next.
		rest, matched := name, true
		for _, expr := range term {
			if rest, matched = expr.consume(rest); !matched {
				break
			}
		}

		// The term only matches if the whole name was consumed.
		if matched && rest == "" {
			return true
		}
	}

	return false
}
|
||||||
|
|
||||||
|
func ParseNodeList(raw string) (NodeList, error) {
|
||||||
|
nl := NodeList{}
|
||||||
|
|
||||||
|
isLetter := func(r byte) bool { return ('a' <= r && r <= 'z') || ('A' <= r && r <= 'Z') }
|
||||||
|
isDigit := func(r byte) bool { return '0' <= r && r <= '9' }
|
||||||
|
|
||||||
|
for _, rawterm := range strings.Split(raw, ",") {
|
||||||
|
exprs := []interface {
|
||||||
|
consume(input string) (next string, ok bool)
|
||||||
|
}{}
|
||||||
|
for i := 0; i < len(rawterm); i++ {
|
||||||
|
c := rawterm[i]
|
||||||
|
if isLetter(c) || isDigit(c) {
|
||||||
|
j := i
|
||||||
|
for j < len(rawterm) && (isLetter(rawterm[j]) || isDigit(rawterm[j])) {
|
||||||
|
j++
|
||||||
|
}
|
||||||
|
exprs = append(exprs, NLExprString(rawterm[i:j]))
|
||||||
|
i = j - 1
|
||||||
|
} else if c == '[' {
|
||||||
|
end := strings.Index(rawterm[i:], "]")
|
||||||
|
if end == -1 {
|
||||||
|
return nil, fmt.Errorf("node list: unclosed '['")
|
||||||
|
}
|
||||||
|
|
||||||
|
minus := strings.Index(rawterm[i:i+end], "-")
|
||||||
|
if minus == -1 {
|
||||||
|
return nil, fmt.Errorf("node list: no '-' found inside '[...]'")
|
||||||
|
}
|
||||||
|
|
||||||
|
s1, s2 := rawterm[i+1:i+minus], rawterm[i+minus+1:i+end]
|
||||||
|
if len(s1) != len(s2) || len(s1) == 0 {
|
||||||
|
return nil, fmt.Errorf("node list: %#v and %#v are not of equal length or of length zero", s1, s2)
|
||||||
|
}
|
||||||
|
|
||||||
|
x1, err := strconv.ParseInt(s1, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("node list: %w", err)
|
||||||
|
}
|
||||||
|
x2, err := strconv.ParseInt(s2, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("node list: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
exprs = append(exprs, NLExprIntRange{
|
||||||
|
start: x1,
|
||||||
|
end: x2,
|
||||||
|
digits: len(s1),
|
||||||
|
zeroPadded: true,
|
||||||
|
})
|
||||||
|
i += end
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("node list: invalid character: %#v", rune(c))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nl = append(nl, exprs)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nl, nil
|
||||||
|
}
|
37
config/nodelist_test.go
Normal file
37
config/nodelist_test.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNodeList(t *testing.T) {
|
||||||
|
nl, err := ParseNodeList("hallo,wel123t,emmy[01-99],fritz[005-500],woody[100-200]")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// fmt.Printf("terms\n")
|
||||||
|
// for i, term := range nl.terms {
|
||||||
|
// fmt.Printf("term %d: %#v\n", i, term)
|
||||||
|
// }
|
||||||
|
|
||||||
|
if nl.Contains("hello") || nl.Contains("woody") {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
if nl.Contains("fritz1") || nl.Contains("fritz9") || nl.Contains("fritz004") || nl.Contains("woody201") {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !nl.Contains("hallo") || !nl.Contains("wel123t") {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !nl.Contains("emmy01") || !nl.Contains("emmy42") || !nl.Contains("emmy99") {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !nl.Contains("woody100") || !nl.Contains("woody199") {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
@ -6,7 +6,7 @@ type Cluster struct {
|
|||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
MetricConfig []*MetricConfig `json:"metricConfig"`
|
MetricConfig []*MetricConfig `json:"metricConfig"`
|
||||||
FilterRanges *FilterRanges `json:"filterRanges"`
|
FilterRanges *FilterRanges `json:"filterRanges"`
|
||||||
Partitions []*Partition `json:"partitions"`
|
SubClusters []*SubCluster `json:"subClusters"`
|
||||||
|
|
||||||
// NOT part of the API:
|
// NOT part of the API:
|
||||||
MetricDataRepository *MetricDataRepository `json:"metricDataRepository"`
|
MetricDataRepository *MetricDataRepository `json:"metricDataRepository"`
|
||||||
|
@ -122,8 +122,16 @@ type PageRequest struct {
|
|||||||
Page int `json:"page"`
|
Page int `json:"page"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Partition struct {
|
type StringInput struct {
|
||||||
|
Eq *string `json:"eq"`
|
||||||
|
Contains *string `json:"contains"`
|
||||||
|
StartsWith *string `json:"startsWith"`
|
||||||
|
EndsWith *string `json:"endsWith"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type SubCluster struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
|
Nodes string `json:"nodes"`
|
||||||
ProcessorType string `json:"processorType"`
|
ProcessorType string `json:"processorType"`
|
||||||
SocketsPerNode int `json:"socketsPerNode"`
|
SocketsPerNode int `json:"socketsPerNode"`
|
||||||
CoresPerSocket int `json:"coresPerSocket"`
|
CoresPerSocket int `json:"coresPerSocket"`
|
||||||
@ -134,13 +142,6 @@ type Partition struct {
|
|||||||
Topology *Topology `json:"topology"`
|
Topology *Topology `json:"topology"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type StringInput struct {
|
|
||||||
Eq *string `json:"eq"`
|
|
||||||
Contains *string `json:"contains"`
|
|
||||||
StartsWith *string `json:"startsWith"`
|
|
||||||
EndsWith *string `json:"endsWith"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type TimeRange struct {
|
type TimeRange struct {
|
||||||
From *time.Time `json:"from"`
|
From *time.Time `json:"from"`
|
||||||
To *time.Time `json:"to"`
|
To *time.Time `json:"to"`
|
||||||
|
@ -33,11 +33,12 @@ type Cluster {
|
|||||||
name: String!
|
name: String!
|
||||||
metricConfig: [MetricConfig!]!
|
metricConfig: [MetricConfig!]!
|
||||||
filterRanges: FilterRanges!
|
filterRanges: FilterRanges!
|
||||||
partitions: [Partition!]!
|
subClusters: [SubCluster!]!
|
||||||
}
|
}
|
||||||
|
|
||||||
type Partition {
|
type SubCluster {
|
||||||
name: String!
|
name: String!
|
||||||
|
nodes: String!
|
||||||
processorType: String!
|
processorType: String!
|
||||||
socketsPerNode: Int!
|
socketsPerNode: Int!
|
||||||
coresPerSocket: Int!
|
coresPerSocket: Int!
|
||||||
|
@ -18,6 +18,10 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-backend/schema"
|
"github.com/ClusterCockpit/cc-backend/schema"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (r *clusterResolver) SubClusters(ctx context.Context, obj *model.Cluster) ([]*model.SubCluster, error) {
|
||||||
|
panic(fmt.Errorf("not implemented"))
|
||||||
|
}
|
||||||
|
|
||||||
func (r *jobResolver) MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) {
|
func (r *jobResolver) MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) {
|
||||||
return r.Repo.FetchMetadata(obj)
|
return r.Repo.FetchMetadata(obj)
|
||||||
}
|
}
|
||||||
@ -204,7 +208,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, partiti
|
|||||||
}
|
}
|
||||||
|
|
||||||
if metrics == nil {
|
if metrics == nil {
|
||||||
for _, mc := range config.GetClusterConfig(cluster).MetricConfig {
|
for _, mc := range config.GetCluster(cluster).MetricConfig {
|
||||||
metrics = append(metrics, mc.Name)
|
metrics = append(metrics, mc.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -236,6 +240,9 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, partiti
|
|||||||
return nodeMetrics, nil
|
return nodeMetrics, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cluster returns generated.ClusterResolver implementation.
|
||||||
|
func (r *Resolver) Cluster() generated.ClusterResolver { return &clusterResolver{r} }
|
||||||
|
|
||||||
// Job returns generated.JobResolver implementation.
|
// Job returns generated.JobResolver implementation.
|
||||||
func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} }
|
func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} }
|
||||||
|
|
||||||
@ -245,6 +252,7 @@ func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResol
|
|||||||
// Query returns generated.QueryResolver implementation.
|
// Query returns generated.QueryResolver implementation.
|
||||||
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
|
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
|
||||||
|
|
||||||
|
type clusterResolver struct{ *Resolver }
|
||||||
type jobResolver struct{ *Resolver }
|
type jobResolver struct{ *Resolver }
|
||||||
type mutationResolver struct{ *Resolver }
|
type mutationResolver struct{ *Resolver }
|
||||||
type queryResolver struct{ *Resolver }
|
type queryResolver struct{ *Resolver }
|
||||||
|
@ -32,8 +32,8 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
|
|||||||
|
|
||||||
// `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
|
// `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
|
||||||
for _, cluster := range config.Clusters {
|
for _, cluster := range config.Clusters {
|
||||||
for _, partition := range cluster.Partitions {
|
for _, subcluster := range cluster.SubClusters {
|
||||||
corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)", partition.SocketsPerNode, partition.CoresPerSocket)
|
corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)", subcluster.SocketsPerNode, subcluster.CoresPerSocket)
|
||||||
var query sq.SelectBuilder
|
var query sq.SelectBuilder
|
||||||
if groupBy == nil {
|
if groupBy == nil {
|
||||||
query = sq.Select(
|
query = sq.Select(
|
||||||
@ -54,7 +54,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF
|
|||||||
|
|
||||||
query = query.
|
query = query.
|
||||||
Where("job.cluster = ?", cluster.Name).
|
Where("job.cluster = ?", cluster.Name).
|
||||||
Where("job.partition = ?", partition.Name)
|
Where("job.subcluster = ?", subcluster.Name)
|
||||||
|
|
||||||
query = repository.SecurityCheck(ctx, query)
|
query = repository.SecurityCheck(ctx, query)
|
||||||
for _, f := range filter {
|
for _, f := range filter {
|
||||||
|
@ -157,7 +157,7 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) {
|
|||||||
// Writes a running job to the job-archive
|
// Writes a running job to the job-archive
|
||||||
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
|
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
|
||||||
allMetrics := make([]string, 0)
|
allMetrics := make([]string, 0)
|
||||||
metricConfigs := config.GetClusterConfig(job.Cluster).MetricConfig
|
metricConfigs := config.GetCluster(job.Cluster).MetricConfig
|
||||||
for _, mc := range metricConfigs {
|
for _, mc := range metricConfigs {
|
||||||
allMetrics = append(allMetrics, mc.Name)
|
allMetrics = append(allMetrics, mc.Name)
|
||||||
}
|
}
|
||||||
|
@ -227,7 +227,7 @@ var (
|
|||||||
|
|
||||||
func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) {
|
func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) {
|
||||||
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
|
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources))
|
||||||
topology := config.GetPartition(job.Cluster, job.Partition).Topology
|
topology := config.GetSubCluster(job.Cluster, job.SubCluster).Topology
|
||||||
assignedScope := []schema.MetricScope{}
|
assignedScope := []schema.MetricScope{}
|
||||||
|
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
|
@ -79,7 +79,7 @@ func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ct
|
|||||||
}
|
}
|
||||||
|
|
||||||
if metrics == nil {
|
if metrics == nil {
|
||||||
cluster := config.GetClusterConfig(job.Cluster)
|
cluster := config.GetCluster(job.Cluster)
|
||||||
for _, mc := range cluster.MetricConfig {
|
for _, mc := range cluster.MetricConfig {
|
||||||
metrics = append(metrics, mc.Name)
|
metrics = append(metrics, mc.Name)
|
||||||
}
|
}
|
||||||
@ -167,7 +167,7 @@ func LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []s
|
|||||||
}
|
}
|
||||||
|
|
||||||
if metrics == nil {
|
if metrics == nil {
|
||||||
for _, m := range config.GetClusterConfig(cluster).MetricConfig {
|
for _, m := range config.GetCluster(cluster).MetricConfig {
|
||||||
metrics = append(metrics, m.Name)
|
metrics = append(metrics, m.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -122,12 +122,13 @@ func (r *JobRepository) ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobDa
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This function also sets the subcluster if necessary!
|
||||||
func SanityChecks(job *schema.BaseJob) error {
|
func SanityChecks(job *schema.BaseJob) error {
|
||||||
if c := config.GetClusterConfig(job.Cluster); c == nil {
|
if c := config.GetCluster(job.Cluster); c == nil {
|
||||||
return fmt.Errorf("no such cluster: %#v", job.Cluster)
|
return fmt.Errorf("no such cluster: %#v", job.Cluster)
|
||||||
}
|
}
|
||||||
if p := config.GetPartition(job.Cluster, job.Partition); p == nil {
|
if err := config.AssignSubCluster(job); err != nil {
|
||||||
return fmt.Errorf("no such partition: %#v (on cluster %#v)", job.Partition, job.Cluster)
|
return err
|
||||||
}
|
}
|
||||||
if !job.State.Valid() {
|
if !job.State.Valid() {
|
||||||
return fmt.Errorf("not a valid job state: %#v", job.State)
|
return fmt.Errorf("not a valid job state: %#v", job.State)
|
||||||
|
@ -31,17 +31,17 @@ func (r *JobRepository) Init() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var jobColumns []string = []string{
|
var jobColumns []string = []string{
|
||||||
"job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.start_time", "job.partition", "job.array_job_id",
|
"job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.partition", "job.array_job_id",
|
||||||
"job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state",
|
"job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state",
|
||||||
"job.duration", "job.resources", // "job.meta_data",
|
"job.duration", "job.walltime", "job.resources", // "job.meta_data",
|
||||||
}
|
}
|
||||||
|
|
||||||
func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
|
func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
|
||||||
job := &schema.Job{}
|
job := &schema.Job{}
|
||||||
if err := row.Scan(
|
if err := row.Scan(
|
||||||
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
|
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
|
||||||
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
|
&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
|
||||||
&job.Duration, &job.RawResources /*&job.MetaData*/); err != nil {
|
&job.Duration, &job.Walltime, &job.RawResources /*&job.MetaData*/); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user