Christoph Kluge
2025-10-16 15:33:59 +02:00
27 changed files with 759 additions and 511 deletions

View File

@@ -4,7 +4,7 @@ scalar Any
scalar NullableFloat
scalar MetricScope
scalar JobState
scalar NodeState
scalar SchedulerState
scalar MonitoringState
type Node {
@@ -12,8 +12,11 @@ type Node {
hostname: String!
cluster: String!
subCluster: String!
runningJobs: Int!
nodeState: NodeState!
jobsRunning: Int!
cpusAllocated: Int
memoryAllocated: Int
gpusAllocated: Int
schedulerState: SchedulerState!
healthState: MonitoringState!
metaData: Any
}
@@ -361,7 +364,7 @@ type Query {
from: Time!
to: Time!
): [NodeMetrics!]!
nodeMetricsList(
cluster: String!
subCluster: String!
@@ -399,7 +402,7 @@ input NodeFilter {
hostname: StringInput
cluster: StringInput
subcluster: StringInput
nodeState: NodeState
schedulerState: SchedulerState
healthState: MonitoringState
}
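
Taken together, these schema changes rename the node's scheduler-facing state, add per-node allocation counters (cpusAllocated, memoryAllocated, gpusAllocated), and rename runningJobs to jobsRunning. A minimal Go sketch of a client query against the new shape follows; the endpoint path, port, and the filter literal "allocated" are assumptions for illustration and are not taken from this commit.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Query the reworked Node type; field names follow the schema above.
	query := `query {
	  nodes(filter: [{ schedulerState: "allocated" }]) {
	    count
	    items {
	      hostname
	      cluster
	      subCluster
	      jobsRunning
	      cpusAllocated
	      memoryAllocated
	      gpusAllocated
	      schedulerState
	      healthState
	    }
	  }
	}`

	body, _ := json.Marshal(map[string]string{"query": query})
	// "/query" and the port are assumptions, not taken from this diff.
	resp, err := http.Post("http://localhost:8080/query", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var result map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		panic(err)
	}
	fmt.Println(result)
}
```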

View File

@@ -9,7 +9,7 @@ import "flag"
var (
flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB,
flagForceDB, flagDev, flagVersion, flagLogDateTime, flagApplyTags bool
flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagUiConfigFile, flagImportJob, flagLogLevel string
flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string
)
func cliInit() {
@@ -26,7 +26,6 @@ func cliInit() {
flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit")
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
flag.StringVar(&flagUiConfigFile, "ui-config", "./uiConfig.json", "Specify alternative path to `uiConfig.json`")
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: <username>:[admin,support,manager,api,user]:<password>")
flag.StringVar(&flagDelUser, "del-user", "", "Remove a existing user. Argument format: <username>")
flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`")

View File

@@ -24,6 +24,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/tagger"
"github.com/ClusterCockpit/cc-backend/internal/taskManager"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/web"
ccconf "github.com/ClusterCockpit/cc-lib/ccConfig"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/runtimeEnv"
@@ -244,7 +245,7 @@ func main() {
var wg sync.WaitGroup
//Metric Store starts after all flags have been processes
// Metric Store starts after all flags have been processes
if config.InternalCCMSFlag {
if mscfg := ccconf.GetPackageConfig("metric-store"); mscfg != nil {
config.InitMetricStore(mscfg)
@@ -260,6 +261,9 @@ func main() {
taskManager.Start(ccconf.GetPackageConfig("cron"),
ccconf.GetPackageConfig("archive"))
cfg := ccconf.GetPackageConfig("ui")
web.Init(cfg)
serverInit()
wg.Add(1)

View File

@@ -52,8 +52,6 @@ func onFailureResponse(rw http.ResponseWriter, r *http.Request, err error) {
}
func serverInit() {
// Init Web Package (Primarily: uiDefaults)
web.Init(flagUiConfigFile)
// Setup the http.Handler/Router used by the server
graph.Init()
resolver := graph.GetResolverInstance()

View File

@@ -6,18 +6,11 @@
"user": "clustercockpit",
"group": "clustercockpit",
"validate": false,
"apiAllowedIPs": [
"*"
],
"apiAllowedIPs": ["*"],
"short-running-jobs-duration": 300,
"resampling": {
"trigger": 30,
"resolutions": [
600,
300,
120,
60
]
"resolutions": [600, 300, 120, 60]
}
},
"cron": {
@@ -53,4 +46,5 @@
}
}
]
}
}

go.mod (20 lines changed)
View File

@@ -1,12 +1,12 @@
module github.com/ClusterCockpit/cc-backend
go 1.23.5
go 1.24.0
toolchain go1.24.1
require (
github.com/99designs/gqlgen v0.17.78
github.com/ClusterCockpit/cc-lib v0.8.0
github.com/ClusterCockpit/cc-lib v0.10.1
github.com/Masterminds/squirrel v1.5.4
github.com/coreos/go-oidc/v3 v3.12.0
github.com/expr-lang/expr v1.17.6
@@ -24,7 +24,7 @@ require (
github.com/joho/godotenv v1.5.1
github.com/linkedin/goavro/v2 v2.14.0
github.com/mattn/go-sqlite3 v1.14.24
github.com/nats-io/nats.go v1.45.0
github.com/nats-io/nats.go v1.46.1
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/common v0.66.1
github.com/qustavo/sqlhooks/v2 v2.1.0
@@ -32,9 +32,9 @@ require (
github.com/swaggo/http-swagger v1.3.4
github.com/swaggo/swag v1.16.6
github.com/vektah/gqlparser/v2 v2.5.30
golang.org/x/crypto v0.41.0
golang.org/x/crypto v0.42.0
golang.org/x/oauth2 v0.30.0
golang.org/x/time v0.12.0
golang.org/x/time v0.13.0
)
require (
@@ -86,12 +86,12 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
golang.org/x/mod v0.26.0 // indirect
golang.org/x/mod v0.27.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
golang.org/x/tools v0.35.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/text v0.29.0 // indirect
golang.org/x/tools v0.36.0 // indirect
google.golang.org/protobuf v1.36.8 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.6.0 // indirect

go.sum (36 lines changed)
View File

@@ -6,8 +6,8 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
github.com/ClusterCockpit/cc-lib v0.8.0 h1:kQRMOx30CJCy+Q6TgCK9rarJnJ/CKZPWlIEdIXYlxoA=
github.com/ClusterCockpit/cc-lib v0.8.0/go.mod h1:5xTwONu9pSp15mJ9CjBKGU9I3Jad8NfhrVHJZl50/yI=
github.com/ClusterCockpit/cc-lib v0.10.1 h1:tjGEH8mFGgznYxO8BKLiiar0eZR1Oytk8x5iIQHZR5s=
github.com/ClusterCockpit/cc-lib v0.10.1/go.mod h1:nvTZuxFCTwlos8I1rL5O1RPab7vRtkU8E/PGiaF6pQA=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=
@@ -207,8 +207,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA=
github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo=
github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -295,8 +295,8 @@ golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliY
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
@@ -304,8 +304,8 @@ golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.26.0 h1:EGMPT//Ezu+ylkCijjPc+f4Aih7sZvaAr+O3EHBxvZg=
golang.org/x/mod v0.26.0/go.mod h1:/j6NAhSk8iQ723BGAUyoAcn7SlD7s15Dp9Nd/SfeaFQ=
golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ=
golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
@@ -328,8 +328,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -341,8 +341,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -361,18 +361,18 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/tools v0.35.0 h1:mBffYraMEf7aa0sB+NuKnuCy8qI/9Bughn8dC2Gu5r0=
golang.org/x/tools v0.35.0/go.mod h1:NKdj5HkL/73byiZSJjqJgKn3ep7KjFkBOkR/Hps3VPw=
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=

View File

@@ -63,11 +63,11 @@ models:
fields:
partitions:
resolver: true
Node:
model: "github.com/ClusterCockpit/cc-lib/schema.Node"
fields:
metaData:
resolver: true
# Node:
# model: "github.com/ClusterCockpit/cc-lib/schema.Node"
# fields:
# metaData:
# resolver: true
NullableFloat: { model: "github.com/ClusterCockpit/cc-lib/schema.Float" }
MetricScope: { model: "github.com/ClusterCockpit/cc-lib/schema.MetricScope" }
MetricValue: { model: "github.com/ClusterCockpit/cc-lib/schema.MetricValue" }
@@ -80,8 +80,9 @@ models:
Tag: { model: "github.com/ClusterCockpit/cc-lib/schema.Tag" }
Resource: { model: "github.com/ClusterCockpit/cc-lib/schema.Resource" }
JobState: { model: "github.com/ClusterCockpit/cc-lib/schema.JobState" }
MonitoringState:
{ model: "github.com/ClusterCockpit/cc-lib/schema.NodeState" }
Node: { model: "github.com/ClusterCockpit/cc-lib/schema.Node" }
SchedulerState:
{ model: "github.com/ClusterCockpit/cc-lib/schema.SchedulerState" }
HealthState:
{ model: "github.com/ClusterCockpit/cc-lib/schema.MonitoringState" }
JobMetric: { model: "github.com/ClusterCockpit/cc-lib/schema.JobMetric" }

View File

@@ -124,19 +124,19 @@ func setup(t *testing.T) *api.RestApi {
cclog.Init("info", true)
tmpdir := t.TempDir()
jobarchive := filepath.Join(tmpdir, "job-archive")
if err := os.Mkdir(jobarchive, 0777); err != nil {
if err := os.Mkdir(jobarchive, 0o777); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), fmt.Appendf(nil, "%d", 2), 0666); err != nil {
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"), fmt.Appendf(nil, "%d", 2), 0o666); err != nil {
t.Fatal(err)
}
if err := os.Mkdir(filepath.Join(jobarchive, "testcluster"), 0777); err != nil {
if err := os.Mkdir(filepath.Join(jobarchive, "testcluster"), 0o777); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(jobarchive, "testcluster", "cluster.json"), []byte(testclusterJson), 0666); err != nil {
if err := os.WriteFile(filepath.Join(jobarchive, "testcluster", "cluster.json"), []byte(testclusterJson), 0o666); err != nil {
t.Fatal(err)
}
@@ -147,7 +147,7 @@ func setup(t *testing.T) *api.RestApi {
}
cfgFilePath := filepath.Join(tmpdir, "config.json")
if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0666); err != nil {
if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0o666); err != nil {
t.Fatal(err)
}
@@ -293,7 +293,7 @@ func TestRestApi(t *testing.T) {
job.SubCluster != "sc1" ||
job.Partition != "default" ||
job.Walltime != 3600 ||
job.ArrayJobId != 0 ||
job.ArrayJobID != 0 ||
job.NumNodes != 1 ||
job.NumHWThreads != 8 ||
job.NumAcc != 0 ||

View File

@@ -15,24 +15,13 @@ import (
"github.com/ClusterCockpit/cc-lib/schema"
)
type Node struct {
Name string `json:"hostname"`
States []string `json:"states"`
CpusAllocated int `json:"cpusAllocated"`
CpusTotal int `json:"cpusTotal"`
MemoryAllocated int `json:"memoryAllocated"`
MemoryTotal int `json:"memoryTotal"`
GpusAllocated int `json:"gpusAllocated"`
GpusTotal int `json:"gpusTotal"`
}
type UpdateNodeStatesRequest struct {
Nodes []Node `json:"nodes"`
Cluster string `json:"cluster" example:"fritz"`
Nodes []schema.NodePayload `json:"nodes"`
Cluster string `json:"cluster" example:"fritz"`
}
// this routine assumes that only one of them exists per node
func determineState(states []string) schema.NodeState {
func determineState(states []string) schema.SchedulerState {
for _, state := range states {
switch strings.ToLower(state) {
case "allocated":
@@ -77,15 +66,15 @@ func (api *RestApi) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
for _, node := range req.Nodes {
state := determineState(node.States)
nodeState := schema.Node{
nodeState := schema.NodeStateDB{
TimeStamp: time.Now().Unix(), NodeState: state,
Hostname: node.Name, Cluster: req.Cluster,
CpusAllocated: node.CpusAllocated, CpusTotal: node.CpusTotal,
MemoryAllocated: node.MemoryAllocated, MemoryTotal: node.MemoryTotal,
GpusAllocated: node.GpusAllocated, GpusTotal: node.GpusTotal,
HealthState: schema.MonitoringStateFull,
CpusAllocated: node.CpusAllocated,
MemoryAllocated: node.MemoryAllocated,
GpusAllocated: node.GpusAllocated,
HealthState: schema.MonitoringStateFull,
JobsRunning: node.JobsRunning,
}
repo.InsertNodeState(&nodeState)
repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState)
}
}
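
The handler now decodes the node list into cc-lib's schema.NodePayload and updates existing node rows instead of inserting whole state records. A hedged sketch of a matching request body follows; the JSON field names mirror the removed local Node struct plus jobsRunning, and the route URL is a placeholder, since neither the NodePayload tags nor the route registration are part of this diff.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Field names are assumptions modeled on the removed local Node struct;
	// the authoritative JSON tags live in cc-lib's schema.NodePayload.
	payload := []byte(`{
	  "cluster": "fritz",
	  "nodes": [
	    {
	      "hostname": "f0101",
	      "states": ["allocated"],
	      "cpusAllocated": 72,
	      "memoryAllocated": 256,
	      "gpusAllocated": 0,
	      "jobsRunning": 2
	    }
	  ]
	}`)

	// Placeholder URL: the actual route for updateNodeStates is registered
	// elsewhere in the REST API and is not shown in this diff.
	url := "http://localhost:8080/PLACEHOLDER-node-state-route"

	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer <token>") // auth scheme assumed

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```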

View File

@@ -112,7 +112,7 @@ type ComplexityRoot struct {
}
Job struct {
ArrayJobId func(childComplexity int) int
ArrayJobID func(childComplexity int) int
Cluster func(childComplexity int) int
ConcurrentJobs func(childComplexity int) int
Duration func(childComplexity int) int
@@ -272,14 +272,17 @@ type ComplexityRoot struct {
}
Node struct {
Cluster func(childComplexity int) int
HealthState func(childComplexity int) int
Hostname func(childComplexity int) int
ID func(childComplexity int) int
MetaData func(childComplexity int) int
NodeState func(childComplexity int) int
RunningJobs func(childComplexity int) int
SubCluster func(childComplexity int) int
Cluster func(childComplexity int) int
CpusAllocated func(childComplexity int) int
GpusAllocated func(childComplexity int) int
HealthState func(childComplexity int) int
Hostname func(childComplexity int) int
ID func(childComplexity int) int
JobsRunning func(childComplexity int) int
MemoryAllocated func(childComplexity int) int
MetaData func(childComplexity int) int
SchedulerState func(childComplexity int) int
SubCluster func(childComplexity int) int
}
NodeMetrics struct {
@@ -447,9 +450,10 @@ type MutationResolver interface {
UpdateConfiguration(ctx context.Context, name string, value string) (*string, error)
}
type NodeResolver interface {
RunningJobs(ctx context.Context, obj *schema.Node) (int, error)
NodeState(ctx context.Context, obj *schema.Node) (string, error)
HealthState(ctx context.Context, obj *schema.Node) (schema.NodeState, error)
ID(ctx context.Context, obj *schema.Node) (string, error)
SchedulerState(ctx context.Context, obj *schema.Node) (schema.SchedulerState, error)
HealthState(ctx context.Context, obj *schema.Node) (string, error)
MetaData(ctx context.Context, obj *schema.Node) (any, error)
}
type QueryResolver interface {
@@ -686,11 +690,11 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
return e.complexity.IntRangeOutput.To(childComplexity), true
case "Job.arrayJobId":
if e.complexity.Job.ArrayJobId == nil {
if e.complexity.Job.ArrayJobID == nil {
break
}
return e.complexity.Job.ArrayJobId(childComplexity), true
return e.complexity.Job.ArrayJobID(childComplexity), true
case "Job.cluster":
if e.complexity.Job.Cluster == nil {
@@ -1485,6 +1489,20 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
return e.complexity.Node.Cluster(childComplexity), true
case "Node.cpusAllocated":
if e.complexity.Node.CpusAllocated == nil {
break
}
return e.complexity.Node.CpusAllocated(childComplexity), true
case "Node.gpusAllocated":
if e.complexity.Node.GpusAllocated == nil {
break
}
return e.complexity.Node.GpusAllocated(childComplexity), true
case "Node.healthState":
if e.complexity.Node.HealthState == nil {
break
@@ -1506,6 +1524,20 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
return e.complexity.Node.ID(childComplexity), true
case "Node.jobsRunning":
if e.complexity.Node.JobsRunning == nil {
break
}
return e.complexity.Node.JobsRunning(childComplexity), true
case "Node.memoryAllocated":
if e.complexity.Node.MemoryAllocated == nil {
break
}
return e.complexity.Node.MemoryAllocated(childComplexity), true
case "Node.metaData":
if e.complexity.Node.MetaData == nil {
break
@@ -1513,19 +1545,12 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
return e.complexity.Node.MetaData(childComplexity), true
case "Node.nodeState":
if e.complexity.Node.NodeState == nil {
case "Node.schedulerState":
if e.complexity.Node.SchedulerState == nil {
break
}
return e.complexity.Node.NodeState(childComplexity), true
case "Node.runningJobs":
if e.complexity.Node.RunningJobs == nil {
break
}
return e.complexity.Node.RunningJobs(childComplexity), true
return e.complexity.Node.SchedulerState(childComplexity), true
case "Node.subCluster":
if e.complexity.Node.SubCluster == nil {
@@ -2343,7 +2368,7 @@ scalar Any
scalar NullableFloat
scalar MetricScope
scalar JobState
scalar NodeState
scalar SchedulerState
scalar MonitoringState
type Node {
@@ -2351,8 +2376,11 @@ type Node {
hostname: String!
cluster: String!
subCluster: String!
runningJobs: Int!
nodeState: NodeState!
jobsRunning: Int!
cpusAllocated: Int
memoryAllocated: Int
gpusAllocated: Int
schedulerState: SchedulerState!
healthState: MonitoringState!
metaData: Any
}
@@ -2700,7 +2728,7 @@ type Query {
from: Time!
to: Time!
): [NodeMetrics!]!
nodeMetricsList(
cluster: String!
subCluster: String!
@@ -2738,7 +2766,7 @@ input NodeFilter {
hostname: StringInput
cluster: StringInput
subcluster: StringInput
nodeState: NodeState
schedulerState: SchedulerState
healthState: MonitoringState
}
@@ -5341,7 +5369,7 @@ func (ec *executionContext) _Job_arrayJobId(ctx context.Context, field graphql.C
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) {
ctx = rctx // use context from middleware stack in children
return obj.ArrayJobId, nil
return obj.ArrayJobID, nil
})
if err != nil {
ec.Error(ctx, err)
@@ -9694,7 +9722,7 @@ func (ec *executionContext) _Node_id(ctx context.Context, field graphql.Collecte
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) {
ctx = rctx // use context from middleware stack in children
return obj.ID, nil
return ec.resolvers.Node().ID(rctx, obj)
})
if err != nil {
ec.Error(ctx, err)
@@ -9706,17 +9734,17 @@ func (ec *executionContext) _Node_id(ctx context.Context, field graphql.Collecte
}
return graphql.Null
}
res := resTmp.(int64)
res := resTmp.(string)
fc.Result = res
return ec.marshalNID2int64(ctx, field.Selections, res)
return ec.marshalNID2string(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Node_id(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Node",
Field: field,
IsMethod: false,
IsResolver: false,
IsMethod: true,
IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type ID does not have child fields")
},
@@ -9856,8 +9884,8 @@ func (ec *executionContext) fieldContext_Node_subCluster(_ context.Context, fiel
return fc, nil
}
func (ec *executionContext) _Node_runningJobs(ctx context.Context, field graphql.CollectedField, obj *schema.Node) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Node_runningJobs(ctx, field)
func (ec *executionContext) _Node_jobsRunning(ctx context.Context, field graphql.CollectedField, obj *schema.Node) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Node_jobsRunning(ctx, field)
if err != nil {
return graphql.Null
}
@@ -9870,7 +9898,7 @@ func (ec *executionContext) _Node_runningJobs(ctx context.Context, field graphql
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Node().RunningJobs(rctx, obj)
return obj.JobsRunning, nil
})
if err != nil {
ec.Error(ctx, err)
@@ -9887,12 +9915,12 @@ func (ec *executionContext) _Node_runningJobs(ctx context.Context, field graphql
return ec.marshalNInt2int(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Node_runningJobs(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
func (ec *executionContext) fieldContext_Node_jobsRunning(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Node",
Field: field,
IsMethod: true,
IsResolver: true,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields")
},
@@ -9900,8 +9928,8 @@ func (ec *executionContext) fieldContext_Node_runningJobs(_ context.Context, fie
return fc, nil
}
func (ec *executionContext) _Node_nodeState(ctx context.Context, field graphql.CollectedField, obj *schema.Node) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Node_nodeState(ctx, field)
func (ec *executionContext) _Node_cpusAllocated(ctx context.Context, field graphql.CollectedField, obj *schema.Node) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Node_cpusAllocated(ctx, field)
if err != nil {
return graphql.Null
}
@@ -9914,7 +9942,130 @@ func (ec *executionContext) _Node_nodeState(ctx context.Context, field graphql.C
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Node().NodeState(rctx, obj)
return obj.CpusAllocated, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
return graphql.Null
}
res := resTmp.(int)
fc.Result = res
return ec.marshalOInt2int(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Node_cpusAllocated(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Node",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _Node_memoryAllocated(ctx context.Context, field graphql.CollectedField, obj *schema.Node) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Node_memoryAllocated(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) {
ctx = rctx // use context from middleware stack in children
return obj.MemoryAllocated, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
return graphql.Null
}
res := resTmp.(int)
fc.Result = res
return ec.marshalOInt2int(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Node_memoryAllocated(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Node",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _Node_gpusAllocated(ctx context.Context, field graphql.CollectedField, obj *schema.Node) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Node_gpusAllocated(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) {
ctx = rctx // use context from middleware stack in children
return obj.GpusAllocated, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
return graphql.Null
}
res := resTmp.(int)
fc.Result = res
return ec.marshalOInt2int(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Node_gpusAllocated(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Node",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _Node_schedulerState(ctx context.Context, field graphql.CollectedField, obj *schema.Node) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Node_schedulerState(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) {
ctx = rctx // use context from middleware stack in children
return ec.resolvers.Node().SchedulerState(rctx, obj)
})
if err != nil {
ec.Error(ctx, err)
@@ -9926,19 +10077,19 @@ func (ec *executionContext) _Node_nodeState(ctx context.Context, field graphql.C
}
return graphql.Null
}
res := resTmp.(string)
res := resTmp.(schema.SchedulerState)
fc.Result = res
return ec.marshalNNodeState2string(ctx, field.Selections, res)
return ec.marshalNSchedulerState2githubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐSchedulerState(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Node_nodeState(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
func (ec *executionContext) fieldContext_Node_schedulerState(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Node",
Field: field,
IsMethod: true,
IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type NodeState does not have child fields")
return nil, errors.New("field of type SchedulerState does not have child fields")
},
}
return fc, nil
@@ -9970,9 +10121,9 @@ func (ec *executionContext) _Node_healthState(ctx context.Context, field graphql
}
return graphql.Null
}
res := resTmp.(schema.NodeState)
res := resTmp.(string)
fc.Result = res
return ec.marshalNMonitoringState2githubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐNodeState(ctx, field.Selections, res)
return ec.marshalNMonitoringState2string(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Node_healthState(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
@@ -10216,10 +10367,16 @@ func (ec *executionContext) fieldContext_NodeStateResultList_items(_ context.Con
return ec.fieldContext_Node_cluster(ctx, field)
case "subCluster":
return ec.fieldContext_Node_subCluster(ctx, field)
case "runningJobs":
return ec.fieldContext_Node_runningJobs(ctx, field)
case "nodeState":
return ec.fieldContext_Node_nodeState(ctx, field)
case "jobsRunning":
return ec.fieldContext_Node_jobsRunning(ctx, field)
case "cpusAllocated":
return ec.fieldContext_Node_cpusAllocated(ctx, field)
case "memoryAllocated":
return ec.fieldContext_Node_memoryAllocated(ctx, field)
case "gpusAllocated":
return ec.fieldContext_Node_gpusAllocated(ctx, field)
case "schedulerState":
return ec.fieldContext_Node_schedulerState(ctx, field)
case "healthState":
return ec.fieldContext_Node_healthState(ctx, field)
case "metaData":
@@ -10944,10 +11101,16 @@ func (ec *executionContext) fieldContext_Query_node(ctx context.Context, field g
return ec.fieldContext_Node_cluster(ctx, field)
case "subCluster":
return ec.fieldContext_Node_subCluster(ctx, field)
case "runningJobs":
return ec.fieldContext_Node_runningJobs(ctx, field)
case "nodeState":
return ec.fieldContext_Node_nodeState(ctx, field)
case "jobsRunning":
return ec.fieldContext_Node_jobsRunning(ctx, field)
case "cpusAllocated":
return ec.fieldContext_Node_cpusAllocated(ctx, field)
case "memoryAllocated":
return ec.fieldContext_Node_memoryAllocated(ctx, field)
case "gpusAllocated":
return ec.fieldContext_Node_gpusAllocated(ctx, field)
case "schedulerState":
return ec.fieldContext_Node_schedulerState(ctx, field)
case "healthState":
return ec.fieldContext_Node_healthState(ctx, field)
case "metaData":
@@ -16667,7 +16830,7 @@ func (ec *executionContext) unmarshalInputNodeFilter(ctx context.Context, obj an
asMap[k] = v
}
fieldsInOrder := [...]string{"hostname", "cluster", "subcluster", "nodeState", "healthState"}
fieldsInOrder := [...]string{"hostname", "cluster", "subcluster", "schedulerState", "healthState"}
for _, k := range fieldsInOrder {
v, ok := asMap[k]
if !ok {
@@ -16695,16 +16858,16 @@ func (ec *executionContext) unmarshalInputNodeFilter(ctx context.Context, obj an
return it, err
}
it.Subcluster = data
case "nodeState":
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("nodeState"))
data, err := ec.unmarshalONodeState2ᚖstring(ctx, v)
case "schedulerState":
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("schedulerState"))
data, err := ec.unmarshalOSchedulerState2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐSchedulerState(ctx, v)
if err != nil {
return it, err
}
it.NodeState = data
it.SchedulerState = data
case "healthState":
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("healthState"))
data, err := ec.unmarshalOMonitoringState2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐNodeState(ctx, v)
data, err := ec.unmarshalOMonitoringState2ᚖstring(ctx, v)
if err != nil {
return it, err
}
@@ -18736,26 +18899,6 @@ func (ec *executionContext) _Node(ctx context.Context, sel ast.SelectionSet, obj
case "__typename":
out.Values[i] = graphql.MarshalString("Node")
case "id":
out.Values[i] = ec._Node_id(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&out.Invalids, 1)
}
case "hostname":
out.Values[i] = ec._Node_hostname(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&out.Invalids, 1)
}
case "cluster":
out.Values[i] = ec._Node_cluster(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&out.Invalids, 1)
}
case "subCluster":
out.Values[i] = ec._Node_subCluster(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&out.Invalids, 1)
}
case "runningJobs":
field := field
innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) {
@@ -18764,7 +18907,7 @@ func (ec *executionContext) _Node(ctx context.Context, sel ast.SelectionSet, obj
ec.Error(ctx, ec.Recover(ctx, r))
}
}()
res = ec._Node_runningJobs(ctx, field, obj)
res = ec._Node_id(ctx, field, obj)
if res == graphql.Null {
atomic.AddUint32(&fs.Invalids, 1)
}
@@ -18791,7 +18934,33 @@ func (ec *executionContext) _Node(ctx context.Context, sel ast.SelectionSet, obj
}
out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
case "nodeState":
case "hostname":
out.Values[i] = ec._Node_hostname(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&out.Invalids, 1)
}
case "cluster":
out.Values[i] = ec._Node_cluster(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&out.Invalids, 1)
}
case "subCluster":
out.Values[i] = ec._Node_subCluster(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&out.Invalids, 1)
}
case "jobsRunning":
out.Values[i] = ec._Node_jobsRunning(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&out.Invalids, 1)
}
case "cpusAllocated":
out.Values[i] = ec._Node_cpusAllocated(ctx, field, obj)
case "memoryAllocated":
out.Values[i] = ec._Node_memoryAllocated(ctx, field, obj)
case "gpusAllocated":
out.Values[i] = ec._Node_gpusAllocated(ctx, field, obj)
case "schedulerState":
field := field
innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) {
@@ -18800,7 +18969,7 @@ func (ec *executionContext) _Node(ctx context.Context, sel ast.SelectionSet, obj
ec.Error(ctx, ec.Recover(ctx, r))
}
}()
res = ec._Node_nodeState(ctx, field, obj)
res = ec._Node_schedulerState(ctx, field, obj)
if res == graphql.Null {
atomic.AddUint32(&fs.Invalids, 1)
}
@@ -21654,15 +21823,14 @@ func (ec *executionContext) marshalNMetricValue2githubᚗcomᚋClusterCockpitᚋ
return ec._MetricValue(ctx, sel, &v)
}
func (ec *executionContext) unmarshalNMonitoringState2githubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐNodeState(ctx context.Context, v any) (schema.NodeState, error) {
tmp, err := graphql.UnmarshalString(v)
res := schema.NodeState(tmp)
func (ec *executionContext) unmarshalNMonitoringState2string(ctx context.Context, v any) (string, error) {
res, err := graphql.UnmarshalString(v)
return res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) marshalNMonitoringState2githubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐNodeState(ctx context.Context, sel ast.SelectionSet, v schema.NodeState) graphql.Marshaler {
func (ec *executionContext) marshalNMonitoringState2string(ctx context.Context, sel ast.SelectionSet, v string) graphql.Marshaler {
_ = sel
res := graphql.MarshalString(string(v))
res := graphql.MarshalString(v)
if res == graphql.Null {
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
@@ -21892,22 +22060,6 @@ func (ec *executionContext) marshalNNodeMetrics2ᚖgithubᚗcomᚋClusterCockpit
return ec._NodeMetrics(ctx, sel, v)
}
func (ec *executionContext) unmarshalNNodeState2string(ctx context.Context, v any) (string, error) {
res, err := graphql.UnmarshalString(v)
return res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) marshalNNodeState2string(ctx context.Context, sel ast.SelectionSet, v string) graphql.Marshaler {
_ = sel
res := graphql.MarshalString(v)
if res == graphql.Null {
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
}
}
return res
}
func (ec *executionContext) marshalNNodeStateResultList2githubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐNodeStateResultList(ctx context.Context, sel ast.SelectionSet, v model.NodeStateResultList) graphql.Marshaler {
return ec._NodeStateResultList(ctx, sel, &v)
}
@@ -22084,6 +22236,23 @@ func (ec *executionContext) marshalNResource2ᚖgithubᚗcomᚋClusterCockpitᚋ
return ec._Resource(ctx, sel, v)
}
func (ec *executionContext) unmarshalNSchedulerState2githubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐSchedulerState(ctx context.Context, v any) (schema.SchedulerState, error) {
tmp, err := graphql.UnmarshalString(v)
res := schema.SchedulerState(tmp)
return res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) marshalNSchedulerState2githubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐSchedulerState(ctx context.Context, sel ast.SelectionSet, v schema.SchedulerState) graphql.Marshaler {
_ = sel
res := graphql.MarshalString(string(v))
if res == graphql.Null {
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
}
}
return res
}
func (ec *executionContext) marshalNScopedStats2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐScopedStatsᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.ScopedStats) graphql.Marshaler {
ret := make(graphql.Array, len(v))
var wg sync.WaitGroup
@@ -22942,6 +23111,18 @@ func (ec *executionContext) marshalOID2ᚕstringᚄ(ctx context.Context, sel ast
return ret
}
func (ec *executionContext) unmarshalOInt2int(ctx context.Context, v any) (int, error) {
res, err := graphql.UnmarshalInt(v)
return res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) marshalOInt2int(ctx context.Context, sel ast.SelectionSet, v int) graphql.Marshaler {
_ = sel
_ = ctx
res := graphql.MarshalInt(v)
return res
}
func (ec *executionContext) unmarshalOInt2ᚕintᚄ(ctx context.Context, v any) ([]int, error) {
if v == nil {
return nil, nil
@@ -23249,22 +23430,21 @@ func (ec *executionContext) marshalOMetricStatistics2githubᚗcomᚋClusterCockp
return ec._MetricStatistics(ctx, sel, &v)
}
func (ec *executionContext) unmarshalOMonitoringState2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐNodeState(ctx context.Context, v any) (*schema.NodeState, error) {
func (ec *executionContext) unmarshalOMonitoringState2ᚖstring(ctx context.Context, v any) (*string, error) {
if v == nil {
return nil, nil
}
tmp, err := graphql.UnmarshalString(v)
res := schema.NodeState(tmp)
res, err := graphql.UnmarshalString(v)
return &res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) marshalOMonitoringState2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐNodeState(ctx context.Context, sel ast.SelectionSet, v *schema.NodeState) graphql.Marshaler {
func (ec *executionContext) marshalOMonitoringState2ᚖstring(ctx context.Context, sel ast.SelectionSet, v *string) graphql.Marshaler {
if v == nil {
return graphql.Null
}
_ = sel
_ = ctx
res := graphql.MarshalString(string(*v))
res := graphql.MarshalString(*v)
return res
}
@@ -23293,24 +23473,6 @@ func (ec *executionContext) unmarshalONodeFilter2ᚕᚖgithubᚗcomᚋClusterCoc
return res, nil
}
func (ec *executionContext) unmarshalONodeState2ᚖstring(ctx context.Context, v any) (*string, error) {
if v == nil {
return nil, nil
}
res, err := graphql.UnmarshalString(v)
return &res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) marshalONodeState2ᚖstring(ctx context.Context, sel ast.SelectionSet, v *string) graphql.Marshaler {
if v == nil {
return graphql.Null
}
_ = sel
_ = ctx
res := graphql.MarshalString(*v)
return res
}
func (ec *executionContext) unmarshalOOrderByInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐOrderByInput(ctx context.Context, v any) (*model.OrderByInput, error) {
if v == nil {
return nil, nil
@@ -23327,6 +23489,25 @@ func (ec *executionContext) unmarshalOPageRequest2ᚖgithubᚗcomᚋClusterCockp
return &res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) unmarshalOSchedulerState2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐSchedulerState(ctx context.Context, v any) (*schema.SchedulerState, error) {
if v == nil {
return nil, nil
}
tmp, err := graphql.UnmarshalString(v)
res := schema.SchedulerState(tmp)
return &res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) marshalOSchedulerState2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐSchedulerState(ctx context.Context, sel ast.SelectionSet, v *schema.SchedulerState) graphql.Marshaler {
if v == nil {
return graphql.Null
}
_ = sel
_ = ctx
res := graphql.MarshalString(string(*v))
return res
}
func (ec *executionContext) marshalOSeries2ᚕgithubᚗcomᚋClusterCockpitᚋccᚑlibᚋschemaᚐSeriesᚄ(ctx context.Context, sel ast.SelectionSet, v []schema.Series) graphql.Marshaler {
if v == nil {
return graphql.Null

View File

@@ -2,4 +2,5 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package model

View File

@@ -171,11 +171,11 @@ type NamedStatsWithScope struct {
}
type NodeFilter struct {
Hostname *StringInput `json:"hostname,omitempty"`
Cluster *StringInput `json:"cluster,omitempty"`
Subcluster *StringInput `json:"subcluster,omitempty"`
NodeState *string `json:"nodeState,omitempty"`
HealthState *schema.NodeState `json:"healthState,omitempty"`
Hostname *StringInput `json:"hostname,omitempty"`
Cluster *StringInput `json:"cluster,omitempty"`
Subcluster *StringInput `json:"subcluster,omitempty"`
SchedulerState *schema.SchedulerState `json:"schedulerState,omitempty"`
HealthState *string `json:"healthState,omitempty"`
}
type NodeMetrics struct {
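
In the regenerated model, the filter now carries the scheduler state as a typed *schema.SchedulerState while the health state becomes a plain *string. A minimal sketch of building such a filter in Go; the literal values ("allocated", "full") are assumptions mirroring determineState and schema.MonitoringStateFull, not values defined in this diff.

```go
package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
	"github.com/ClusterCockpit/cc-lib/schema"
)

func main() {
	// The literal values are assumptions; the authoritative state values
	// live in cc-lib's schema package.
	schedState := schema.SchedulerState("allocated")
	healthState := "full"

	filters := []*model.NodeFilter{
		{
			SchedulerState: &schedState,
			HealthState:    &healthState,
		},
	}
	fmt.Printf("%+v\n", filters[0])
}
```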

View File

@@ -305,19 +305,18 @@ func (r *mutationResolver) UpdateConfiguration(ctx context.Context, name string,
return nil, nil
}
// RunningJobs is the resolver for the runningJobs field.
func (r *nodeResolver) RunningJobs(ctx context.Context, obj *schema.Node) (int, error) {
panic(fmt.Errorf("not implemented: RunningJobs - runningJobs"))
// ID is the resolver for the id field.
func (r *nodeResolver) ID(ctx context.Context, obj *schema.Node) (string, error) {
panic(fmt.Errorf("not implemented: ID - id"))
}
// NodeState is the resolver for the nodeState field.
func (r *nodeResolver) NodeState(ctx context.Context, obj *schema.Node) (string, error) {
return string(obj.NodeState), nil
// SchedulerState is the resolver for the schedulerState field.
func (r *nodeResolver) SchedulerState(ctx context.Context, obj *schema.Node) (schema.SchedulerState, error) {
panic(fmt.Errorf("not implemented: SchedulerState - schedulerState"))
}
// HealthState is the resolver for the healthState field.
func (r *nodeResolver) HealthState(ctx context.Context, obj *schema.Node) (schema.NodeState, error) {
// FIXME: Why is Output of schema.NodeState Type?
func (r *nodeResolver) HealthState(ctx context.Context, obj *schema.Node) (string, error) {
panic(fmt.Errorf("not implemented: HealthState - healthState"))
}
@@ -367,43 +366,17 @@ func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]*
// Node is the resolver for the node field.
func (r *queryResolver) Node(ctx context.Context, id string) (*schema.Node, error) {
repo := repository.GetNodeRepository()
numericId, err := strconv.ParseInt(id, 10, 64)
if err != nil {
cclog.Warn("Error while parsing job id")
return nil, err
}
return repo.GetNode(numericId, false)
panic(fmt.Errorf("not implemented: Node - node"))
}
// Nodes is the resolver for the nodes field.
func (r *queryResolver) Nodes(ctx context.Context, filter []*model.NodeFilter, order *model.OrderByInput) (*model.NodeStateResultList, error) {
repo := repository.GetNodeRepository()
nodes, err := repo.QueryNodes(ctx, filter, order)
count := len(nodes)
return &model.NodeStateResultList{Items: nodes, Count: &count}, err
panic(fmt.Errorf("not implemented: Nodes - nodes"))
}
// NodeStates is the resolver for the nodeStates field.
func (r *queryResolver) NodeStates(ctx context.Context, filter []*model.NodeFilter) ([]*model.NodeStates, error) {
repo := repository.GetNodeRepository()
stateCounts, serr := repo.CountNodeStates(ctx, filter)
if serr != nil {
cclog.Warnf("Error while counting nodeStates: %s", serr.Error())
return nil, serr
}
healthCounts, herr := repo.CountHealthStates(ctx, filter)
if herr != nil {
cclog.Warnf("Error while counting healthStates: %s", herr.Error())
return nil, herr
}
allCounts := make([]*model.NodeStates, 0)
allCounts = append(stateCounts, healthCounts...)
return allCounts, nil
panic(fmt.Errorf("not implemented: NodeStates - nodeStates"))
}
// Job is the resolver for the job field.
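
The Node, Nodes, and NodeStates resolvers in this file are regenerated as panicking stubs; the removed bodies in the hunks above show how they were previously backed by the node repository. For orientation only, a sketch of re-attaching the Nodes resolver to the repository, lifted from the removed code rather than from anything this commit adds:

```go
// Sketch based on the removed implementation above; not part of this commit.
func (r *queryResolver) Nodes(ctx context.Context, filter []*model.NodeFilter, order *model.OrderByInput) (*model.NodeStateResultList, error) {
	repo := repository.GetNodeRepository()
	nodes, err := repo.QueryNodes(ctx, filter, order)
	if err != nil {
		return nil, err
	}
	count := len(nodes)
	return &model.NodeStateResultList{Items: nodes, Count: &count}, nil
}
```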

View File

@@ -52,7 +52,7 @@ func GetJobRepository() *JobRepository {
}
var jobColumns []string = []string{
"job.id", "job.job_id", "job.hpc_user", "job.project", "job.hpc_cluster", "job.subcluster",
"job.id", "job.job_id", "job.hpc_user", "job.project", "job.cluster", "job.subcluster",
"job.start_time", "job.cluster_partition", "job.array_job_id", "job.num_nodes",
"job.num_hwthreads", "job.num_acc", "job.shared", "job.monitoring_status",
"job.smt", "job.job_state", "job.duration", "job.walltime", "job.resources",
@@ -60,7 +60,7 @@ var jobColumns []string = []string{
}
var jobCacheColumns []string = []string{
"job_cache.id", "job_cache.job_id", "job_cache.hpc_user", "job_cache.project", "job_cache.hpc_cluster",
"job_cache.id", "job_cache.job_id", "job_cache.hpc_user", "job_cache.project", "job_cache.cluster",
"job_cache.subcluster", "job_cache.start_time", "job_cache.cluster_partition",
"job_cache.array_job_id", "job_cache.num_nodes", "job_cache.num_hwthreads",
"job_cache.num_acc", "job_cache.shared", "job_cache.monitoring_status", "job_cache.smt",
@@ -73,7 +73,7 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
if err := row.Scan(
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster,
&job.StartTime, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads,
&job.StartTime, &job.Partition, &job.ArrayJobID, &job.NumNodes, &job.NumHWThreads,
&job.NumAcc, &job.Shared, &job.MonitoringStatus, &job.SMT, &job.State,
&job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil {
cclog.Warnf("Error while scanning rows (Job): %v", err)
@@ -390,7 +390,7 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
start := time.Now()
partitions := r.cache.Get("partitions:"+cluster, func() (any, time.Duration, int) {
parts := []string{}
if err = r.DB.Select(&parts, `SELECT DISTINCT job.cluster_partition FROM job WHERE job.hpc_cluster = ?;`, cluster); err != nil {
if err = r.DB.Select(&parts, `SELECT DISTINCT job.cluster_partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
return nil, 0, 1000
}
@@ -410,7 +410,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
subclusters := make(map[string]map[string]int)
rows, err := sq.Select("resources", "subcluster").From("job").
Where("job.job_state = 'running'").
Where("job.hpc_cluster = ?", cluster).
Where("job.cluster = ?", cluster).
RunWith(r.stmtCache).Query()
if err != nil {
cclog.Error("Error while running query")
@@ -505,7 +505,7 @@ func (r *JobRepository) FindJobIdsByTag(tagId int64) ([]int64, error) {
// FIXME: Reconsider filtering short jobs with harcoded threshold
func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
query := sq.Select(jobColumns...).From("job").
Where(fmt.Sprintf("job.hpc_cluster = '%s'", cluster)).
Where(fmt.Sprintf("job.cluster = '%s'", cluster)).
Where("job.job_state = 'running'").
Where("job.duration > 600")
@@ -587,7 +587,7 @@ func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32
Where("job.id = ?", job)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
return err
}
func (r *JobRepository) Execute(stmt sq.UpdateBuilder) error {

View File

@@ -14,18 +14,18 @@ import (
)
const NamedJobCacheInsert string = `INSERT INTO job_cache (
job_id, hpc_user, project, hpc_cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
shared, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
) VALUES (
:job_id, :hpc_user, :project, :hpc_cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:job_id, :hpc_user, :project, :cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:shared, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
);`
const NamedJobInsert string = `INSERT INTO job (
job_id, hpc_user, project, hpc_cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
job_id, hpc_user, project, cluster, subcluster, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc,
shared, monitoring_status, smt, job_state, start_time, duration, walltime, footprint, energy, energy_footprint, resources, meta_data
) VALUES (
:job_id, :hpc_user, :project, :hpc_cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:job_id, :hpc_user, :project, :cluster, :subcluster, :cluster_partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:shared, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
);`
@@ -70,7 +70,7 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
}
_, err = r.DB.Exec(
"INSERT INTO job (job_id, hpc_cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, hpc_cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
"INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache")
if err != nil {
cclog.Warnf("Error while Job sync: %v", err)
return nil, err
@@ -120,7 +120,7 @@ func (r *JobRepository) Stop(
Where("job.id = ?", jobId)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
return err
}
func (r *JobRepository) StopCached(
@@ -136,5 +136,5 @@ func (r *JobRepository) StopCached(
Where("job.id = ?", jobId)
_, err = stmt.RunWith(r.stmtCache).Exec()
return
return err
}

View File

@@ -31,7 +31,7 @@ func (r *JobRepository) Find(
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.hpc_cluster = ?", *cluster)
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
@@ -52,7 +52,7 @@ func (r *JobRepository) FindCached(
Where("job_cache.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job_cache.hpc_cluster = ?", *cluster)
q = q.Where("job_cache.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job_cache.start_time = ?", *startTime)
@@ -78,7 +78,7 @@ func (r *JobRepository) FindAll(
Where("job.job_id = ?", *jobId)
if cluster != nil {
q = q.Where("job.hpc_cluster = ?", *cluster)
q = q.Where("job.cluster = ?", *cluster)
}
if startTime != nil {
q = q.Where("job.start_time = ?", *startTime)
@@ -183,7 +183,7 @@ func (r *JobRepository) FindByJobId(ctx context.Context, jobId int64, startTime
q := sq.Select(jobColumns...).
From("job").
Where("job.job_id = ?", jobId).
Where("job.hpc_cluster = ?", cluster).
Where("job.cluster = ?", cluster).
Where("job.start_time = ?", startTime)
q, qerr := SecurityCheck(ctx, q)
@@ -203,7 +203,7 @@ func (r *JobRepository) IsJobOwner(jobId int64, startTime int64, user string, cl
From("job").
Where("job.job_id = ?", jobId).
Where("job.hpc_user = ?", user).
Where("job.hpc_cluster = ?", cluster).
Where("job.cluster = ?", cluster).
Where("job.start_time = ?", startTime)
_, err := scanJob(q.RunWith(r.stmtCache).QueryRow())

View File

@@ -168,7 +168,7 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
query = buildMetaJsonCondition("jobName", filter.JobName, query)
}
if filter.Cluster != nil {
query = buildStringCondition("job.hpc_cluster", filter.Cluster, query)
query = buildStringCondition("job.cluster", filter.Cluster, query)
}
if filter.Partition != nil {
query = buildStringCondition("job.cluster_partition", filter.Partition, query)

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (

View File

@@ -1,7 +1,7 @@
CREATE TABLE "job_cache" (
id INTEGER PRIMARY KEY,
job_id BIGINT NOT NULL,
hpc_cluster VARCHAR(255) NOT NULL,
cluster VARCHAR(255) NOT NULL,
subcluster VARCHAR(255) NOT NULL,
submit_time BIGINT NOT NULL DEFAULT 0, -- Unix timestamp
start_time BIGINT NOT NULL DEFAULT 0, -- Unix timestamp
@@ -30,13 +30,13 @@ CREATE TABLE "job_cache" (
energy REAL NOT NULL DEFAULT 0.0,
energy_footprint TEXT DEFAULT NULL,
footprint TEXT DEFAULT NULL,
UNIQUE (job_id, hpc_cluster, start_time)
UNIQUE (job_id, cluster, start_time)
);
CREATE TABLE "job_new" (
id INTEGER PRIMARY KEY,
job_id BIGINT NOT NULL,
hpc_cluster TEXT NOT NULL,
cluster TEXT NOT NULL,
subcluster TEXT NOT NULL,
submit_time BIGINT NOT NULL DEFAULT 0, -- Unix timestamp
start_time BIGINT NOT NULL DEFAULT 0, -- Unix timestamp
@@ -65,10 +65,9 @@ CREATE TABLE "job_new" (
energy REAL NOT NULL DEFAULT 0.0,
energy_footprint TEXT DEFAULT NULL,
footprint TEXT DEFAULT NULL,
UNIQUE (job_id, hpc_cluster, start_time)
UNIQUE (job_id, cluster, start_time)
);
ALTER TABLE job RENAME COLUMN cluster TO hpc_cluster;
CREATE TABLE IF NOT EXISTS lookup_exclusive (
id INTEGER PRIMARY KEY,
@@ -76,20 +75,43 @@ CREATE TABLE IF NOT EXISTS lookup_exclusive (
);
INSERT INTO lookup_exclusive (id, name) VALUES
(0, 'multi_user'),
(1, 'none'),
(2, 'single_user');
(0, 'multi_user'),
(1, 'none'),
(2, 'single_user');
INSERT INTO job_new (
id, job_id, hpc_cluster, subcluster, submit_time, start_time, hpc_user, project,
id, job_id, cluster, subcluster, submit_time, start_time, hpc_user, project,
cluster_partition, array_job_id, duration, walltime, job_state, meta_data, resources,
num_nodes, num_hwthreads, num_acc, smt, shared, monitoring_status, energy,
energy_footprint, footprint
) SELECT
id, job_id, hpc_cluster, subcluster, 0, start_time, hpc_user, project,
cluster_partition, array_job_id, duration, walltime, job_state, meta_data, resources,
num_nodes, num_hwthreads, num_acc, smt, (SELECT name FROM lookup_exclusive WHERE id=job.exclusive), monitoring_status, energy,
energy_footprint, footprint
id,
job_id,
cluster,
subcluster,
0,
start_time,
hpc_user,
project,
cluster_partition,
array_job_id,
duration,
walltime,
job_state,
meta_data,
resources,
num_nodes,
num_hwthreads,
num_acc,
smt,
(
SELECT name FROM lookup_exclusive
WHERE id = job.exclusive
),
monitoring_status,
energy,
energy_footprint,
footprint
FROM job;
DROP TABLE lookup_exclusive;
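For illustration, a minimal standalone sketch (reduced, hypothetical tables, not part of this migration) of what the correlated lookup_exclusive subquery above does: the old integer exclusive codes are rewritten into the text values that populate the new shared column before the lookup table is dropped:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Reduced stand-ins for the tables touched by the migration.
	setup := []string{
		`CREATE TABLE lookup_exclusive (id INTEGER PRIMARY KEY, name TEXT NOT NULL)`,
		`INSERT INTO lookup_exclusive (id, name) VALUES (0, 'multi_user'), (1, 'none'), (2, 'single_user')`,
		`CREATE TABLE job (id INTEGER PRIMARY KEY, exclusive INTEGER)`,
		`INSERT INTO job (id, exclusive) VALUES (1, 2), (2, 0)`,
	}
	for _, stmt := range setup {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}

	// Same correlated subquery shape as in the migration's INSERT ... SELECT.
	rows, err := db.Query(`SELECT id, (SELECT name FROM lookup_exclusive WHERE id = job.exclusive) FROM job`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var id int
		var shared string
		if err := rows.Scan(&id, &shared); err != nil {
			log.Fatal(err)
		}
		fmt.Println(id, shared) // 1 single_user, then 2 multi_user
	}
}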

View File

@@ -1,5 +1,3 @@
-- sqlfluff:dialect:sqlite
--
CREATE TABLE "node" (
id INTEGER PRIMARY KEY,
hostname VARCHAR(255) NOT NULL,
@@ -13,9 +11,6 @@ CREATE TABLE "node_state" (
id INTEGER PRIMARY KEY,
time_stamp INTEGER NOT NULL,
jobs_running INTEGER DEFAULT 0 NOT NULL,
cpus_total INTEGER DEFAULT 0 NOT NULL,
memory_total INTEGER DEFAULT 0 NOT NULL,
gpus_total INTEGER DEFAULT 0 NOT NULL,
cpus_allocated INTEGER DEFAULT 0 NOT NULL,
memory_allocated INTEGER DEFAULT 0 NOT NULL,
gpus_allocated INTEGER DEFAULT 0 NOT NULL,
@@ -32,104 +27,23 @@ CREATE TABLE "node_state" (
FOREIGN KEY (node_id) REFERENCES node (id)
);
-- DROP indices using old column name "cluster"
DROP INDEX IF EXISTS jobs_cluster;
DROP INDEX IF EXISTS jobs_cluster_user;
DROP INDEX IF EXISTS jobs_cluster_project;
DROP INDEX IF EXISTS jobs_cluster_subcluster;
DROP INDEX IF EXISTS jobs_cluster_starttime;
DROP INDEX IF EXISTS jobs_cluster_duration;
DROP INDEX IF EXISTS jobs_cluster_numnodes;
DROP INDEX IF EXISTS jobs_cluster_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_numacc;
DROP INDEX IF EXISTS jobs_cluster_energy;
DROP INDEX IF EXISTS jobs_cluster_partition;
DROP INDEX IF EXISTS jobs_cluster_partition_starttime;
DROP INDEX IF EXISTS jobs_cluster_partition_duration;
DROP INDEX IF EXISTS jobs_cluster_partition_numnodes;
DROP INDEX IF EXISTS jobs_cluster_partition_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_partition_numacc;
DROP INDEX IF EXISTS jobs_cluster_partition_energy;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_user;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_project;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_starttime;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_duration;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_numacc;
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_energy;
DROP INDEX IF EXISTS jobs_cluster_jobstate;
DROP INDEX IF EXISTS jobs_cluster_jobstate_user;
DROP INDEX IF EXISTS jobs_cluster_jobstate_project;
DROP INDEX IF EXISTS jobs_cluster_jobstate_starttime;
DROP INDEX IF EXISTS jobs_cluster_jobstate_duration;
DROP INDEX IF EXISTS jobs_cluster_jobstate_numnodes;
DROP INDEX IF EXISTS jobs_cluster_jobstate_numhwthreads;
DROP INDEX IF EXISTS jobs_cluster_jobstate_numacc;
DROP INDEX IF EXISTS jobs_cluster_jobstate_energy;
-- -- CREATE UPDATED indices with new column names
-- Cluster Filter
CREATE INDEX IF NOT EXISTS jobs_cluster ON job (hpc_cluster);
CREATE INDEX IF NOT EXISTS jobs_cluster_user ON job (hpc_cluster, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_cluster_project ON job (hpc_cluster, project);
CREATE INDEX IF NOT EXISTS jobs_cluster_subcluster ON job (hpc_cluster, subcluster);
-- Cluster Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_starttime ON job (hpc_cluster, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_duration ON job (hpc_cluster, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_numnodes ON job (hpc_cluster, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_numhwthreads ON job (hpc_cluster, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_numacc ON job (hpc_cluster, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_energy ON job (hpc_cluster, energy);
-- Cluster+Partition Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_partition ON job (hpc_cluster, cluster_partition);
-- Cluster+Partition Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime ON job (hpc_cluster, cluster_partition, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_duration ON job (hpc_cluster, cluster_partition, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numnodes ON job (hpc_cluster, cluster_partition, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numhwthreads ON job (hpc_cluster, cluster_partition, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numacc ON job (hpc_cluster, cluster_partition, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_energy ON job (hpc_cluster, cluster_partition, energy);
-- Cluster+Partition+Jobstate Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (hpc_cluster, cluster_partition, job_state);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_user ON job (hpc_cluster, cluster_partition, job_state, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_project ON job (hpc_cluster, cluster_partition, job_state, project);
-- Cluster+Partition+Jobstate Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_starttime ON job (hpc_cluster, cluster_partition, job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_duration ON job (hpc_cluster, cluster_partition, job_state, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_numnodes ON job (hpc_cluster, cluster_partition, job_state, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_numhwthreads ON job (hpc_cluster, cluster_partition, job_state, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_numacc ON job (hpc_cluster, cluster_partition, job_state, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_energy ON job (hpc_cluster, cluster_partition, job_state, energy);
-- Cluster+JobState Filter
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate ON job (hpc_cluster, job_state);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (hpc_cluster, job_state, hpc_user);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (hpc_cluster, job_state, project);
-- Cluster+JobState Filter Sorting
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime ON job (hpc_cluster, job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_duration ON job (hpc_cluster, job_state, duration);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numnodes ON job (hpc_cluster, job_state, num_nodes);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numhwthreads ON job (hpc_cluster, job_state, num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numacc ON job (hpc_cluster, job_state, num_acc);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_energy ON job (hpc_cluster, job_state, energy);
--- --- END UPDATE existing indices
-- Add NEW Indices For New Job Table Columns
CREATE INDEX IF NOT EXISTS jobs_cluster_submittime ON job (hpc_cluster, submit_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_submittime ON job (hpc_cluster, cluster_partition, submit_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_submittime ON job (hpc_cluster, cluster_partition, job_state, submit_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_submittime ON job (hpc_cluster, job_state, submit_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_submittime ON job (cluster, submit_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_submittime ON job (cluster, cluster_partition, submit_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_submittime ON job (
cluster, cluster_partition, job_state, submit_time
);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_submittime ON job (cluster, job_state, submit_time);
-- Add NEW Indices For New Node Table VARCHAR Fields
CREATE INDEX IF NOT EXISTS nodes_cluster ON node (cluster);
CREATE INDEX IF NOT EXISTS nodes_cluster_subcluster ON node (cluster, subcluster);
-- Add NEW Indices For New Node_State Table Fields
CREATE INDEX IF NOT EXISTS nodeStates_state ON node_state (node_state);
CREATE INDEX IF NOT EXISTS nodeStates_health ON node_state (health_state);
CREATE INDEX IF NOT EXISTS nodeStates_nodeid_state ON node (node_id, node_state);
CREATE INDEX IF NOT EXISTS nodeStates_nodeid_health ON node (node_id, health_state);
CREATE INDEX IF NOT EXISTS nodestates_state ON node_state (node_state);
CREATE INDEX IF NOT EXISTS nodestates_health ON node_state (health_state);
CREATE INDEX IF NOT EXISTS nodestates_nodeid_state ON node_state (node_id, node_state);
CREATE INDEX IF NOT EXISTS nodestates_nodeid_health ON node_state (node_id, health_state);
-- Add NEW Indices For Increased Amounts of Tags
CREATE INDEX IF NOT EXISTS tags_jobid ON jobtag (job_id);

View File

@@ -2,6 +2,7 @@
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
@@ -9,7 +10,6 @@ import (
"database/sql"
"encoding/json"
"fmt"
"maps"
"sync"
"time"
@@ -49,89 +49,87 @@ func GetNodeRepository() *NodeRepository {
return nodeRepoInstance
}
// "node.id,", "node.meta_data"
var nodeColumns []string = []string{"node.hostname", "node.cluster", "node.subcluster"}
func (r *NodeRepository) FetchMetadata(node *schema.Node) (map[string]string, error) {
func (r *NodeRepository) FetchMetadata(hostname string, cluster string) (map[string]string, error) {
start := time.Now()
cachekey := fmt.Sprintf("metadata:%d", node.ID)
if cached := r.cache.Get(cachekey, nil); cached != nil {
node.MetaData = cached.(map[string]string)
return node.MetaData, nil
}
if err := sq.Select("node.meta_data").From("node").Where("node.id = ?", node.ID).
RunWith(r.stmtCache).QueryRow().Scan(&node.RawMetaData); err != nil {
RawMetaData := make([]byte, 0)
if err := sq.Select("node.meta_data").From("node").
Where("node.hostname = ?", hostname).
Where("node.cluster = ?", cluster).
RunWith(r.stmtCache).QueryRow().Scan(&RawMetaData); err != nil {
cclog.Warn("Error while scanning for node metadata")
return nil, err
}
if len(node.RawMetaData) == 0 {
if len(RawMetaData) == 0 {
return nil, nil
}
if err := json.Unmarshal(node.RawMetaData, &node.MetaData); err != nil {
MetaData := make(map[string]string)
if err := json.Unmarshal(RawMetaData, &MetaData); err != nil {
cclog.Warn("Error while unmarshaling raw metadata json")
return nil, err
}
r.cache.Put(cachekey, node.MetaData, len(node.RawMetaData), 24*time.Hour)
cclog.Debugf("Timer FetchMetadata %s", time.Since(start))
return node.MetaData, nil
return MetaData, nil
}
func (r *NodeRepository) UpdateMetadata(node *schema.Node, key, val string) (err error) {
cachekey := fmt.Sprintf("metadata:%d", node.ID)
r.cache.Del(cachekey)
if node.MetaData == nil {
if _, err = r.FetchMetadata(node); err != nil {
cclog.Warnf("Error while fetching metadata for node, DB ID '%v'", node.ID)
return err
}
}
//
// func (r *NodeRepository) UpdateMetadata(node *schema.Node, key, val string) (err error) {
// cachekey := fmt.Sprintf("metadata:%d", node.ID)
// r.cache.Del(cachekey)
// if node.MetaData == nil {
// if _, err = r.FetchMetadata(node); err != nil {
// cclog.Warnf("Error while fetching metadata for node, DB ID '%v'", node.ID)
// return err
// }
// }
//
// if node.MetaData != nil {
// cpy := make(map[string]string, len(node.MetaData)+1)
// maps.Copy(cpy, node.MetaData)
// cpy[key] = val
// node.MetaData = cpy
// } else {
// node.MetaData = map[string]string{key: val}
// }
//
// if node.RawMetaData, err = json.Marshal(node.MetaData); err != nil {
// cclog.Warnf("Error while marshaling metadata for node, DB ID '%v'", node.ID)
// return err
// }
//
// if _, err = sq.Update("node").
// Set("meta_data", node.RawMetaData).
// Where("node.id = ?", node.ID).
// RunWith(r.stmtCache).Exec(); err != nil {
// cclog.Warnf("Error while updating metadata for node, DB ID '%v'", node.ID)
// return err
// }
//
// r.cache.Put(cachekey, node.MetaData, len(node.RawMetaData), 24*time.Hour)
// return nil
// }
if node.MetaData != nil {
cpy := make(map[string]string, len(node.MetaData)+1)
maps.Copy(cpy, node.MetaData)
cpy[key] = val
node.MetaData = cpy
} else {
node.MetaData = map[string]string{key: val}
}
if node.RawMetaData, err = json.Marshal(node.MetaData); err != nil {
cclog.Warnf("Error while marshaling metadata for node, DB ID '%v'", node.ID)
return err
}
if _, err = sq.Update("node").
Set("meta_data", node.RawMetaData).
Where("node.id = ?", node.ID).
RunWith(r.stmtCache).Exec(); err != nil {
cclog.Warnf("Error while updating metadata for node, DB ID '%v'", node.ID)
return err
}
r.cache.Put(cachekey, node.MetaData, len(node.RawMetaData), 24*time.Hour)
return nil
}
func (r *NodeRepository) GetNode(id int64, withMeta bool) (*schema.Node, error) {
func (r *NodeRepository) GetNode(hostname string, cluster string, withMeta bool) (*schema.Node, error) {
node := &schema.Node{}
if err := sq.Select("id", "hostname", "cluster", "subcluster", "node_state",
"health_state").From("node").
Where("node.id = ?", id).RunWith(r.DB).
QueryRow().Scan(&node.ID, &node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState,
&node.HealthState); err != nil {
cclog.Warnf("Error while querying node '%v' from database", id)
if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
"node_state.health_state", "MAX(node_state.time_stamp)").From("node_state").
Join("node ON nodes_state.node_id = node.id").GroupBy("node_state.node_id").
Where("node.hostname = ?", hostname).Where("node.cluster = ?", cluster).RunWith(r.DB).
QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState); err != nil {
cclog.Warnf("Error while querying node '%s' from database: %v", hostname, err)
return nil, err
}
if withMeta {
var err error
var meta map[string]string
if meta, err = r.FetchMetadata(node); err != nil {
cclog.Warnf("Error while fetching metadata for node '%v'", id)
if meta, err = r.FetchMetadata(hostname, cluster); err != nil {
cclog.Warnf("Error while fetching metadata for node '%s'", hostname)
return nil, err
}
node.MetaData = meta
@@ -153,7 +151,7 @@ INSERT INTO node (hostname, cluster, subcluster)
// AddNode adds a Node to the node table. This can be triggered by a node collector registration or
// by a node state update from the job scheduler.
func (r *NodeRepository) AddNode(node *schema.Node) (int64, error) {
func (r *NodeRepository) AddNode(node *schema.NodeDB) (int64, error) {
var err error
res, err := r.DB.NamedExec(NamedNodeInsert, node)
@@ -170,30 +168,15 @@ func (r *NodeRepository) AddNode(node *schema.Node) (int64, error) {
return node.ID, nil
}
func (r *NodeRepository) InsertNodeState(nodeState *schema.Node) error {
subcluster, err := archive.GetSubClusterByNode(nodeState.Cluster, nodeState.Hostname)
if err != nil {
cclog.Errorf("Error while getting subcluster for node '%s' in cluster '%s': %v", nodeState.Hostname, nodeState.Cluster, err)
return err
}
nodeState.SubCluster = subcluster
_, err = r.DB.NamedExec(NamedNodeInsert, nodeState)
if err != nil {
cclog.Errorf("Error while insert node '%v' to database", nodeState.Hostname)
return err
}
return nil
}
const NamedNodeStateInsert string = `
INSERT INTO node (hostname, cluster, subcluster)
VALUES (:hostname, :cluster, :subcluster);`
INSERT INTO node_state (time_stamp, node_state, health_state, cpus_allocated,
memory_allocated, gpus_allocated, jobs_running, node_id)
VALUES (:time_stamp, :node_state, :health_state, :cpus_allocated, :memory_allocated, :gpus_allocated, :jobs_running, :node_id);`
// Outdated?
func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeState *schema.NodeState) error {
// TODO: Add real Monitoring Health State
// UpdateNodeState is called from the Node REST API to add a row in the node state table
func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeState *schema.NodeStateDB) error {
var id int64
if err := sq.Select("id").From("node").
@@ -205,9 +188,8 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
cclog.Errorf("Error while getting subcluster for node '%s' in cluster '%s': %v", hostname, cluster, err)
return err
}
node := schema.Node{
Hostname: hostname, Cluster: cluster, SubCluster: subcluster, NodeState: *nodeState,
HealthState: schema.MonitoringStateFull,
node := schema.NodeDB{
Hostname: hostname, Cluster: cluster, SubCluster: subcluster,
}
id, err = r.AddNode(&node)
if err != nil {
@@ -222,11 +204,15 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
return err
}
}
if _, err := sq.Update("node_state").Set("node_state", nodeState).Where("node.id = ?", id).RunWith(r.DB).Exec(); err != nil {
cclog.Errorf("error while updating node '%s'", hostname)
nodeState.NodeID = id
_, err := r.DB.NamedExec(NamedNodeStateInsert, nodeState)
if err != nil {
cclog.Errorf("Error while adding node state for '%v' to database", hostname)
return err
}
cclog.Infof("Updated node '%s' in database", hostname)
cclog.Infof("Updated node state for '%s' in database", hostname)
return nil
}
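For illustration, a minimal standalone sketch of the named-parameter binding that NamedNodeStateInsert and UpdateNodeState above rely on: sqlx's NamedExec resolves the :time_stamp, :node_state, ... placeholders against the argument's `db` struct tags, so schema.NodeStateDB is expected to carry matching tags. The struct and table below are reduced, hypothetical stand-ins, not the real schema types:

package main

import (
	"log"

	"github.com/jmoiron/sqlx"
	_ "github.com/mattn/go-sqlite3"
)

// nodeStateRow is a reduced stand-in for schema.NodeStateDB; the db tags are
// what NamedExec uses to fill the :named placeholders.
type nodeStateRow struct {
	TimeStamp int64  `db:"time_stamp"`
	NodeState string `db:"node_state"`
	NodeID    int64  `db:"node_id"`
}

func main() {
	db := sqlx.MustOpen("sqlite3", ":memory:")
	defer db.Close()
	db.MustExec(`CREATE TABLE node_state (time_stamp INTEGER, node_state TEXT, node_id INTEGER)`)

	state := nodeStateRow{TimeStamp: 1700000000, NodeState: "allocated", NodeID: 1}
	if _, err := db.NamedExec(
		`INSERT INTO node_state (time_stamp, node_state, node_id)
		 VALUES (:time_stamp, :node_state, :node_id)`, &state); err != nil {
		log.Fatal(err)
	}
}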
@@ -249,21 +235,21 @@ func (r *NodeRepository) DeleteNode(id int64) error {
return nil
}
// TODO: Implement order by
// QueryNodes returns a list of nodes based on a node filter. It always operates
// on the last state (largest timestamp).
func (r *NodeRepository) QueryNodes(
ctx context.Context,
filters []*model.NodeFilter,
order *model.OrderByInput, // Currently unused!
) ([]*schema.Node, error) {
query, qerr := AccessCheck(ctx, sq.Select(nodeColumns...).From("node"))
query, qerr := AccessCheck(ctx,
sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
"node_state.health_state", "MAX(node_state.time_stamp)").From("node").
Join("node_state ON nodes_state.node_id = node.id").GroupBy("node_state.node_id"))
if qerr != nil {
return nil, qerr
}
// Get latest Info aka closest Timestamp to $now
now := time.Now().Unix()
query = query.Join("node_state ON node_state.node_id = node.id").Where(sq.Gt{"node_state.time_stamp": (now - 60)}) // .Distinct()
for _, f := range filters {
if f.Hostname != nil {
query = buildStringCondition("node.hostname", f.Hostname, query)
@@ -274,8 +260,8 @@ func (r *NodeRepository) QueryNodes(
if f.Subcluster != nil {
query = buildStringCondition("node.subcluster", f.Subcluster, query)
}
if f.NodeState != nil {
query = query.Where("node.node_state = ?", f.NodeState)
if f.SchedulerState != nil {
query = query.Where("node.node_state = ?", f.SchedulerState)
}
if f.HealthState != nil {
query = query.Where("node.health_state = ?", f.HealthState)
@@ -306,11 +292,9 @@ func (r *NodeRepository) QueryNodes(
}
func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) {
// Get latest Info aka closest Timestamp to $now
now := time.Now().Unix()
q := sq.Select("hostname", "cluster", "subcluster", "node_state", "health_state").
From("node").
Join("node_state ON node_state.node_id = node.id").Where(sq.Gt{"node_state.time_stamp": (now - 60)}).
q := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
"node_state.health_state", "MAX(node_state.time_stamp)").From("node").
Join("node_state ON node_state.node_id = node.id").GroupBy("node_state.node_id").
Where("node.cluster = ?", cluster).OrderBy("node.hostname ASC")
rows, err := q.RunWith(r.DB).Query()
@@ -354,8 +338,8 @@ func (r *NodeRepository) CountNodeStates(ctx context.Context, filters []*model.N
if f.Subcluster != nil {
query = buildStringCondition("node.subcluster", f.Subcluster, query)
}
if f.NodeState != nil {
query = query.Where("node.node_state = ?", f.NodeState)
if f.SchedulerState != nil {
query = query.Where("node.node_state = ?", f.SchedulerState)
}
if f.HealthState != nil {
query = query.Where("node.health_state = ?", f.HealthState)
@@ -407,8 +391,8 @@ func (r *NodeRepository) CountHealthStates(ctx context.Context, filters []*model
if f.Subcluster != nil {
query = buildStringCondition("node.subcluster", f.Subcluster, query)
}
if f.NodeState != nil {
query = query.Where("node.node_state = ?", f.NodeState)
if f.SchedulerState != nil {
query = query.Where("node.node_state = ?", f.SchedulerState)
}
if f.HealthState != nil {
query = query.Where("node.health_state = ?", f.HealthState)

View File

@@ -0,0 +1,190 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
ccconf "github.com/ClusterCockpit/cc-lib/ccConfig"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
_ "github.com/mattn/go-sqlite3"
)
func nodeTestSetup(t *testing.T) {
const testconfig = `{
"main": {
"addr": "0.0.0.0:8080",
"validate": false,
"apiAllowedIPs": [
"*"
]
},
"archive": {
"kind": "file",
"path": "./var/job-archive"
},
"auth": {
"jwts": {
"max-age": "2m"
}
},
"clusters": [
{
"name": "testcluster",
"metricDataRepository": {"kind": "test", "url": "bla:8081"},
"filterRanges": {
"numNodes": { "from": 1, "to": 64 },
"duration": { "from": 0, "to": 86400 },
"startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
}
}
]
}`
const testclusterJSON = `{
"name": "testcluster",
"subClusters": [
{
"name": "sc1",
"nodes": "host123,host124,host125",
"processorType": "Intel Core i7-4770",
"socketsPerNode": 1,
"coresPerSocket": 4,
"threadsPerCore": 2,
"flopRateScalar": {
"unit": {
"prefix": "G",
"base": "F/s"
},
"value": 14
},
"flopRateSimd": {
"unit": {
"prefix": "G",
"base": "F/s"
},
"value": 112
},
"memoryBandwidth": {
"unit": {
"prefix": "G",
"base": "B/s"
},
"value": 24
},
"numberOfNodes": 70,
"topology": {
"node": [0, 1, 2, 3, 4, 5, 6, 7],
"socket": [[0, 1, 2, 3, 4, 5, 6, 7]],
"memoryDomain": [[0, 1, 2, 3, 4, 5, 6, 7]],
"die": [[0, 1, 2, 3, 4, 5, 6, 7]],
"core": [[0], [1], [2], [3], [4], [5], [6], [7]]
}
}
],
"metricConfig": [
{
"name": "load_one",
"unit": { "base": ""},
"scope": "node",
"timestep": 60,
"aggregation": "avg",
"peak": 8,
"normal": 0,
"caution": 0,
"alert": 0
}
]
}`
cclog.Init("debug", true)
tmpdir := t.TempDir()
jobarchive := filepath.Join(tmpdir, "job-archive")
if err := os.Mkdir(jobarchive, 0o777); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(jobarchive, "version.txt"),
fmt.Appendf(nil, "%d", 2), 0o666); err != nil {
t.Fatal(err)
}
if err := os.Mkdir(filepath.Join(jobarchive, "testcluster"),
0o777); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(jobarchive, "testcluster", "cluster.json"),
[]byte(testclusterJSON), 0o666); err != nil {
t.Fatal(err)
}
dbfilepath := filepath.Join(tmpdir, "test.db")
err := MigrateDB("sqlite3", dbfilepath)
if err != nil {
t.Fatal(err)
}
cfgFilePath := filepath.Join(tmpdir, "config.json")
if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0o666); err != nil {
t.Fatal(err)
}
ccconf.Init(cfgFilePath)
// Load and check main configuration
if cfg := ccconf.GetPackageConfig("main"); cfg != nil {
if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil {
config.Init(cfg, clustercfg)
} else {
cclog.Abort("Cluster configuration must be present")
}
} else {
cclog.Abort("Main configuration must be present")
}
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive)
Connect("sqlite3", dbfilepath)
if err := archive.Init(json.RawMessage(archiveCfg), config.Keys.DisableArchive); err != nil {
t.Fatal(err)
}
}
func TestUpdateNodeState(t *testing.T) {
nodeTestSetup(t)
nodeState := schema.NodeStateDB{
TimeStamp: time.Now().Unix(), NodeState: "allocated",
CpusAllocated: 72,
MemoryAllocated: 480,
GpusAllocated: 0,
HealthState: schema.MonitoringStateFull,
JobsRunning: 1,
}
repo := GetNodeRepository()
err := repo.UpdateNodeState("host124", "testcluster", &nodeState)
if err != nil {
t.Fatal(err)
}
node, err := repo.GetNode("host124", "testcluster", false)
if err != nil {
t.Fatal(err)
}
if node.NodeState != "allocated" {
t.Errorf("wrong node state\ngot: %s \nwant: allocated ", node.NodeState)
}
}

View File

@@ -23,7 +23,7 @@ import (
var groupBy2column = map[model.Aggregate]string{
model.AggregateUser: "job.hpc_user",
model.AggregateProject: "job.project",
model.AggregateCluster: "job.hpc_cluster",
model.AggregateCluster: "job.cluster",
model.AggregateSubcluster: "job.subcluster",
}

View File

@@ -50,7 +50,7 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
tmpdir := t.TempDir()
cfgFilePath := filepath.Join(tmpdir, "config.json")
if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0666); err != nil {
if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0o666); err != nil {
t.Fatal(err)
}
@@ -79,10 +79,15 @@ func TestGetUIConfig(t *testing.T) {
t.Fatal("No config")
}
tmp := cfg["plot_list_selectedMetrics"]
metrics := tmp.([]string)
str := metrics[2]
if str != "flops_any" {
t.Errorf("wrong config\ngot: %s \nwant: flops_any", str)
tmp, exists := cfg["metricConfig_jobListMetrics"]
if exists {
metrics := tmp.([]string)
str := metrics[2]
if str != "flops_any" {
t.Errorf("wrong config\ngot: %s \nwant: flops_any", str)
}
} else {
t.Fatal("Key metricConfig_jobListMetrics is missing")
}
}

View File

@@ -11,9 +11,7 @@ import (
"encoding/json"
"html/template"
"io/fs"
"log"
"net/http"
"os"
"strings"
"github.com/ClusterCockpit/cc-backend/internal/config"
@@ -117,18 +115,9 @@ var UIDefaultsMap map[string]any
// "status_view_selectedTopProjectCategory": "totalJobs",
// }
func Init(configFilePath string) error {
var rawConfig json.RawMessage = nil
raw, rerr := os.ReadFile(configFilePath)
if rerr != nil {
if !os.IsNotExist(rerr) {
log.Fatalf("UI-CONFIG ERROR: %v", rerr)
}
} else {
rawConfig = json.RawMessage(raw)
}
func Init(rawConfig json.RawMessage) error {
var err error
if rawConfig != nil {
config.Validate(configSchema, rawConfig)
if err = json.Unmarshal(rawConfig, &UIDefaults); err != nil {

View File

@@ -16,7 +16,7 @@ import (
func TestInit(t *testing.T) {
fp := "../../configs/config.json"
ccconf.Init(fp)
cfg := ccconf.GetPackageConfig("web")
cfg := ccconf.GetPackageConfig("ui")
Init(cfg)