19 Commits

Author SHA1 Message Date
Bole Ma
acabdb6a61 Refactored CreateJobMeta 2023-10-26 17:43:54 +02:00
Bole Ma
6ac1182c06 Added SlurmDB Payload 2023-10-26 16:22:26 +02:00
Bole Ma
9fe497173d Added JobResource back to Job 2023-10-26 03:21:51 +02:00
Bole Ma
a3fbdbcf90 Added JSON Payload Converter 2023-10-25 23:27:06 +02:00
Bole Ma
59f6658344 Refactored queryAllJobs 2023-09-26 17:01:59 +02:00
Bole Ma
36b0f33208 Added support for extracting Resources and MetaData 2023-09-26 15:54:33 +02:00
Bole Ma
41cd1171fb Removed deprecated libraries 2023-09-26 02:58:34 +02:00
4c16e44507 Cleanup 2023-09-06 14:29:20 +02:00
8377e51608 Add FindRunningJobs to job repository 2023-09-06 14:28:29 +02:00
Bole Ma
e949f34e94 Merge branch '135-batch-scheduler-integration' of github.com:ClusterCockpit/cc-backend into 135-batch-scheduler-integration 2023-09-05 23:26:38 +02:00
Bole Ma
feeb0231e7 Added code handling OpenAPI structs 2023-09-05 23:26:34 +02:00
d4f09b4679 Remove generated REST client 2023-09-01 07:36:29 +02:00
03175681b6 Merge branch 'master' into 135-batch-scheduler-integration 2023-09-01 07:17:02 +02:00
Bole Ma
a43d03457b Added function queryAllJobs (adapted from Python) 2023-08-25 15:45:07 +02:00
125f3cd254 Show JobId instead ID. Add output 2023-08-21 09:19:18 +02:00
Bole Ma
d9e7a48e2f Added slurmNats code that could not parse BaseJob correctly 2023-08-18 15:38:02 +02:00
Bole Ma
c0f9eb7869 Merge branch 'master' of github.com:ClusterCockpit/cc-backend into 135-batch-scheduler-integration 2023-08-04 14:26:42 +02:00
Bole Ma
7aa7750177 Added test implementation for Nats subscriber 2023-07-07 16:15:05 +02:00
e5f7ad8ac0 Introduce batch scheduler plugins 2023-06-22 08:25:32 +02:00
19 changed files with 1847 additions and 570 deletions

18
go.mod
View File

@@ -18,6 +18,7 @@ require (
github.com/influxdata/influxdb-client-go/v2 v2.12.2 github.com/influxdata/influxdb-client-go/v2 v2.12.2
github.com/jmoiron/sqlx v1.3.5 github.com/jmoiron/sqlx v1.3.5
github.com/mattn/go-sqlite3 v1.14.16 github.com/mattn/go-sqlite3 v1.14.16
github.com/nats-io/nats.go v1.28.0
github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.40.0 github.com/prometheus/common v0.40.0
github.com/qustavo/sqlhooks/v2 v2.1.0 github.com/qustavo/sqlhooks/v2 v2.1.0
@@ -25,12 +26,14 @@ require (
github.com/swaggo/http-swagger v1.3.3 github.com/swaggo/http-swagger v1.3.3
github.com/swaggo/swag v1.16.1 github.com/swaggo/swag v1.16.1
github.com/vektah/gqlparser/v2 v2.5.8 github.com/vektah/gqlparser/v2 v2.5.8
golang.org/x/crypto v0.12.0 golang.org/x/crypto v0.13.0
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea
) )
require ( require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac37420 // indirect
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect github.com/KyleBanks/depth v1.2.1 // indirect
github.com/agnivade/levenshtein v1.1.1 // indirect github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
@@ -38,6 +41,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.6.18 // indirect github.com/containerd/containerd v1.6.18 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deepmap/oapi-codegen v1.12.4 // indirect github.com/deepmap/oapi-codegen v1.12.4 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
@@ -56,6 +60,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect
@@ -64,21 +69,26 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/nats-io/nats-server/v2 v2.9.21 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/swaggo/files v1.0.0 // indirect github.com/swaggo/files v1.0.0 // indirect
github.com/urfave/cli/v2 v2.25.7 // indirect github.com/urfave/cli/v2 v2.25.7 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/atomic v1.10.0 // indirect go.uber.org/atomic v1.10.0 // indirect
golang.org/x/mod v0.12.0 // indirect golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect golang.org/x/net v0.15.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sys v0.11.0 // indirect golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.12.0 // indirect golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.12.0 // indirect golang.org/x/tools v0.12.0 // indirect
google.golang.org/appengine v1.6.7 // indirect google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.30.0 // indirect google.golang.org/protobuf v1.30.0 // indirect

31
go.sum
View File

@@ -82,6 +82,12 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/ClusterCockpit/cc-units v0.4.0 h1:zP5DOu99GmErW0tCDf0gcLrlWt42RQ9dpoONEOh4cI0= github.com/ClusterCockpit/cc-units v0.4.0 h1:zP5DOu99GmErW0tCDf0gcLrlWt42RQ9dpoONEOh4cI0=
github.com/ClusterCockpit/cc-units v0.4.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw= github.com/ClusterCockpit/cc-units v0.4.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw=
github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac37420 h1:AeQY40KrLQmSSyyHdbNdAqgln+0+p1dLag5yspE5M8A=
github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac37420/go.mod h1:oNgVG2puNj9cNw/KgqLbgE1pPOn8jXORX3ErP58LcAA=
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20230906120742-0f15562ea666 h1:8PofHcOwEMmeAFqJjvAEgnu7rbRHAwJhd2XJ9u/YxiU=
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20230906120742-0f15562ea666/go.mod h1:vxaj1my0GNoCXx4bYyOTA/IZP/IOZImtdOIn4T7pCa4=
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7 h1:YY/qDtFsp1DOJw/jyobiIBiIh1/yD2IVOdcK7EVEIKs=
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7/go.mod h1:vxaj1my0GNoCXx4bYyOTA/IZP/IOZImtdOIn4T7pCa4=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/squirrel v1.5.3 h1:YPpoceAcxuzIljlr5iWpNKaql7hLeG1KLSrhvdHpkZc= github.com/Masterminds/squirrel v1.5.3 h1:YPpoceAcxuzIljlr5iWpNKaql7hLeG1KLSrhvdHpkZc=
@@ -461,6 +467,7 @@ github.com/go-ldap/ldap/v3 v3.4.4/go.mod h1:fe1MsuN5eJJ1FeLT/LEBVdWfNWKh459R7aXg
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
@@ -593,6 +600,7 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
@@ -816,6 +824,8 @@ github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdY
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -887,6 +897,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88Jz2VyhSmden33/aXg4oVIY= github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88Jz2VyhSmden33/aXg4oVIY=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4= github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
@@ -931,6 +942,15 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/nakagami/firebirdsql v0.0.0-20190310045651-3c02a58cfed8/go.mod h1:86wM1zFnC6/uDBfZGNwB65O+pR2OFi5q/YQaEUid1qA= github.com/nakagami/firebirdsql v0.0.0-20190310045651-3c02a58cfed8/go.mod h1:86wM1zFnC6/uDBfZGNwB65O+pR2OFi5q/YQaEUid1qA=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk=
github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU=
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/neo4j/neo4j-go-driver v1.8.1-0.20200803113522-b626aa943eba/go.mod h1:ncO5VaFWh0Nrt+4KT4mOZboaczBZcLuHrG+/sUeP8gI= github.com/neo4j/neo4j-go-driver v1.8.1-0.20200803113522-b626aa943eba/go.mod h1:ncO5VaFWh0Nrt+4KT4mOZboaczBZcLuHrG+/sUeP8gI=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
@@ -1144,6 +1164,7 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/swaggo/files v1.0.0 h1:1gGXVIeUFCS/dta17rnP0iOpr6CXFwKD7EO5ID233e4= github.com/swaggo/files v1.0.0 h1:1gGXVIeUFCS/dta17rnP0iOpr6CXFwKD7EO5ID233e4=
github.com/swaggo/files v1.0.0/go.mod h1:N59U6URJLyU1PQgFqPM7wXLMhJx7QAolnvfQkqO13kc= github.com/swaggo/files v1.0.0/go.mod h1:N59U6URJLyU1PQgFqPM7wXLMhJx7QAolnvfQkqO13kc=
@@ -1167,6 +1188,7 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/vektah/gqlparser/v2 v2.5.8 h1:pm6WOnGdzFOCfcQo9L3+xzW51mKrlwTEg4Wr7AH1JW4= github.com/vektah/gqlparser/v2 v2.5.8 h1:pm6WOnGdzFOCfcQo9L3+xzW51mKrlwTEg4Wr7AH1JW4=
github.com/vektah/gqlparser/v2 v2.5.8/go.mod h1:z8xXUff237NntSuH8mLFijZ+1tjV1swDbpDqjJmk6ME= github.com/vektah/gqlparser/v2 v2.5.8/go.mod h1:z8xXUff237NntSuH8mLFijZ+1tjV1swDbpDqjJmk6ME=
github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
@@ -1286,6 +1308,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -1409,6 +1433,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -1568,6 +1594,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -1586,6 +1614,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -1595,6 +1625,7 @@ golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -290,6 +290,30 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
return scanJob(q.RunWith(r.stmtCache).QueryRow()) return scanJob(q.RunWith(r.stmtCache).QueryRow())
} }
func (r *JobRepository) FindRunningJobs(cluster string) (map[int64]*schema.Job, error) {
jobs := map[int64]*schema.Job{}
q := sq.Select(jobColumns...).From("job").Where("job.cluster = ?", cluster)
q = q.Where("job.job_state = ?", "running")
rows, err := q.RunWith(r.stmtCache).Query()
if err != nil {
log.Errorf("Error while querying running jobs: %v", err)
return nil, err
}
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
rows.Close()
log.Warn("Error while scanning rows (Jobs)")
return nil, err
}
jobs[job.JobID] = job
}
return jobs, nil
}
func (r *JobRepository) FindConcurrentJobs( func (r *JobRepository) FindConcurrentJobs(
ctx context.Context, ctx context.Context,
job *schema.Job) (*model.JobLinkResultList, error) { job *schema.Job) (*model.JobLinkResultList, error) {

View File

@@ -70,30 +70,28 @@ func (r *JobRepository) buildStatsQuery(
var query sq.SelectBuilder var query sq.SelectBuilder
castType := r.getCastType() castType := r.getCastType()
// fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
if col != "" { if col != "" {
// Scan columns: id, totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours // Scan columns: id, totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select(col, "COUNT(job.id) as totalJobs", query = sq.Select(col, "COUNT(job.id) as totalJobs",
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s) as totalWalltime`, time.Now().Unix(), castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s) as totalWalltime", castType),
fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s) as totalNodes`, castType), fmt.Sprintf("CAST(SUM(job.num_nodes) as %s) as totalNodes", castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s) as totalNodeHours`, time.Now().Unix(), castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s) as totalNodeHours", castType),
fmt.Sprintf(`CAST(SUM(job.num_hwthreads) as %s) as totalCores`, castType), fmt.Sprintf("CAST(SUM(job.num_hwthreads) as %s) as totalCores", castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s) as totalCoreHours`, time.Now().Unix(), castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s) as totalCoreHours", castType),
fmt.Sprintf(`CAST(SUM(job.num_acc) as %s) as totalAccs`, castType), fmt.Sprintf("CAST(SUM(job.num_acc) as %s) as totalAccs", castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s) as totalAccHours", castType),
).From("job").GroupBy(col) ).From("job").GroupBy(col)
} else { } else {
// Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours // Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select("COUNT(job.id)", query = sq.Select("COUNT(job.id)",
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s)`, time.Now().Unix(), castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s)`, castType), fmt.Sprintf("CAST(SUM(job.num_nodes) as %s)", castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s)`, time.Now().Unix(), castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
fmt.Sprintf(`CAST(SUM(job.num_hwthreads) as %s)`, castType), fmt.Sprintf("CAST(SUM(job.num_hwthreads) as %s)", castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s)`, time.Now().Unix(), castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
fmt.Sprintf(`CAST(SUM(job.num_acc) as %s)`, castType), fmt.Sprintf("CAST(SUM(job.num_acc) as %s)", castType),
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s)`, time.Now().Unix(), castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s)", castType),
).From("job") ).From("job")
} }

View File

@@ -0,0 +1,479 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package scheduler
import (
"encoding/json"
"fmt"
"os"
"github.com/ClusterCockpit/cc-backend/pkg/log"
)
// MetricConfig describes one monitored metric as declared in the
// cluster configuration JSON file: its name, base unit, measurement
// scope, aggregation method, sampling timestep and alerting thresholds.
type MetricConfig struct {
	Name string `json:"name"`
	Unit struct {
		Base string `json:"base"`
	} `json:"unit"`
	Scope       string  `json:"scope"`
	Aggregation string  `json:"aggregation"`
	Timestep    int     `json:"timestep"`
	// Threshold values used for classifying measurements.
	Peak    float64 `json:"peak"`
	Normal  float64 `json:"normal"`
	Caution float64 `json:"caution"`
	Alert   float64 `json:"alert"`
}
// SubCluster describes one homogeneous partition of a cluster as
// declared in the cluster configuration JSON: node list, processor
// layout, nominal performance figures and the hardware topology
// (including accelerator/GPU ids used for PCIe address mapping).
type SubCluster struct {
	Name           string `json:"name"`
	// Nodes is a node-range expression (e.g. "a[0001-0100]").
	Nodes          string `json:"nodes"`
	ProcessorType  string `json:"processorType"`
	SocketsPerNode int    `json:"socketsPerNode"`
	CoresPerSocket int    `json:"coresPerSocket"`
	ThreadsPerCore int    `json:"threadsPerCore"`
	FlopRateScalar struct {
		Unit struct {
			Base   string `json:"base"`
			Prefix string `json:"prefix"`
		} `json:"unit"`
		Value float64 `json:"value"`
	} `json:"flopRateScalar"`
	FlopRateSimd struct {
		Unit struct {
			Base   string `json:"base"`
			Prefix string `json:"prefix"`
		} `json:"unit"`
		Value float64 `json:"value"`
	} `json:"flopRateSimd"`
	MemoryBandwidth struct {
		Unit struct {
			Base   string `json:"base"`
			Prefix string `json:"prefix"`
		} `json:"unit"`
		Value float64 `json:"value"`
	} `json:"memoryBandwidth"`
	Topology struct {
		Node         []int   `json:"node"`
		Socket       [][]int `json:"socket"`
		MemoryDomain [][]int `json:"memoryDomain"`
		Core         [][]int `json:"core"`
		// Accelerators lists GPUs/accelerators; ID is the PCIe address
		// used by the REST scheduler's clusterPCIEAddressMap.
		Accelerators []struct {
			ID    string `json:"id"`
			Type  string `json:"type"`
			Model string `json:"model"`
		} `json:"accelerators"`
	} `json:"topology"`
}
// ClusterConfig is the root object of a cluster configuration JSON file
// (see DecodeClusterConfig): the cluster name plus its metric
// definitions and sub-cluster hardware descriptions.
type ClusterConfig struct {
	Name         string         `json:"name"`
	MetricConfig []MetricConfig `json:"metricConfig"`
	SubClusters  []SubCluster   `json:"subClusters"`
}
// Metadata mirrors the "meta" object of a slurmrestd response: the
// reporting plugin and the Slurm version/release that produced the
// payload.
type Metadata struct {
	Plugin struct {
		Type string `json:"type"`
		Name string `json:"name"`
	} `json:"plugin"`
	Slurm struct {
		Version struct {
			Major int `json:"major"`
			Micro int `json:"micro"`
			Minor int `json:"minor"`
		} `json:"version"`
		Release string `json:"release"`
	} `json:"Slurm"`
}
// JobResource mirrors the "job_resources" object of a slurmrestd job:
// the node-range string plus per-node allocation details.
type JobResource struct {
	Nodes          string          `json:"nodes"`
	AllocatedCores int             `json:"allocated_cores"`
	AllocatedHosts int             `json:"allocated_hosts"`
	AllocatedNodes []AllocatedNode `json:"allocated_nodes"`
}

// AllocatedNode describes the allocation on a single node. Pointer
// fields may be null in the Slurm response and must be nil-checked
// before use (see CreateJobMeta).
type AllocatedNode struct {
	// Sockets maps socket id (as string) to the cores allocated on it.
	Sockets         map[string]Socket `json:"sockets"`
	Nodename        string            `json:"nodename"`
	CPUsUsed        *int              `json:"cpus_used"`
	MemoryUsed      *int              `json:"memory_used"`
	MemoryAllocated *int              `json:"memory_allocated"`
}

// Socket maps core id (as string) to its allocation state string.
type Socket struct {
	Cores map[string]string `json:"cores"`
}
// Job mirrors a single job object as returned by the slurmrestd
// /slurm/v0.0.38/jobs endpoint. Field names and JSON tags follow the
// Slurm REST API schema; interface{} is used where Slurm may return
// null or values of varying type.
type Job struct {
	Account                  string      `json:"account"`
	AccrueTime               int         `json:"accrue_time"`
	AdminComment             string      `json:"admin_comment"`
	ArrayJobID               int64       `json:"array_job_id"`
	ArrayTaskID              interface{} `json:"array_task_id"`
	ArrayMaxTasks            int         `json:"array_max_tasks"`
	ArrayTaskString          string      `json:"array_task_string"`
	AssociationID            int         `json:"association_id"`
	BatchFeatures            string      `json:"batch_features"`
	BatchFlag                bool        `json:"batch_flag"`
	BatchHost                string      `json:"batch_host"`
	Flags                    []string    `json:"flags"`
	BurstBuffer              string      `json:"burst_buffer"`
	BurstBufferState         string      `json:"burst_buffer_state"`
	Cluster                  string      `json:"cluster"`
	ClusterFeatures          string      `json:"cluster_features"`
	Command                  string      `json:"command"`
	Comment                  string      `json:"comment"`
	Container                string      `json:"container"`
	Contiguous               bool        `json:"contiguous"`
	CoreSpec                 interface{} `json:"core_spec"`
	ThreadSpec               interface{} `json:"thread_spec"`
	CoresPerSocket           interface{} `json:"cores_per_socket"`
	BillableTres             interface{} `json:"billable_tres"`
	CPUPerTask               interface{} `json:"cpus_per_task"`
	CPUFrequencyMinimum      interface{} `json:"cpu_frequency_minimum"`
	CPUFrequencyMaximum      interface{} `json:"cpu_frequency_maximum"`
	CPUFrequencyGovernor     interface{} `json:"cpu_frequency_governor"`
	CPUPerTres               string      `json:"cpus_per_tres"`
	Deadline                 int         `json:"deadline"`
	DelayBoot                int         `json:"delay_boot"`
	Dependency               string      `json:"dependency"`
	DerivedExitCode          int         `json:"derived_exit_code"`
	EligibleTime             int         `json:"eligible_time"`
	EndTime                  int64       `json:"end_time"`
	ExcludedNodes            string      `json:"excluded_nodes"`
	ExitCode                 int         `json:"exit_code"`
	Features                 string      `json:"features"`
	FederationOrigin         string      `json:"federation_origin"`
	FederationSiblingsActive string      `json:"federation_siblings_active"`
	FederationSiblingsViable string      `json:"federation_siblings_viable"`
	// GresDetail holds one GRES string per allocated node; parsed by
	// CreateJobMeta to extract accelerator indices.
	GresDetail          []string    `json:"gres_detail"`
	GroupID             int         `json:"group_id"`
	GroupName           string      `json:"group_name"`
	JobID               int64       `json:"job_id"`
	JobResources        JobResource `json:"job_resources"`
	JobState            string      `json:"job_state"`
	LastSchedEvaluation int         `json:"last_sched_evaluation"`
	Licenses            string      `json:"licenses"`
	MaxCPUs             int         `json:"max_cpus"`
	MaxNodes            int         `json:"max_nodes"`
	MCSLabel            string      `json:"mcs_label"`
	MemoryPerTres       string      `json:"memory_per_tres"`
	Name                string      `json:"name"`
	Nodes               string      `json:"nodes"`
	Nice                interface{} `json:"nice"`
	TasksPerCore        interface{} `json:"tasks_per_core"`
	TasksPerNode        int         `json:"tasks_per_node"`
	TasksPerSocket      interface{} `json:"tasks_per_socket"`
	TasksPerBoard       int         `json:"tasks_per_board"`
	CPUs                int32       `json:"cpus"`
	NodeCount           int32       `json:"node_count"`
	Tasks               int         `json:"tasks"`
	HETJobID            int         `json:"het_job_id"`
	HETJobIDSet         string      `json:"het_job_id_set"`
	HETJobOffset        int         `json:"het_job_offset"`
	Partition           string      `json:"partition"`
	MemoryPerNode       interface{} `json:"memory_per_node"`
	MemoryPerCPU        int         `json:"memory_per_cpu"`
	MinimumCPUsPerNode  int         `json:"minimum_cpus_per_node"`
	MinimumTmpDiskPerNode int       `json:"minimum_tmp_disk_per_node"`
	PreemptTime         int         `json:"preempt_time"`
	PreSUSTime          int         `json:"pre_sus_time"`
	Priority            int         `json:"priority"`
	Profile             interface{} `json:"profile"`
	QoS                 string      `json:"qos"`
	Reboot              bool        `json:"reboot"`
	RequiredNodes       string      `json:"required_nodes"`
	Requeue             bool        `json:"requeue"`
	ResizeTime          int         `json:"resize_time"`
	RestartCnt          int         `json:"restart_cnt"`
	ResvName            string      `json:"resv_name"`
	// Shared is nil when the job has exclusive node access (see
	// CreateJobMeta's exclusive flag derivation).
	Shared              *string     `json:"shared"`
	ShowFlags           []string    `json:"show_flags"`
	SocketsPerBoard     int         `json:"sockets_per_board"`
	SocketsPerNode      interface{} `json:"sockets_per_node"`
	StartTime           int64       `json:"start_time"`
	StateDescription    string      `json:"state_description"`
	StateReason         string      `json:"state_reason"`
	StandardError       string      `json:"standard_error"`
	StandardInput       string      `json:"standard_input"`
	StandardOutput      string      `json:"standard_output"`
	SubmitTime          int         `json:"submit_time"`
	SuspendTime         int         `json:"suspend_time"`
	SystemComment       string      `json:"system_comment"`
	TimeLimit           int         `json:"time_limit"`
	TimeMinimum         int         `json:"time_minimum"`
	ThreadsPerCore      interface{} `json:"threads_per_core"`
	TresBind            string      `json:"tres_bind"`
	TresFreq            string      `json:"tres_freq"`
	TresPerJob          string      `json:"tres_per_job"`
	TresPerNode         string      `json:"tres_per_node"`
	TresPerSocket       string      `json:"tres_per_socket"`
	TresPerTask         string      `json:"tres_per_task"`
	TresReqStr          string      `json:"tres_req_str"`
	TresAllocStr        string      `json:"tres_alloc_str"`
	UserID              int         `json:"user_id"`
	UserName            string      `json:"user_name"`
	Wckey               string      `json:"wckey"`
	CurrentWorkingDirectory string  `json:"current_working_directory"`
}
// SlurmPayload is the top-level response object of the slurmrestd
// /slurm/v0.0.38/jobs endpoint: metadata, error list and job list.
type SlurmPayload struct {
	Meta   Metadata      `json:"meta"`
	Errors []interface{} `json:"errors"`
	Jobs   []Job         `json:"jobs"`
}
// DumpedComment holds the administrator/job/system comment triple of a
// slurmdbd job record; fields may be null in the response.
type DumpedComment struct {
	Administrator interface{} `json:"administrator"`
	Job           interface{} `json:"job"`
	System        interface{} `json:"system"`
}

// MaxLimits carries the "limits.max" object of a slurmdbd job array
// record (maximum simultaneously running tasks).
type MaxLimits struct {
	Running struct {
		Tasks int `json:"tasks"`
	} `json:"max"`
}

// ArrayInfo describes a job's array membership in a slurmdbd record.
type ArrayInfo struct {
	JobID  int         `json:"job_id"`
	Limits MaxLimits   `json:"limits"`
	Task   interface{} `json:"task"`
	TaskID interface{} `json:"task_id"`
}

// Association identifies the accounting association (account, cluster,
// partition, user) a slurmdbd job record is billed against.
type Association struct {
	Account   string      `json:"account"`
	Cluster   string      `json:"cluster"`
	Partition interface{} `json:"partition"`
	User      string      `json:"user"`
}

// TimeInfo aggregates the timing block of a slurmdbd job/step record:
// epoch timestamps plus system/user/total CPU time breakdowns.
type TimeInfo struct {
	Elapsed    int64 `json:"elapsed"`
	Eligible   int64 `json:"eligible"`
	End        int64 `json:"end"`
	Start      int64 `json:"start"`
	Submission int64 `json:"submission"`
	Suspended  int64 `json:"suspended"`
	System     struct {
		Seconds      int `json:"seconds"`
		Microseconds int `json:"microseconds"`
	} `json:"system"`
	Limit int `json:"limit"`
	Total struct {
		Seconds      int `json:"seconds"`
		Microseconds int `json:"microseconds"`
	} `json:"total"`
	User struct {
		Seconds      int `json:"seconds"`
		Microseconds int `json:"microseconds"`
	} `json:"user"`
}

// ExitCode is the status/return-code pair slurmdbd reports for jobs
// and steps.
type ExitCode struct {
	Status     string `json:"status"`
	ReturnCode int    `json:"return_code"`
}
// DumpedJob mirrors one accounting job record as returned by the
// slurmdbd /slurmdb/v0.0.38/jobs endpoint, including its steps and
// TRES allocation/request lists. Field names follow the SlurmDB REST
// API schema.
type DumpedJob struct {
	Account         string        `json:"account"`
	Comment         DumpedComment `json:"comment"`
	AllocationNodes int           `json:"allocation_nodes"`
	Array           ArrayInfo     `json:"array"`
	Association     Association   `json:"association"`
	Cluster         string        `json:"cluster"`
	Constraints     string        `json:"constraints"`
	Container       interface{}   `json:"container"`
	DerivedExitCode ExitCode      `json:"derived_exit_code"`
	Time            TimeInfo      `json:"time"`
	ExitCode        ExitCode      `json:"exit_code"`
	Flags           []string      `json:"flags"`
	Group           string        `json:"group"`
	Het             struct {
		JobID     int         `json:"job_id"`
		JobOffset interface{} `json:"job_offset"`
	} `json:"het"`
	JobID int64  `json:"job_id"`
	Name  string `json:"name"`
	MCS   struct {
		Label string `json:"label"`
	} `json:"mcs"`
	Nodes     string `json:"nodes"`
	Partition string `json:"partition"`
	Priority  int    `json:"priority"`
	QoS       string `json:"qos"`
	Required  struct {
		CPUs   int `json:"CPUs"`
		Memory int `json:"memory"`
	} `json:"required"`
	KillRequestUser interface{} `json:"kill_request_user"`
	Reservation     struct {
		ID   int `json:"id"`
		Name int `json:"name"`
	} `json:"reservation"`
	State struct {
		Current string `json:"current"`
		Reason  string `json:"reason"`
	} `json:"state"`
	// Steps lists the job's steps with per-step node, TRES, timing and
	// exit information.
	Steps []struct {
		Nodes struct {
			List  []string `json:"list"`
			Count int      `json:"count"`
			Range string   `json:"range"`
		} `json:"nodes"`
		Tres struct {
			Requested struct {
				Max     []interface{} `json:"max"`
				Min     []interface{} `json:"min"`
				Average []interface{} `json:"average"`
				Total   []interface{} `json:"total"`
			} `json:"requested"`
			Consumed struct {
				Max     []interface{} `json:"max"`
				Min     []interface{} `json:"min"`
				Average []interface{} `json:"average"`
				Total   []interface{} `json:"total"`
			} `json:"consumed"`
			Allocated []struct {
				Type  string      `json:"type"`
				Name  interface{} `json:"name"`
				ID    int         `json:"id"`
				Count int         `json:"count"`
			} `json:"allocated"`
		} `json:"tres"`
		Time     TimeInfo `json:"time"`
		ExitCode ExitCode `json:"exit_code"`
		Tasks    struct {
			Count int `json:"count"`
		} `json:"tasks"`
		PID interface{} `json:"pid"`
		CPU struct {
			RequestedFrequency struct {
				Min int `json:"min"`
				Max int `json:"max"`
			} `json:"requested_frequency"`
			Governor []interface{} `json:"governor"`
		} `json:"CPU"`
		KillRequestUser interface{} `json:"kill_request_user"`
		State           string      `json:"state"`
		Statistics      struct {
			CPU struct {
				ActualFrequency int `json:"actual_frequency"`
			} `json:"CPU"`
			Energy struct {
				Consumed int `json:"consumed"`
			} `json:"energy"`
		} `json:"statistics"`
		Step struct {
			JobID int `json:"job_id"`
			Het   struct {
				Component interface{} `json:"component"`
			} `json:"het"`
			ID   string `json:"id"`
			Name string `json:"name"`
		} `json:"step"`
		Task struct {
			Distribution string `json:"distribution"`
		} `json:"task"`
	} `json:"steps"`
	Tres struct {
		Allocated []struct {
			Type  string      `json:"type"`
			Name  interface{} `json:"name"`
			ID    int         `json:"id"`
			Count int         `json:"count"`
		} `json:"allocated"`
		Requested []struct {
			Type  string      `json:"type"`
			Name  interface{} `json:"name"`
			ID    int         `json:"id"`
			Count int         `json:"count"`
		} `json:"requested"`
	} `json:"tres"`
	User  string `json:"user"`
	Wckey struct {
		Wckey string   `json:"wckey"`
		Flags []string `json:"flags"`
	} `json:"wckey"`
	WorkingDirectory string `json:"working_directory"`
}
// SlurmDBPayload is the top-level response object of the slurmdbd
// /slurmdb/v0.0.38/jobs endpoint: metadata, error list and the dumped
// accounting job records.
type SlurmDBPayload struct {
	Meta   Metadata    `json:"meta"`
	Errors []string    `json:"errors"`
	Jobs   []DumpedJob `json:"jobs"`
}
// DecodeClusterConfig opens and decodes the cluster configuration JSON
// file at filename and logs a human-readable dump of its contents.
// It returns the decoded config and a non-nil error when the file
// cannot be opened or does not contain valid JSON. On error the
// returned ClusterConfig may be zero or only partially populated.
func DecodeClusterConfig(filename string) (ClusterConfig, error) {
	var clusterConfig ClusterConfig

	file, err := os.Open(filename)
	if err != nil {
		log.Errorf("Cluster config file not found. No cores/GPU ids available.")
		return clusterConfig, err
	}
	defer file.Close()

	decoder := json.NewDecoder(file)
	if err = decoder.Decode(&clusterConfig); err != nil {
		log.Errorf("Error decoding cluster config file: %v", err)
		// Propagate the decode error; previously it was swallowed and a
		// partially decoded config was returned with a nil error.
		return clusterConfig, err
	}

	log.Printf("Name: %s\n", clusterConfig.Name)
	log.Printf("MetricConfig:\n")
	for _, metric := range clusterConfig.MetricConfig {
		log.Printf("  Name: %s\n", metric.Name)
		log.Printf("  Unit Base: %s\n", metric.Unit.Base)
		log.Printf("  Scope: %s\n", metric.Scope)
		log.Printf("  Aggregation: %s\n", metric.Aggregation)
		log.Printf("  Timestep: %d\n", metric.Timestep)
		log.Printf("  Peak: %f\n", metric.Peak)
		log.Printf("  Normal: %f\n", metric.Normal)
		log.Printf("  Caution: %f\n", metric.Caution)
		log.Printf("  Alert: %f\n", metric.Alert)
	}
	log.Printf("SubClusters:\n")
	for _, subCluster := range clusterConfig.SubClusters {
		log.Printf("  Name: %s\n", subCluster.Name)
		log.Printf("  Nodes: %s\n", subCluster.Nodes)
		log.Printf("  Processor Type: %s\n", subCluster.ProcessorType)
		log.Printf("  Sockets Per Node: %d\n", subCluster.SocketsPerNode)
		log.Printf("  Cores Per Socket: %d\n", subCluster.CoresPerSocket)
		log.Printf("  Threads Per Core: %d\n", subCluster.ThreadsPerCore)
		log.Printf("  Flop Rate Scalar Unit Base: %s\n", subCluster.FlopRateScalar.Unit.Base)
		log.Printf("  Flop Rate Scalar Unit Prefix: %s\n", subCluster.FlopRateScalar.Unit.Prefix)
		log.Printf("  Flop Rate Scalar Value: %f\n", subCluster.FlopRateScalar.Value)
		log.Printf("  Flop Rate Simd Unit Base: %s\n", subCluster.FlopRateSimd.Unit.Base)
		log.Printf("  Flop Rate Simd Unit Prefix: %s\n", subCluster.FlopRateSimd.Unit.Prefix)
		log.Printf("  Flop Rate Simd Value: %f\n", subCluster.FlopRateSimd.Value)
		log.Printf("  Memory Bandwidth Unit Base: %s\n", subCluster.MemoryBandwidth.Unit.Base)
		log.Printf("  Memory Bandwidth Unit Prefix: %s\n", subCluster.MemoryBandwidth.Unit.Prefix)
		log.Printf("  Memory Bandwidth Value: %f\n", subCluster.MemoryBandwidth.Value)
		log.Printf("  Topology Node: %v\n", subCluster.Topology.Node)
		log.Printf("  Topology Socket: %v\n", subCluster.Topology.Socket)
		log.Printf("  Topology Memory Domain: %v\n", subCluster.Topology.MemoryDomain)
		log.Printf("  Topology Core: %v\n", subCluster.Topology.Core)
		log.Printf("  Topology Accelerators:\n")
		for _, accelerator := range subCluster.Topology.Accelerators {
			log.Printf("    ID: %s\n", accelerator.ID)
			log.Printf("    Type: %s\n", accelerator.Type)
			log.Printf("    Model: %s\n", accelerator.Model)
		}
	}
	return clusterConfig, nil
}
// UnmarshalSlurmPayload decodes a slurmrestd jobs response, given as a
// JSON string, into a SlurmPayload. On malformed input the zero-value
// payload is returned together with a wrapped unmarshal error.
func UnmarshalSlurmPayload(jsonPayload string) (SlurmPayload, error) {
	var payload SlurmPayload
	if err := json.Unmarshal([]byte(jsonPayload), &payload); err != nil {
		return payload, fmt.Errorf("failed to unmarshal JSON data: %v", err)
	}
	return payload, nil
}

View File

@@ -0,0 +1,27 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package scheduler
import "encoding/json"
// BatchScheduler is the interface every batch-scheduler integration
// (e.g. Slurm via NATS) has to implement.
type BatchScheduler interface {
	// Init configures the scheduler from its raw JSON configuration.
	Init(rawConfig json.RawMessage) error
	// Sync reconciles the scheduler's job state with the backend.
	Sync()
}
// sd holds the package-wide scheduler instance created by Init.
var sd BatchScheduler

// Init instantiates the configured batch scheduler implementation and
// initializes it. The error returned by the scheduler's own Init is
// propagated to the caller (it was previously ignored and Init always
// returned nil).
func Init(rawConfig json.RawMessage) error {
	sd = &SlurmNatsScheduler{}
	return sd.Init(rawConfig)
}

// GetHandle returns the scheduler instance set up by Init (nil if Init
// has not been called yet).
func GetHandle() BatchScheduler {
	return sd
}

View File

@@ -0,0 +1,214 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package scheduler
import (
"database/sql"
"encoding/json"
"fmt"
"strings"
"sync"
"github.com/ClusterCockpit/cc-backend/internal/importer"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/nats-io/nats.go"
)
// SlurmNatsConfig is the JSON configuration for the NATS-based Slurm
// scheduler integration.
type SlurmNatsConfig struct {
	// URL of the NATS server to connect to.
	URL string `json:"url"`
}

// SlurmNatsScheduler receives job start/stop events over NATS and
// persists them through the job repository.
type SlurmNatsScheduler struct {
	url             string
	// RepositoryMutex serializes duplicate checks and inserts (see
	// startJob).
	RepositoryMutex sync.Mutex
	JobRepository   *repository.JobRepository
}

// StopJobRequest is the payload of a "stopJob" message identifying a
// job by (jobId, cluster, startTime) and carrying its final state.
type StopJobRequest struct {
	// Stop Time of job as epoch
	StopTime  int64           `json:"stopTime" validate:"required" example:"1649763839"`
	State     schema.JobState `json:"jobState" validate:"required" example:"completed"` // Final job state
	JobId     *int64          `json:"jobId" example:"123000"`                           // Cluster Job ID of job
	Cluster   *string         `json:"cluster" example:"fritz"`                          // Cluster of job
	StartTime *int64          `json:"startTime" example:"1649723812"`                   // Start Time of job as epoch
}
// startJob handles a job received via the NATS "startJob" subject:
// it sanity-checks the metadata, rejects duplicates and inserts the
// job (plus its tags) into the job repository.
func (sd *SlurmNatsScheduler) startJob(req *schema.JobMeta) {
	fmt.Printf("DEBUG: %+v\n", *req)
	log.Printf("Server Name: %s - Job ID: %v", req.BaseJob.Cluster, req.BaseJob.JobID)
	log.Printf("User: %s - Project: %s", req.BaseJob.User, req.BaseJob.Project)
	// A job pushed by the scheduler is assumed running unless a state
	// was set explicitly.
	if req.State == "" {
		req.State = schema.JobStateRunning
	}
	if err := importer.SanityChecks(&req.BaseJob); err != nil {
		log.Errorf("Sanity checks failed: %s", err.Error())
		return
	}
	// acquire lock to avoid race condition between API calls;
	// sync.Once allows the early unlock below while keeping the
	// deferred unlock safe on every return path
	var unlockOnce sync.Once
	sd.RepositoryMutex.Lock()
	defer unlockOnce.Do(sd.RepositoryMutex.Unlock)
	// Check if combination of (job_id, cluster_id, start_time) already exists:
	jobs, err := sd.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
	if err != nil && err != sql.ErrNoRows {
		log.Errorf("checking for duplicate failed: %s", err.Error())
		return
	} else if err == nil {
		for _, job := range jobs {
			// Same jobId/cluster starting within 24h counts as duplicate.
			if (req.StartTime - job.StartTimeUnix) < 86400 {
				log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID)
				return
			}
		}
	}
	id, err := sd.JobRepository.Start(req)
	if err != nil {
		log.Errorf("insert into database failed: %s", err.Error())
		return
	}
	// unlock here, adding Tags can be async
	unlockOnce.Do(sd.RepositoryMutex.Unlock)
	for _, tag := range req.Tags {
		if _, err := sd.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
			log.Errorf("adding tag to new job %d failed: %s", id, err.Error())
			return
		}
	}
	log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
}
// checkAndHandleStopJob validates a stop request against the job loaded
// from the repository, persists the final state and duration, and
// triggers asynchronous archiving unless monitoring is disabled.
// NOTE(review): near-identical logic exists in the REST scheduler;
// consider extracting a shared helper.
func (sd *SlurmNatsScheduler) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
	// Sanity checks: only running jobs may be stopped and the stop time
	// must lie after the recorded start time.
	if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
		log.Errorf("stopTime must be larger than startTime and only running jobs can be stopped")
		return
	}
	// An empty state defaults to "completed"; otherwise it must be a
	// valid schema.JobState.
	if req.State != "" && !req.State.Valid() {
		log.Errorf("invalid job state: %#v", req.State)
		return
	} else if req.State == "" {
		req.State = schema.JobStateCompleted
	}
	// Mark job as stopped in the database (update state and duration)
	job.Duration = int32(req.StopTime - job.StartTime.Unix())
	job.State = req.State
	if err := sd.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
		log.Errorf("marking job as stopped failed: %s", err.Error())
		return
	}
	log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
	// Monitoring is disabled...
	if job.MonitoringStatus == schema.MonitoringStatusDisabled {
		return
	}
	// Trigger async archiving
	sd.JobRepository.TriggerArchiving(job)
}
// stopJob handles a "stopJob" NATS message: it looks the job up by
// (jobId, cluster, startTime) and delegates state update and archiving
// to checkAndHandleStopJob.
func (sd *SlurmNatsScheduler) stopJob(req *StopJobRequest) {
	// if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
	// 	log.Errorf("missing role: %v", auth.GetRoleString(auth.RoleApi))
	// 	return
	// }

	// jobId is mandatory; validate before any use.
	if req.JobId == nil {
		log.Errorf("the field 'jobId' is required")
		return
	}
	// Guard the log statement: req.Cluster was previously dereferenced
	// unconditionally and could be a nil pointer, panicking the handler.
	if req.Cluster != nil {
		log.Printf("Server Name: %s - Job ID: %v", *req.Cluster, req.JobId)
	} else {
		log.Printf("Job ID: %v (no cluster given)", req.JobId)
	}

	// Fetch job (that will be stopped) from db
	job, err := sd.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
	if err != nil {
		log.Errorf("finding job failed: %s", err.Error())
		return
	}
	sd.checkAndHandleStopJob(job, req)
}
// Init connects to NATS, registers the "test", "startJob" and
// "stopJob" subscriptions and then blocks on wg.Wait() until a "test"
// message with ResponseCode 500 arrives.
// NOTE(review): this call therefore does not return under normal
// operation, and the deferred nc.Close()/ec.Close() drop the
// subscriptions as soon as it does — confirm this is intended.
func (sd *SlurmNatsScheduler) Init(rawConfig json.RawMessage) error {
	// TODO confirm: server list is hard-coded; rawConfig
	// (SlurmNatsConfig.URL) is not consulted here.
	servers := []string{"nats://127.0.0.1:4222", "nats://127.0.0.1:1223"}
	nc, err := nats.Connect(strings.Join(servers, ","))
	if err != nil {
		log.Fatal(err.Error())
	}
	defer nc.Close()
	// Map the NATS connection status to a short human-readable label.
	getStatusTxt := func(nc *nats.Conn) string {
		switch nc.Status() {
		case nats.CONNECTED:
			return "Connected"
		case nats.CLOSED:
			return "Closed"
		default:
			return "Other"
		}
	}
	log.Printf("The connection status is %v\n", getStatusTxt(nc))
	// JSON-encoded connection so subscription callbacks receive decoded
	// structs directly.
	ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
	if err != nil {
		log.Fatal(err.Error())
	}
	defer ec.Close()
	// Define the object
	type encodedMessage struct {
		ServerName   string
		ResponseCode int
	}
	wg := sync.WaitGroup{}
	wg.Add(1)
	// Subscribe; a "test" message with code 500 releases the final
	// wg.Wait() below and lets Init return.
	if _, err := ec.Subscribe("test", func(s *encodedMessage) {
		log.Printf("Server Name: %s - Response Code: %v", s.ServerName, s.ResponseCode)
		if s.ResponseCode == 500 {
			wg.Done()
		}
	}); err != nil {
		log.Fatal(err.Error())
	}
	if _, err := ec.Subscribe("startJob", sd.startJob); err != nil {
		log.Fatal(err.Error())
	}
	if _, err := ec.Subscribe("stopJob", sd.stopJob); err != nil {
		log.Fatal(err.Error())
	}
	// Wait for a message to come in
	wg.Wait()
	return nil
}
// Sync is a no-op for the NATS-based scheduler: job events are pushed
// via the subscriptions registered in Init rather than pulled here.
func (sd *SlurmNatsScheduler) Sync() {
}

View File

@@ -0,0 +1,565 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package scheduler
import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
// SlurmRestSchedulerConfig holds the state of the slurmrestd-based
// scheduler integration: endpoint URL, repository handle, the decoded
// cluster configuration and lookup maps derived from it.
type SlurmRestSchedulerConfig struct {
	URL string `json:"url"`

	JobRepository *repository.JobRepository

	clusterConfig ClusterConfig

	// client is the HTTP client used for all slurmrestd requests.
	client *http.Client

	// nodeClusterMap maps a node hostname to its sub-cluster name.
	nodeClusterMap map[string]string

	// clusterPCIEAddressMap maps a sub-cluster name to its accelerator
	// PCIe addresses (in topology order).
	clusterPCIEAddressMap map[string][]string
}
// queryDB fetches accounting job records from the slurmdbd REST
// endpoint and returns the raw "jobs" array of the decoded response.
// Errors are returned to the caller instead of being logged-and-ignored
// (the old code continued with a nil response and called os.Exit).
// NOTE(review): qtime and clusterName are not used yet; the query
// parameters are hard-coded — TODO wire them in.
func (cfg *SlurmRestSchedulerConfig) queryDB(qtime int64, clusterName string) ([]interface{}, error) {
	apiEndpoint := "/slurmdb/v0.0.38/jobs"

	// Construct the query parameters
	queryParams := url.Values{}
	queryParams.Set("users", "user1,user2")
	queryParams.Set("submit_time", "2023-01-01T00:00:00")

	// Add the query parameters to the API endpoint
	apiEndpoint += "?" + queryParams.Encode()

	req, err := http.NewRequest("GET", apiEndpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("error creating request: %w", err)
	}

	resp, err := cfg.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("error sending request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("API request failed with status: %v", resp.Status)
	}

	// io.ReadAll replaces the former resp.Body.Read(dbOutput) into a
	// nil slice, which always read 0 bytes.
	dbOutput, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("error reading response body: %w", err)
	}
	log.Printf("API response: %v", string(dbOutput))

	dataJobs := make(map[string]interface{})
	if err = json.Unmarshal(dbOutput, &dataJobs); err != nil {
		return nil, fmt.Errorf("error parsing JSON response: %w", err)
	}

	jobsData, ok := dataJobs["jobs"]
	if !ok {
		return nil, fmt.Errorf("jobs not found - response incomplete")
	}
	jobs, _ := jobsData.([]interface{})
	return jobs, nil
}
// fetchJobs retrieves the current job list from the slurmrestd
// /slurm/v0.0.38/jobs endpoint and decodes it into a SlurmPayload.
// All failures (request, transport, status, read, decode) are returned
// as errors instead of being logged and ignored.
func (cfg *SlurmRestSchedulerConfig) fetchJobs() (SlurmPayload, error) {
	var jobsResponse SlurmPayload

	apiEndpoint := "http://:8080/slurm/v0.0.38/jobs"
	req, err := http.NewRequest("GET", apiEndpoint, nil)
	if err != nil {
		return jobsResponse, fmt.Errorf("error creating request: %w", err)
	}

	resp, err := cfg.client.Do(req)
	if err != nil {
		return jobsResponse, fmt.Errorf("error sending request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return jobsResponse, fmt.Errorf("API request failed with status: %v", resp.Status)
	}

	// io.ReadAll replaces the former resp.Body.Read(ctlOutput) into a
	// nil slice, which always read 0 bytes.
	ctlOutput, err := io.ReadAll(resp.Body)
	if err != nil {
		return jobsResponse, fmt.Errorf("error reading response body: %w", err)
	}
	log.Printf("Received JSON Data: %v", ctlOutput)

	if err = json.Unmarshal(ctlOutput, &jobsResponse); err != nil {
		log.Errorf("Error parsing JSON response: %v", err)
		return jobsResponse, err
	}
	return jobsResponse, nil
}
// fetchJobsLocal reads a slurmrestd jobs payload from the local file
// "slurm_0038.json" and decodes it into a SlurmPayload. A read failure
// is returned as an error; previously it was only printed and decoding
// proceeded on nil data.
func fetchJobsLocal() (SlurmPayload, error) {
	var jobsResponse SlurmPayload

	// Read the Slurm Payload JSON file
	jobsData, err := os.ReadFile("slurm_0038.json")
	if err != nil {
		return jobsResponse, fmt.Errorf("error reading Slurm Payload JSON file: %w", err)
	}

	if err = json.Unmarshal(jobsData, &jobsResponse); err != nil {
		log.Errorf("Error parsing Slurm Payload JSON response: %v", err)
		return jobsResponse, err
	}
	return jobsResponse, nil
}
// fetchDumpedJobsLocal reads a slurmdbd jobs payload from the local
// file "slurmdb_0038-large.json" and decodes it into a SlurmDBPayload.
// A read failure is returned as an error; previously it was only
// printed and decoding proceeded on nil data.
func fetchDumpedJobsLocal() (SlurmDBPayload, error) {
	var jobsResponse SlurmDBPayload

	// Read the SlurmDB Payload JSON file
	jobsData, err := os.ReadFile("slurmdb_0038-large.json")
	if err != nil {
		return jobsResponse, fmt.Errorf("error reading SlurmDB Payload JSON file: %w", err)
	}

	if err = json.Unmarshal(jobsData, &jobsResponse); err != nil {
		log.Errorf("Error parsing SlurmDB Payload JSON response: %v", err)
		return jobsResponse, err
	}
	return jobsResponse, nil
}
// printSlurmInfo renders a scontrol-style multi-line summary of a
// slurmrestd Job (stored as "slurmInfo" job metadata by CreateJobMeta).
func printSlurmInfo(job Job) string {

	text := fmt.Sprintf(`
	JobId=%v JobName=%v
	UserId=%v(%v) GroupId=%v
	Account=%v QOS=%v
	Requeue=%v Restarts=%v BatchFlag=%v
	TimeLimit=%v
	SubmitTime=%v
	Partition=%v
	NodeList=%v
	NumNodes=%v NumCPUs=%v NumTasks=%v CPUs/Task=%v
	NTasksPerNode:Socket:Core=%v:%v:%v
	TRES_req=%v
	TRES_alloc=%v
	Command=%v
	WorkDir=%v
	StdErr=%v
	StdOut=%v`,
		job.JobID, job.Name,
		job.UserName, job.UserID, job.GroupID,
		job.Account, job.QoS,
		job.Requeue, job.RestartCnt, job.BatchFlag,
		job.TimeLimit, job.SubmitTime,
		job.Partition,
		job.Nodes,
		job.NodeCount, job.CPUs, job.Tasks, job.CPUPerTask,
		job.TasksPerBoard, job.TasksPerSocket, job.TasksPerCore,
		// TRES_req now prints the requested TRES string; the old code
		// printed job.TresAllocStr for both TRES_req and TRES_alloc.
		job.TresReqStr,
		job.TresAllocStr,
		job.Command,
		job.CurrentWorkingDirectory,
		job.StandardError,
		job.StandardOutput,
	)
	return text
}
// exitWithError reports a failed slurmrestd API call on stderr (or via
// the error log for non-exec errors) and terminates the process with
// exit status 1. Exec exit code 28 is treated as a timeout and gets a
// dedicated hint to check slurmrestd.
func exitWithError(err error, output []byte) {
	exitError, isExitError := err.(*exec.ExitError)
	switch {
	case isExitError && exitError.ExitCode() == 28:
		fmt.Fprintf(os.Stderr, "ERROR: API call failed with timeout; check slurmrestd.\nOutput:\n%s\n", output)
	case isExitError:
		fmt.Fprintf(os.Stderr, "ERROR: API call failed with code %d;\nOutput:\n%s\n", exitError.ExitCode(), output)
	default:
		log.Errorf("ERROR: %v", err)
	}
	os.Exit(1)
}
// Init loads the cluster configuration, builds the node-to-cluster and
// cluster-to-PCIe-address lookup maps and creates the HTTP client used
// for slurmrestd requests. It returns the error from decoding the
// cluster configuration, if any.
func (cfg *SlurmRestSchedulerConfig) Init() error {
	var err error
	cfg.clusterConfig, err = DecodeClusterConfig("cluster-alex.json")

	cfg.nodeClusterMap = make(map[string]string)
	cfg.clusterPCIEAddressMap = make(map[string][]string)

	for _, subCluster := range cfg.clusterConfig.SubClusters {
		cfg.ConstructNodeClusterMap(subCluster.Nodes, subCluster.Name)

		// Collect accelerator PCIe addresses in topology order. Use
		// append: the old code indexed into a zero-length slice
		// (make([]string, 0, 32)), which panicked on the first write.
		pcieAddresses := make([]string, 0, len(subCluster.Topology.Accelerators))
		for _, accelerator := range subCluster.Topology.Accelerators {
			pcieAddresses = append(pcieAddresses, accelerator.ID)
		}
		cfg.clusterPCIEAddressMap[subCluster.Name] = pcieAddresses
	}

	// Create an HTTP client
	cfg.client = &http.Client{}
	return err
}
// checkAndHandleStopJob validates a stop request against the job loaded
// from the repository, persists the final state and duration, and
// triggers asynchronous archiving unless monitoring is disabled.
// NOTE(review): near-identical logic exists in the NATS scheduler;
// consider extracting a shared helper.
func (cfg *SlurmRestSchedulerConfig) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
	// Sanity checks: only running jobs may be stopped and the stop time
	// must lie after the recorded start time.
	if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
		log.Errorf("stopTime must be larger than startTime and only running jobs can be stopped")
		return
	}
	// An empty state defaults to "completed"; otherwise it must be a
	// valid schema.JobState.
	if req.State != "" && !req.State.Valid() {
		log.Errorf("invalid job state: %#v", req.State)
		return
	} else if req.State == "" {
		req.State = schema.JobStateCompleted
	}
	// Mark job as stopped in the database (update state and duration)
	job.Duration = int32(req.StopTime - job.StartTime.Unix())
	job.State = req.State
	if err := cfg.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
		log.Errorf("marking job as stopped failed: %s", err.Error())
		return
	}
	log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
	// Monitoring is disabled...
	if job.MonitoringStatus == schema.MonitoringStatusDisabled {
		return
	}
	// Trigger async archiving
	cfg.JobRepository.TriggerArchiving(job)
}
// ConstructNodeClusterMap expands a node list expression (e.g.
// "a0815,a[0001-0010]") and records each resulting hostname in
// cfg.nodeClusterMap as belonging to cluster. The map is later queried
// with plain hostnames (see CreateJobMeta).
func (cfg *SlurmRestSchedulerConfig) ConstructNodeClusterMap(nodes string, cluster string) {
	if cfg.nodeClusterMap == nil {
		cfg.nodeClusterMap = make(map[string]string)
	}

	// Regexes for "a[start-end]" ranges and single "aNNNN" nodes;
	// hoisted out of the loop since they are loop-invariant.
	numberRangeRegex := regexp.MustCompile(`a\[(\d+)-(\d+)\]`)
	numberRegex := regexp.MustCompile(`a(\d+)`)

	// Split the input by commas
	groups := strings.Split(nodes, ",")
	for _, group := range groups {
		if matches := numberRangeRegex.FindStringSubmatch(group); len(matches) == 3 {
			// Expand the range into individual hostnames. Build plain
			// names like "a0001": the old code prefixed the whole range
			// expression (matches[0]), producing keys such as
			// "a[0001-0010]0001" that never matched a real node name.
			start, _ := strconv.Atoi(matches[1])
			end, _ := strconv.Atoi(matches[2])
			for i := start; i <= end; i++ {
				cfg.nodeClusterMap[fmt.Sprintf("a%04d", i)] = cluster
			}
		} else if numberRegex.MatchString(group) {
			// Extract individual node
			matches := numberRegex.FindStringSubmatch(group)
			if len(matches) == 2 {
				cfg.nodeClusterMap[group] = cluster
			}
		}
	}
}
// extractElements resolves a comma-separated list of indices and
// inclusive index ranges (e.g. "0,2-4") against addresses and returns
// the selected entries in order. It returns an error for malformed
// input or out-of-range indices (previously an out-of-range index
// caused an index-out-of-range panic).
func extractElements(indicesStr string, addresses []string) ([]string, error) {
	var selected []string

	// pick validates idx against addresses and appends the element.
	pick := func(idx int) error {
		if idx < 0 || idx >= len(addresses) {
			return fmt.Errorf("index %d out of range (have %d addresses)", idx, len(addresses))
		}
		selected = append(selected, addresses[idx])
		return nil
	}

	// Split the input string by commas to get individual index ranges
	for _, indexRange := range strings.Split(indicesStr, ",") {
		// Split each index range by hyphen to separate start and end indices
		rangeParts := strings.Split(indexRange, "-")
		switch len(rangeParts) {
		case 1:
			// A single index
			index, err := strconv.Atoi(rangeParts[0])
			if err != nil {
				return nil, err
			}
			if err := pick(index); err != nil {
				return nil, err
			}
		case 2:
			// A start-end range; both bounds are inclusive
			start, err := strconv.Atoi(rangeParts[0])
			if err != nil {
				return nil, err
			}
			end, err := strconv.Atoi(rangeParts[1])
			if err != nil {
				return nil, err
			}
			for i := start; i <= end; i++ {
				if err := pick(i); err != nil {
					return nil, err
				}
			}
		default:
			// Invalid format
			return nil, fmt.Errorf("invalid index range: %s", indexRange)
		}
	}
	return selected, nil
}
// CreateJobMeta converts a slurmrestd Job into a schema.JobMeta for the
// job repository: per-node resources (hardware threads and accelerator
// PCIe addresses resolved via the cluster maps) plus jobName/slurmInfo
// metadata. It returns an error for malformed input instead of
// terminating the process (the old code used log.Fatalf) and guards
// the GresDetail index, which previously could panic.
func (cfg *SlurmRestSchedulerConfig) CreateJobMeta(job Job) (*schema.JobMeta, error) {

	// job.Shared == nil means the job has the nodes exclusively.
	var exclusive int32
	if job.Shared == nil {
		exclusive = 1
	} else {
		exclusive = 0
	}

	totalGPUs := 0

	var resources []*schema.Resource

	for nodeIndex, node := range job.JobResources.AllocatedNodes {
		var res schema.Resource
		res.Hostname = node.Nodename

		log.Debugf("Node %s Cores map size: %d\n", node.Nodename, len(node.Sockets))

		if node.CPUsUsed == nil || node.MemoryAllocated == nil {
			// Return an error: one malformed job must not terminate the
			// whole service (was log.Fatalf).
			return nil, fmt.Errorf("either node.Cpus or node.Memory is nil for node %s", node.Nodename)
		}

		for k, v := range node.Sockets {
			fmt.Printf("core id[%s] value[%s]\n", k, v)
			threadID, _ := strconv.Atoi(k)
			res.HWThreads = append(res.HWThreads, threadID)
		}

		// Guard against GresDetail being shorter than AllocatedNodes
		// (unguarded indexing panicked before).
		if nodeIndex >= len(job.GresDetail) {
			return nil, fmt.Errorf("no GRES detail entry for node index %d", nodeIndex)
		}

		// Extract the accelerator index list in parentheses, e.g.
		// "gpu:a100:4(IDX:0-3)" -> "IDX:0-3" content.
		re := regexp.MustCompile(`\(([^)]*)\)`)
		matches := re.FindStringSubmatch(job.GresDetail[nodeIndex])
		if len(matches) < 2 {
			return nil, fmt.Errorf("no substring found in brackets")
		}

		// For core/GPU id mapping, need to query from cluster config file
		nodePCIEAddresses := cfg.clusterPCIEAddressMap[cfg.nodeClusterMap[node.Nodename]]
		selectedPCIEAddresses, err := extractElements(matches[1], nodePCIEAddresses)
		if err != nil {
			return nil, err
		}
		totalGPUs += len(selectedPCIEAddresses)
		res.Accelerators = selectedPCIEAddresses
		resources = append(resources, &res)
	}

	metaData := make(map[string]string)
	metaData["jobName"] = job.Name
	metaData["slurmInfo"] = printSlurmInfo(job)

	metaDataInBytes, err := json.Marshal(metaData)
	if err != nil {
		// Return instead of log.Fatalf for the same reason as above.
		return nil, fmt.Errorf("metaData JSON marshaling failed: %w", err)
	}

	var defaultJob schema.BaseJob = schema.BaseJob{
		JobID:     job.JobID,
		User:      job.UserName,
		Project:   job.Account,
		Cluster:   job.Cluster,
		Partition: job.Partition,
		// check nil
		ArrayJobId:   job.ArrayJobID,
		NumNodes:     job.NodeCount,
		NumHWThreads: job.CPUs,
		NumAcc:       int32(totalGPUs),
		Exclusive:    exclusive,
		// MonitoringStatus: job.MonitoringStatus,
		// SMT:            job.TasksPerCore,
		State: schema.JobState(job.JobState),
		// ignore this for start job
		// Duration:       int32(time.Now().Unix() - job.StartTime), // or SubmitTime?
		Walltime: time.Now().Unix(), // max duration requested by the job
		// Tags:           job.Tags,
		// ignore this!
		// RawResources: jobResourcesInBytes,
		// "job_resources": "allocated_nodes" "sockets":
		// very important; has to be right
		Resources:   resources,
		RawMetaData: metaDataInBytes,
		// optional metadata with 'jobScript' 'jobName' 'slurmInfo':
		MetaData: metaData,
		// ConcurrentJobs: job.ConcurrentJobs,
	}
	log.Debugf("Generated BaseJob with Resources=%v", defaultJob.Resources[0])

	meta := &schema.JobMeta{
		BaseJob:    defaultJob,
		StartTime:  job.StartTime,
		Statistics: make(map[string]schema.JobStatistics),
	}
	return meta, nil
}
// HandleJobs reconciles jobs reported by slurmrestd with the job
// repository: RUNNING jobs unknown to the backend are inserted, and
// jobs COMPLETED in Slurm but still marked running in the backend are
// stopped and archived. Jobs in other states are ignored here.
func (cfg *SlurmRestSchedulerConfig) HandleJobs(jobs []Job) error {

	// runningJobsInCC, err := cfg.JobRepository.FindRunningJobs("alex")

	// Iterate over the Jobs slice
	for _, job := range jobs {
		// Process each job from Slurm
		fmt.Printf("Job ID: %d\n", job.JobID)
		fmt.Printf("Job Name: %s\n", job.Name)
		fmt.Printf("Job State: %s\n", job.JobState)
		fmt.Println("Job StartTime:", job.StartTime)
		fmt.Println("Job Cluster:", job.Cluster)

		if job.JobState == "RUNNING" {
			// NOTE(review): the error from CreateJobMeta is discarded
			// here; meta may be nil if conversion failed — confirm.
			meta, _ := cfg.CreateJobMeta(job)

			// For all running jobs from Slurm
			_, notFoundError := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.StartTime)

			if notFoundError != nil {
				// if it does not exist in CC, create a new entry
				log.Print("Job does not exist in CC, will create a new entry:", job.JobID)
				id, startJobError := cfg.JobRepository.Start(meta)
				if startJobError != nil {
					return startJobError
				}
				log.Debug("Added job", id)
			}

			// Running in both sides: nothing needs to be done
		} else if job.JobState == "COMPLETED" {
			// Check if completed job with combination of (job_id, cluster_id, start_time) already exists
			log.Debugf("Processing completed job ID: %v Cluster: %v StartTime: %v", job.JobID, job.Cluster, job.StartTime)
			existingJob, err := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.StartTime)

			if err == nil && existingJob.State != schema.JobStateCompleted {
				// for jobs completed in Slurm (either in Slurm or maybe SlurmDB)
				// update job in CC with new info (job final status, duration, end timestamp)
				existingJob.BaseJob.Duration = int32(job.EndTime - job.StartTime)
				existingJob.BaseJob.State = schema.JobState(job.JobState)
				existingJob.BaseJob.Walltime = job.EndTime

				req := &StopJobRequest{
					Cluster:   &job.Cluster,
					JobId:     &job.JobID,
					State:     schema.JobState(job.JobState),
					StartTime: &job.StartTime,
					StopTime:  job.EndTime,
				}
				cfg.checkAndHandleStopJob(existingJob, req)
			}
		}
	}
	return nil
}
// HandleDumpedJobs walks the accounting (SlurmDB) job dump and finalizes any
// job that Slurm has finished but that ClusterCockpit still considers active:
// final state, duration and end timestamp are carried over via a stop request.
// Jobs unknown to CC or already completed there are left untouched.
func (cfg *SlurmRestSchedulerConfig) HandleDumpedJobs(jobs []DumpedJob) error {

	for _, job := range jobs {
		// Print identifying fields of the job being processed.
		fmt.Printf("Job ID: %d\n", job.JobID)
		fmt.Printf("Job Name: %s\n", job.Name)
		fmt.Printf("Job State: %s\n", job.State.Current)
		fmt.Println("Job EndTime:", job.Time.End)
		fmt.Println("Job Cluster:", job.Cluster)

		// A completed job is identified by (job_id, cluster_id, start_time).
		log.Debugf("Processing completed dumped job ID: %v Cluster: %v StartTime: %v", job.JobID, job.Cluster, job.Time.Start)

		ccJob, findErr := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.Time.Start)
		if findErr != nil || ccJob.State == schema.JobStateCompleted {
			// Unknown to CC, or already finalized there: nothing to update.
			continue
		}

		// Slurm finished this job while CC still considers it active:
		// push final status, duration and end timestamp into CC.
		ccJob.BaseJob.Duration = int32(job.Time.End - job.Time.Start)
		ccJob.BaseJob.State = schema.JobState(job.State.Current)
		ccJob.BaseJob.Walltime = job.Time.End

		stopReq := &StopJobRequest{
			Cluster:   &job.Cluster,
			JobId:     &job.JobID,
			State:     schema.JobState(job.State.Current),
			StartTime: &job.Time.Start,
			StopTime:  job.Time.End,
		}
		cfg.checkAndHandleStopJob(ccJob, stopReq)
	}
	return nil
}
// Sync fetches the current job list from the Slurm REST endpoint as well as
// the accounting (SlurmDB) dump and reconciles both with the ClusterCockpit
// job repository. Fetch failures remain fatal (matching previous behavior);
// reconciliation errors are now logged instead of being silently dropped.
func (cfg *SlurmRestSchedulerConfig) Sync() {
	// Fetch an instance of Slurm JobsResponse
	jobsResponse, err := fetchJobsLocal()
	if err != nil {
		log.Fatal(err.Error())
	}
	// Previously the returned error was discarded.
	if handleErr := cfg.HandleJobs(jobsResponse.Jobs); handleErr != nil {
		log.Print("HandleJobs failed:", handleErr.Error())
	}

	// Fetch an instance of Slurm DB JobsResponse
	dumpedJobsResponse, err := fetchDumpedJobsLocal()
	if err != nil {
		log.Fatal(err.Error())
	}
	if handleErr := cfg.HandleDumpedJobs(dumpedJobsResponse.Jobs); handleErr != nil {
		log.Print("HandleDumpedJobs failed:", handleErr.Error())
	}
}

137
tools/nats-manager/main.go Normal file
View File

@@ -0,0 +1,137 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main
import (
"flag"
"fmt"
"log"
"net/http"
"os"
"github.com/ClusterCockpit/cc-backend/internal/scheduler"
"github.com/nats-io/nats.go"
)
// usage prints a short command-line synopsis for the NATS publisher helper
// followed by the defaults of all registered flags.
func usage() {
	log.Printf("Usage: nats-pub [-s server] [-creds file] <subject> <msg>\n")
	flag.PrintDefaults()
}
// showUsageAndExit prints the usage synopsis and terminates the process with
// the given exit code. NOTE: os.Exit does not run deferred functions.
func showUsageAndExit(exitcode int) {
	usage()
	os.Exit(exitcode)
}
// setupPublisher parses the command line, connects to the configured NATS
// server and publishes a single message (<subject> <msg> taken from the
// remaining arguments). The process always terminates here: exit code 0 on
// success, code 1 on usage errors, and via log.Fatal on NATS failures.
func setupPublisher() {
	var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
	var userCreds = flag.String("creds", "", "User Credentials File")
	var showHelp = flag.Bool("h", false, "Show help message")

	log.SetFlags(0)
	flag.Usage = usage
	flag.Parse()

	if *showHelp {
		showUsageAndExit(0)
	}

	args := flag.Args()
	if len(args) != 2 {
		showUsageAndExit(1)
	}

	fmt.Printf("Hello Nats\n")

	// Connect Options.
	opts := []nats.Option{nats.Name("NATS Sample Publisher")}

	// Use UserCredentials
	if *userCreds != "" {
		opts = append(opts, nats.UserCredentials(*userCreds))
	}

	// Connect to NATS
	nc, err := nats.Connect(*urls, opts...)
	if err != nil {
		log.Fatal(err)
	}

	subj, msg := args[0], []byte(args[1])

	// Publish/Flush errors surface through nc.LastError() below.
	nc.Publish(subj, msg)
	nc.Flush()

	if err := nc.LastError(); err != nil {
		nc.Close()
		log.Fatal(err)
	}
	log.Printf("Published [%s] : '%s'\n", subj, msg)

	// os.Exit skips deferred calls, so the connection must be closed
	// explicitly (the previous `defer nc.Close()` never ran).
	nc.Close()
	os.Exit(0)
}
// injectPayload serves two local JSON fixtures over HTTP, emulating the Slurm
// and SlurmDB REST endpoints (v0.0.38) for testing. The server is started in
// a background goroutine on port 8080.
func injectPayload() {
	// Read the JSON fixtures. Each read error must be checked on its own:
	// previously `err` from the first ReadFile was overwritten by the second
	// before being inspected, so a failed jobs-file read was silently lost.
	jobsData, err := os.ReadFile("slurm_0038.json")
	if err != nil {
		fmt.Println("Error reading JSON file:", err)
		return
	}
	dbData, err := os.ReadFile("slurmdb_0038-large.json")
	if err != nil {
		fmt.Println("Error reading JSON file:", err)
		return
	}

	// Create an HTTP handler function
	http.HandleFunc("/slurm/v0.0.38/jobs", func(w http.ResponseWriter, r *http.Request) {
		// Set the response content type to JSON
		w.Header().Set("Content-Type", "application/json")

		// Write the raw JSON data to the response writer
		if _, err := w.Write(jobsData); err != nil {
			http.Error(w, "Error writing jobsData payload", http.StatusInternalServerError)
			return
		}
	})

	http.HandleFunc("/slurmdb/v0.0.38/jobs", func(w http.ResponseWriter, r *http.Request) {
		// Set the response content type to JSON
		w.Header().Set("Content-Type", "application/json")

		// Write the raw JSON data to the response writer
		if _, err := w.Write(dbData); err != nil {
			http.Error(w, "Error writing dbData payload", http.StatusInternalServerError)
			return
		}
	})

	// Start the HTTP server on port 8080 in the background; log a startup
	// failure (e.g. port already in use) instead of discarding it.
	fmt.Println("Listening on :8080...")
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Println("HTTP server error:", err)
		}
	}()
}
// loadSlurmNatsScheduler initializes a SlurmNatsScheduler from a minimal
// inline JSON configuration.
func loadSlurmNatsScheduler() {
	var sch scheduler.SlurmNatsScheduler
	// sch.URL = "nats://127.0.0.1:1223"
	sch.Init([]byte(`{"target": "localhost"}`))
	// go injectPayload()
}
// main initializes the Slurm REST scheduler configuration and performs a
// single synchronization run against the ClusterCockpit job repository.
func main() {
	var sch scheduler.SlurmRestSchedulerConfig
	sch.Init()
	sch.Sync()
	// No explicit os.Exit(0): returning from main already exits with status 0,
	// and os.Exit would skip any deferred cleanup.
}

View File

@@ -22,7 +22,6 @@
"@rollup/plugin-commonjs": "^24.1.0", "@rollup/plugin-commonjs": "^24.1.0",
"@rollup/plugin-node-resolve": "^15.0.2", "@rollup/plugin-node-resolve": "^15.0.2",
"@rollup/plugin-terser": "^0.4.1", "@rollup/plugin-terser": "^0.4.1",
"@timohausmann/quadtree-js": "^1.2.5",
"rollup": "^3.21.0", "rollup": "^3.21.0",
"rollup-plugin-css-only": "^4.3.0", "rollup-plugin-css-only": "^4.3.0",
"rollup-plugin-svelte": "^7.1.4", "rollup-plugin-svelte": "^7.1.4",
@@ -226,12 +225,6 @@
} }
} }
}, },
"node_modules/@timohausmann/quadtree-js": {
"version": "1.2.5",
"resolved": "https://registry.npmjs.org/@timohausmann/quadtree-js/-/quadtree-js-1.2.5.tgz",
"integrity": "sha512-WcH3pouYtpyLjTCRvNP0WuSV4m7mRyYhLzW44egveFryT7pJhpDsdIJASEe37iCFNA0vmEpqTYGoG0siyXEthA==",
"dev": true
},
"node_modules/@types/estree": { "node_modules/@types/estree": {
"version": "1.0.1", "version": "1.0.1",
"resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.1.tgz", "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.1.tgz",

View File

@@ -10,7 +10,6 @@
"@rollup/plugin-commonjs": "^24.1.0", "@rollup/plugin-commonjs": "^24.1.0",
"@rollup/plugin-node-resolve": "^15.0.2", "@rollup/plugin-node-resolve": "^15.0.2",
"@rollup/plugin-terser": "^0.4.1", "@rollup/plugin-terser": "^0.4.1",
"@timohausmann/quadtree-js": "^1.2.5",
"rollup": "^3.21.0", "rollup": "^3.21.0",
"rollup-plugin-css-only": "^4.3.0", "rollup-plugin-css-only": "^4.3.0",
"rollup-plugin-svelte": "^7.1.4", "rollup-plugin-svelte": "^7.1.4",

View File

@@ -10,7 +10,7 @@
import { binsFromFootprint } from './utils.js' import { binsFromFootprint } from './utils.js'
import ScatterPlot from './plots/Scatter.svelte' import ScatterPlot from './plots/Scatter.svelte'
import PlotTable from './PlotTable.svelte' import PlotTable from './PlotTable.svelte'
import RooflineHeatmap from './plots/RooflineHeatmap.svelte' import Roofline from './plots/Roofline.svelte'
const { query: initq } = init() const { query: initq } = init()
@@ -293,7 +293,7 @@
{#each $topQuery.data.topList as te, i} {#each $topQuery.data.topList as te, i}
<tr> <tr>
<td><Icon name="circle-fill" style="color: {colors[i]};"/></td> <td><Icon name="circle-fill" style="color: {colors[i]};"/></td>
{#if groupSelection.key == 'user'} {#if groupSelection.key == 'User'}
<th scope="col"><a href="/monitoring/user/{te.id}?cluster={cluster.name}">{te.id}</a></th> <th scope="col"><a href="/monitoring/user/{te.id}?cluster={cluster.name}">{te.id}</a></th>
{:else} {:else}
<th scope="col"><a href="/monitoring/jobs/?cluster={cluster.name}&project={te.id}&projectMatch=eq">{te.id}</a></th> <th scope="col"><a href="/monitoring/jobs/?cluster={cluster.name}&project={te.id}&projectMatch=eq">{te.id}</a></th>
@@ -315,7 +315,7 @@
{:else if $rooflineQuery.data && cluster} {:else if $rooflineQuery.data && cluster}
<div bind:clientWidth={colWidth2}> <div bind:clientWidth={colWidth2}>
{#key $rooflineQuery.data} {#key $rooflineQuery.data}
<RooflineHeatmap <Roofline
width={colWidth2} height={300} width={colWidth2} height={300}
tiles={$rooflineQuery.data.rooflineHeatmap} tiles={$rooflineQuery.data.rooflineHeatmap}
cluster={cluster.subClusters.length == 1 ? cluster.subClusters[0] : null} cluster={cluster.subClusters.length == 1 ? cluster.subClusters[0] : null}

View File

@@ -4,7 +4,6 @@
groupByScope, groupByScope,
fetchMetricsStore, fetchMetricsStore,
checkMetricDisabled, checkMetricDisabled,
transformDataForRoofline
} from "./utils.js"; } from "./utils.js";
import { import {
Row, Row,
@@ -132,6 +131,7 @@
let plots = {}, let plots = {},
jobTags, jobTags,
fullWidth,
statsTable; statsTable;
$: document.title = $initq.fetching $: document.title = $initq.fetching
? "Loading..." ? "Loading..."
@@ -190,6 +190,7 @@
})); }));
</script> </script>
<div class="row" bind:clientWidth={fullWidth} />
<Row> <Row>
<Col> <Col>
{#if $initq.error} {#if $initq.error}
@@ -244,6 +245,7 @@
{/if} {/if}
<Col> <Col>
<Polar <Polar
size={fullWidth / 4.1}
metrics={ccconfig[ metrics={ccconfig[
`job_view_polarPlotMetrics:${$initq.data.job.cluster}` `job_view_polarPlotMetrics:${$initq.data.job.cluster}`
] || ccconfig[`job_view_polarPlotMetrics`]} ] || ccconfig[`job_view_polarPlotMetrics`]}
@@ -253,18 +255,19 @@
</Col> </Col>
<Col> <Col>
<Roofline <Roofline
renderTime={true} width={fullWidth / 3 - 10}
height={fullWidth / 5}
cluster={clusters cluster={clusters
.find((c) => c.name == $initq.data.job.cluster) .find((c) => c.name == $initq.data.job.cluster)
.subClusters.find( .subClusters.find(
(sc) => sc.name == $initq.data.job.subCluster (sc) => sc.name == $initq.data.job.subCluster
)} )}
data={ flopsAny={$jobMetrics.data.jobMetrics.find(
transformDataForRoofline ( (m) => m.name == "flops_any" && m.scope == "node"
$jobMetrics.data.jobMetrics.find((m) => m.name == "flops_any" && m.scope == "node").metric, )}
$jobMetrics.data.jobMetrics.find((m) => m.name == "mem_bw" && m.scope == "node").metric memBw={$jobMetrics.data.jobMetrics.find(
) (m) => m.name == "mem_bw" && m.scope == "node"
} )}
/> />
</Col> </Col>
{:else} {:else}
@@ -272,7 +275,8 @@
<Col /> <Col />
{/if} {/if}
</Row> </Row>
<Row class="mb-3"> <br />
<Row>
<Col xs="auto"> <Col xs="auto">
{#if $initq.data} {#if $initq.data}
<TagManagement job={$initq.data.job} bind:jobTags /> <TagManagement job={$initq.data.job} bind:jobTags />
@@ -289,6 +293,7 @@
<Zoom timeseriesPlots={plots} /> <Zoom timeseriesPlots={plots} />
</Col> --> </Col> -->
</Row> </Row>
<br />
<Row> <Row>
<Col> <Col>
{#if $jobMetrics.error} {#if $jobMetrics.error}
@@ -335,7 +340,8 @@
{/if} {/if}
</Col> </Col>
</Row> </Row>
<Row class="mt-2"> <br />
<Row>
<Col> <Col>
{#if $initq.data} {#if $initq.data}
<TabContent> <TabContent>

View File

@@ -1,7 +1,7 @@
<script> <script>
import { getContext } from "svelte"; import { getContext } from "svelte";
import Refresher from "./joblist/Refresher.svelte"; import Refresher from "./joblist/Refresher.svelte";
import Roofline from "./plots/Roofline.svelte"; import Roofline, { transformPerNodeData } from "./plots/Roofline.svelte";
import Pie, { colors } from "./plots/Pie.svelte"; import Pie, { colors } from "./plots/Pie.svelte";
import Histogram from "./plots/Histogram.svelte"; import Histogram from "./plots/Histogram.svelte";
import { import {
@@ -16,7 +16,7 @@
Progress, Progress,
Icon, Icon,
} from "sveltestrap"; } from "sveltestrap";
import { init, convert2uplot, transformPerNodeDataForRoofline } from "./utils.js"; import { init, convert2uplot } from "./utils.js";
import { scaleNumbers } from "./units.js"; import { scaleNumbers } from "./units.js";
import { import {
queryStore, queryStore,
@@ -31,8 +31,8 @@
export let cluster; export let cluster;
let plotWidths = [], let plotWidths = [],
colWidth1, colWidth1 = 0,
colWidth2 colWidth2;
let from = new Date(Date.now() - 5 * 60 * 1000), let from = new Date(Date.now() - 5 * 60 * 1000),
to = new Date(Date.now()); to = new Date(Date.now());
const topOptions = [ const topOptions = [
@@ -427,17 +427,16 @@
<div bind:clientWidth={plotWidths[i]}> <div bind:clientWidth={plotWidths[i]}>
{#key $mainQuery.data.nodeMetrics} {#key $mainQuery.data.nodeMetrics}
<Roofline <Roofline
allowSizeChange={true}
width={plotWidths[i] - 10} width={plotWidths[i] - 10}
height={300} height={300}
colorDots={true}
showTime={false}
cluster={subCluster} cluster={subCluster}
data={ data={transformPerNodeData(
transformPerNodeDataForRoofline( $mainQuery.data.nodeMetrics.filter(
$mainQuery.data.nodeMetrics.filter( (data) => data.subCluster == subCluster.name
(data) => data.subCluster == subCluster.name
)
) )
} )}
/> />
{/key} {/key}
</div> </div>
@@ -445,7 +444,7 @@
</Row> </Row>
{/each} {/each}
<hr/> <hr style="margin-top: -1em;" />
<!-- Usage Stats as Histograms --> <!-- Usage Stats as Histograms -->

View File

@@ -22,10 +22,10 @@
LineElement LineElement
); );
export let size
export let metrics export let metrics
export let cluster export let cluster
export let jobMetrics export let jobMetrics
export let height = 365
const metricConfig = getContext('metrics') const metricConfig = getContext('metrics')
@@ -89,19 +89,13 @@
// No custom defined options but keep for clarity // No custom defined options but keep for clarity
const options = { const options = {
maintainAspectRatio: false, maintainAspectRatio: false,
animation: false, animation: false
scales: { // fix scale
r: {
suggestedMin: 0.0,
suggestedMax: 1.0
}
}
} }
</script> </script>
<div class="chart-container"> <div class="chart-container">
<Radar {data} {options} {height}/> <Radar {data} {options} width={size} height={size}/>
</div> </div>
<style> <style>

View File

@@ -1,52 +1,42 @@
<script> <div class="cc-plot">
import uPlot from 'uplot' <canvas bind:this={canvasElement} width="{prevWidth}" height="{prevHeight}"></canvas>
import { formatNumber } from '../units.js' </div>
import { onMount, onDestroy } from 'svelte'
import { Card } from 'sveltestrap'
export let data = null <script context="module">
export let renderTime = false const axesColor = '#aaaaaa'
export let allowSizeChange = false const tickFontSize = 10
export let cluster = null const labelFontSize = 12
export let width = 600 const fontFamily = 'system-ui, -apple-system, "Segoe UI", Roboto, "Helvetica Neue", Arial, "Noto Sans", sans-serif, "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji"'
export let height = 350 const paddingLeft = 40,
paddingRight = 10,
paddingTop = 10,
paddingBottom = 50
let plotWrapper = null
let uplot = null
let timeoutId = null
/* Data Format
* data = [null, [], []] // 0: null-axis required for scatter, 1: Array of XY-Array for Scatter, 2: Optional Time Info
* data[1][0] = [100, 200, 500, ...] // X Axis -> Intensity (Vals up to clusters' flopRateScalar value)
* data[1][1] = [1000, 2000, 1500, ...] // Y Axis -> Performance (Vals up to clusters' flopRateSimd value)
* data[2] = [0.1, 0.15, 0.2, ...] // Color Code -> Time Information (Floats from 0 to 1) (Optional)
*/
// Helpers
function getGradientR(x) { function getGradientR(x) {
if (x < 0.5) return 0 if (x < 0.5) return 0
if (x > 0.75) return 255 if (x > 0.75) return 255
x = (x - 0.5) * 4.0 x = (x - 0.5) * 4.0
return Math.floor(x * 255.0) return Math.floor(x * 255.0)
} }
function getGradientG(x) { function getGradientG(x) {
if (x > 0.25 && x < 0.75) return 255 if (x > 0.25 && x < 0.75) return 255
if (x < 0.25) x = x * 4.0 if (x < 0.25) x = x * 4.0
else x = 1.0 - (x - 0.75) * 4.0 else x = 1.0 - (x - 0.75) * 4.0
return Math.floor(x * 255.0) return Math.floor(x * 255.0)
} }
function getGradientB(x) { function getGradientB(x) {
if (x < 0.25) return 255 if (x < 0.25) return 255
if (x > 0.5) return 0 if (x > 0.5) return 0
x = 1.0 - (x - 0.25) * 4.0 x = 1.0 - (x - 0.25) * 4.0
return Math.floor(x * 255.0) return Math.floor(x * 255.0)
} }
function getRGB(c) { function getRGB(c) {
return `rgb(${getGradientR(c)}, ${getGradientG(c)}, ${getGradientB(c)})` return `rgb(${getGradientR(c)}, ${getGradientG(c)}, ${getGradientB(c)})`
} }
function nearestThousand (num) {
return Math.ceil(num/1000) * 1000
}
function lineIntersect(x1, y1, x2, y2, x3, y3, x4, y4) { function lineIntersect(x1, y1, x2, y2, x3, y3, x4, y4) {
let l = (y4 - y3) * (x2 - x1) - (x4 - x3) * (y2 - y1) let l = (y4 - y3) * (x2 - x1) - (x4 - x3) * (y2 - y1)
let a = ((x4 - x3) * (y1 - y3) - (y4 - y3) * (x1 - x3)) / l let a = ((x4 - x3) * (y1 - y3) - (y4 - y3) * (x1 - x3)) / l
@@ -55,194 +45,314 @@
y: y1 + a * (y2 - y1) y: y1 + a * (y2 - y1)
} }
} }
// End Helpers
// Dot Renderers function axisStepFactor(i, size) {
const drawColorPoints = (u, seriesIdx, idx0, idx1) => { if (size && size < 500)
const size = 5 * devicePixelRatio; return 10
uPlot.orient(u, seriesIdx, (series, dataX, dataY, scaleX, scaleY, valToPosX, valToPosY, xOff, yOff, xDim, yDim, moveTo, lineTo, rect, arc) => {
let d = u.data[seriesIdx];
let deg360 = 2 * Math.PI;
for (let i = 0; i < d[0].length; i++) {
let p = new Path2D();
let xVal = d[0][i];
let yVal = d[1][i];
u.ctx.strokeStyle = getRGB(u.data[2][i])
u.ctx.fillStyle = getRGB(u.data[2][i])
if (xVal >= scaleX.min && xVal <= scaleX.max && yVal >= scaleY.min && yVal <= scaleY.max) {
let cx = valToPosX(xVal, scaleX, xDim, xOff);
let cy = valToPosY(yVal, scaleY, yDim, yOff);
p.moveTo(cx + size/2, cy); if (i % 3 == 0)
arc(p, cx, cy, size/2, 0, deg360); return 2
} else if (i % 3 == 1)
u.ctx.fill(p); return 2.5
else
return 2
}
function render(ctx, data, cluster, width, height, colorDots, showTime, defaultMaxY) {
if (width <= 0)
return
const [minX, maxX, minY, maxY] = [0.01, 1000, 1., cluster?.flopRateSimd?.value || defaultMaxY]
const w = width - paddingLeft - paddingRight
const h = height - paddingTop - paddingBottom
// Helpers:
const [log10minX, log10maxX, log10minY, log10maxY] =
[Math.log10(minX), Math.log10(maxX), Math.log10(minY), Math.log10(maxY)]
/* Value -> Pixel-Coordinate */
const getCanvasX = (x) => {
x = Math.log10(x)
x -= log10minX; x /= (log10maxX - log10minX)
return Math.round((x * w) + paddingLeft)
}
const getCanvasY = (y) => {
y = Math.log10(y)
y -= log10minY
y /= (log10maxY - log10minY)
return Math.round((h - y * h) + paddingTop)
}
// Axes
ctx.fillStyle = 'black'
ctx.strokeStyle = axesColor
ctx.font = `${tickFontSize}px ${fontFamily}`
ctx.beginPath()
for (let x = minX, i = 0; x <= maxX; i++) {
let px = getCanvasX(x)
let text = formatNumber(x)
let textWidth = ctx.measureText(text).width
ctx.fillText(text,
Math.floor(px - (textWidth / 2)),
height - paddingBottom + tickFontSize + 5)
ctx.moveTo(px, paddingTop - 5)
ctx.lineTo(px, height - paddingBottom + 5)
x *= axisStepFactor(i, w)
}
if (data.xLabel) {
ctx.font = `${labelFontSize}px ${fontFamily}`
let textWidth = ctx.measureText(data.xLabel).width
ctx.fillText(data.xLabel, Math.floor((width / 2) - (textWidth / 2)), height - 20)
}
ctx.textAlign = 'center'
ctx.font = `${tickFontSize}px ${fontFamily}`
for (let y = minY, i = 0; y <= maxY; i++) {
let py = getCanvasY(y)
ctx.moveTo(paddingLeft - 5, py)
ctx.lineTo(width - paddingRight + 5, py)
ctx.save()
ctx.translate(paddingLeft - 10, py)
ctx.rotate(-Math.PI / 2)
ctx.fillText(formatNumber(y), 0, 0)
ctx.restore()
y *= axisStepFactor(i)
}
if (data.yLabel) {
ctx.font = `${labelFontSize}px ${fontFamily}`
ctx.save()
ctx.translate(15, Math.floor(height / 2))
ctx.rotate(-Math.PI / 2)
ctx.fillText(data.yLabel, 0, 0)
ctx.restore()
}
ctx.stroke()
// Draw Data
if (data.x && data.y) {
for (let i = 0; i < data.x.length; i++) {
let x = data.x[i], y = data.y[i], c = data.c[i]
if (x == null || y == null || Number.isNaN(x) || Number.isNaN(y))
continue
const s = 3
const px = getCanvasX(x)
const py = getCanvasY(y)
ctx.fillStyle = getRGB(c)
ctx.beginPath()
ctx.arc(px, py, s, 0, Math.PI * 2, false)
ctx.fill()
} }
}); } else if (data.tiles) {
return null; const rows = data.tiles.length
}; const cols = data.tiles[0].length
const drawPoints = (u, seriesIdx, idx0, idx1) => { const tileWidth = Math.ceil(w / cols)
const size = 5 * devicePixelRatio; const tileHeight = Math.ceil(h / rows)
uPlot.orient(u, seriesIdx, (series, dataX, dataY, scaleX, scaleY, valToPosX, valToPosY, xOff, yOff, xDim, yDim, moveTo, lineTo, rect, arc) => {
let d = u.data[seriesIdx]; let max = data.tiles.reduce((max, row) =>
u.ctx.strokeStyle = getRGB(0); Math.max(max, row.reduce((max, val) =>
u.ctx.fillStyle = getRGB(0); Math.max(max, val)), 0), 0)
let deg360 = 2 * Math.PI;
let p = new Path2D(); if (max == 0)
for (let i = 0; i < d[0].length; i++) { max = 1
let xVal = d[0][i];
let yVal = d[1][i]; const tileColor = val => `rgba(255, 0, 0, ${(val / max)})`
if (xVal >= scaleX.min && xVal <= scaleX.max && yVal >= scaleY.min && yVal <= scaleY.max) {
let cx = valToPosX(xVal, scaleX, xDim, xOff); for (let i = 0; i < rows; i++) {
let cy = valToPosY(yVal, scaleY, yDim, yOff); for (let j = 0; j < cols; j++) {
p.moveTo(cx + size/2, cy); let px = paddingLeft + (j / cols) * w
arc(p, cx, cy, size/2, 0, deg360); let py = paddingTop + (h - (i / rows) * h) - tileHeight
ctx.fillStyle = tileColor(data.tiles[i][j])
ctx.fillRect(px, py, tileWidth, tileHeight)
} }
} }
u.ctx.fill(p); }
});
return null;
};
// Main Function // Draw roofs
function render(plotData) { ctx.strokeStyle = 'black'
if (plotData) { ctx.lineWidth = 2
const opts = { ctx.beginPath()
title: "", if (cluster != null) {
mode: 2, const ycut = 0.01 * cluster.memoryBandwidth.value
width: width, const scalarKnee = (cluster.flopRateScalar.value - ycut) / cluster.memoryBandwidth.value
height: height, const simdKnee = (cluster.flopRateSimd.value - ycut) / cluster.memoryBandwidth.value
legend: { const scalarKneeX = getCanvasX(scalarKnee),
show: false simdKneeX = getCanvasX(simdKnee),
}, flopRateScalarY = getCanvasY(cluster.flopRateScalar.value),
cursor: { drag: { x: false, y: false } }, flopRateSimdY = getCanvasY(cluster.flopRateSimd.value)
axes: [
{
label: 'Intensity [FLOPS/Byte]',
values: (u, vals) => vals.map(v => formatNumber(v))
},
{
label: 'Performace [GFLOPS]',
values: (u, vals) => vals.map(v => formatNumber(v))
}
],
scales: {
x: {
time: false,
range: [0.01, 1000],
distr: 3, // Render as log
log: 10, // log exp
},
y: {
range: [1.0, cluster?.flopRateSimd?.value ? nearestThousand(cluster.flopRateSimd.value) : 10000],
distr: 3, // Render as log
log: 10, // log exp
},
},
series: [
{},
{ paths: renderTime ? drawColorPoints : drawPoints }
],
hooks: {
drawClear: [
u => {
u.series.forEach((s, i) => {
if (i > 0)
s._paths = null;
});
},
],
draw: [
u => { // draw roofs when cluster set
// console.log(u)
if (cluster != null) {
const padding = u._padding // [top, right, bottom, left]
u.ctx.strokeStyle = 'black' if (scalarKneeX < width - paddingRight) {
u.ctx.lineWidth = 2 ctx.moveTo(scalarKneeX, flopRateScalarY)
u.ctx.beginPath() ctx.lineTo(width - paddingRight, flopRateScalarY)
}
const ycut = 0.01 * cluster.memoryBandwidth.value if (simdKneeX < width - paddingRight) {
const scalarKnee = (cluster.flopRateScalar.value - ycut) / cluster.memoryBandwidth.value ctx.moveTo(simdKneeX, flopRateSimdY)
const simdKnee = (cluster.flopRateSimd.value - ycut) / cluster.memoryBandwidth.value ctx.lineTo(width - paddingRight, flopRateSimdY)
const scalarKneeX = u.valToPos(scalarKnee, 'x', true), // Value, axis, toCanvasPixels }
simdKneeX = u.valToPos(simdKnee, 'x', true),
flopRateScalarY = u.valToPos(cluster.flopRateScalar.value, 'y', true),
flopRateSimdY = u.valToPos(cluster.flopRateSimd.value, 'y', true)
if (scalarKneeX < width - padding[1]) { // Top horizontal roofline let x1 = getCanvasX(0.01),
u.ctx.moveTo(scalarKneeX, flopRateScalarY) y1 = getCanvasY(ycut),
u.ctx.lineTo(width - padding[1], flopRateScalarY) x2 = getCanvasX(simdKnee),
} y2 = flopRateSimdY
if (simdKneeX < width - padding[1]) { // Lower horitontal roofline let xAxisIntersect = lineIntersect(
u.ctx.moveTo(simdKneeX, flopRateSimdY) x1, y1, x2, y2,
u.ctx.lineTo(width - padding[1], flopRateSimdY) 0, height - paddingBottom, width, height - paddingBottom)
}
let x1 = u.valToPos(0.01, 'x', true), if (xAxisIntersect.x > x1) {
y1 = u.valToPos(ycut, 'y', true) x1 = xAxisIntersect.x
y1 = xAxisIntersect.y
}
let x2 = u.valToPos(simdKnee, 'x', true), ctx.moveTo(x1, y1)
y2 = flopRateSimdY ctx.lineTo(x2, y2)
}
ctx.stroke()
let xAxisIntersect = lineIntersect( if (colorDots && showTime && data.x && data.y) {
x1, y1, x2, y2, // The Color Scale For Time Information
u.valToPos(0.01, 'x', true), u.valToPos(1.0, 'y', true), // X-Axis Start Coords ctx.fillStyle = 'black'
u.valToPos(1000, 'x', true), u.valToPos(1.0, 'y', true) // X-Axis End Coords ctx.fillText('Time:', 17, height - 5)
) const start = paddingLeft + 5
for (let x = start; x < width - paddingRight; x += 15) {
if (xAxisIntersect.x > x1) { let c = (x - start) / (width - start - paddingRight)
x1 = xAxisIntersect.x ctx.fillStyle = getRGB(c)
y1 = xAxisIntersect.y ctx.beginPath()
} ctx.arc(x, height - 10, 5, 0, Math.PI * 2, false)
ctx.fill()
// Diagonal }
u.ctx.moveTo(x1, y1)
u.ctx.lineTo(x2, y2)
u.ctx.stroke()
// Reset grid lineWidth
u.ctx.lineWidth = 0.15
}
}
]
},
};
uplot = new uPlot(opts, plotData, plotWrapper);
} else {
console.log('No data for roofline!')
} }
} }
// Svelte and Sizechange function transformData(flopsAny, memBw, colorDots) { // Uses Metric Object
onMount(() => { const nodes = flopsAny.series.length
render(data) const timesteps = flopsAny.series[0].data.length
})
onDestroy(() => {
if (uplot)
uplot.destroy()
if (timeoutId != null) /* c will contain values from 0 to 1 representing the time */
clearTimeout(timeoutId) const x = [], y = [], c = []
})
function sizeChanged() {
if (timeoutId != null)
clearTimeout(timeoutId)
timeoutId = setTimeout(() => { if (flopsAny && memBw) {
timeoutId = null for (let i = 0; i < nodes; i++) {
if (uplot) const flopsData = flopsAny.series[i].data
uplot.destroy() const memBwData = memBw.series[i].data
render(data) for (let j = 0; j < timesteps; j++) {
}, 200) const f = flopsData[j], m = memBwData[j]
const intensity = f / m
if (Number.isNaN(intensity) || !Number.isFinite(intensity))
continue
x.push(intensity)
y.push(f)
c.push(colorDots ? j / timesteps : 0)
}
}
} else {
console.warn("transformData: metrics for 'mem_bw' and/or 'flops_any' missing!")
}
return {
x, y, c,
xLabel: 'Intensity [FLOPS/byte]',
yLabel: 'Performance [GFLOPS]'
}
}
// Return something to be plotted. The argument shall be the result of the
// `nodeMetrics` GraphQL query.
export function transformPerNodeData(nodes) {
const x = [], y = [], c = []
for (let node of nodes) {
let flopsAny = node.metrics.find(m => m.name == 'flops_any' && m.scope == 'node')?.metric
let memBw = node.metrics.find(m => m.name == 'mem_bw' && m.scope == 'node')?.metric
if (!flopsAny || !memBw) {
console.warn("transformPerNodeData: metrics for 'mem_bw' and/or 'flops_any' missing!")
continue
}
let flopsData = flopsAny.series[0].data, memBwData = memBw.series[0].data
const f = flopsData[flopsData.length - 1], m = memBwData[flopsData.length - 1]
const intensity = f / m
if (Number.isNaN(intensity) || !Number.isFinite(intensity))
continue
x.push(intensity)
y.push(f)
c.push(0)
}
return {
x, y, c,
xLabel: 'Intensity [FLOPS/byte]',
yLabel: 'Performance [GFLOPS]'
}
} }
$: if (allowSizeChange) sizeChanged(width, height)
</script> </script>
{#if data != null} <script>
<div bind:this={plotWrapper}/> import { onMount, tick } from 'svelte'
{:else} import { formatNumber } from '../units.js'
<Card class="mx-4" body color="warning">Cannot render roofline: No data!</Card>
{/if} export let flopsAny = null
export let memBw = null
export let cluster = null
export let maxY = null
export let width = 500
export let height = 300
export let tiles = null
export let colorDots = true
export let showTime = true
export let data = null
console.assert(data || tiles || (flopsAny && memBw), "you must provide flopsAny and memBw or tiles!")
let ctx, canvasElement, prevWidth = width, prevHeight = height
data = data != null ? data : (flopsAny && memBw
? transformData(flopsAny.metric, memBw.metric, colorDots) // Use Metric Object from Parent
: {
tiles: tiles,
xLabel: 'Intensity [FLOPS/byte]',
yLabel: 'Performance [GFLOPS]'
})
onMount(() => {
ctx = canvasElement.getContext('2d')
if (prevWidth != width || prevHeight != height) {
sizeChanged()
return
}
canvasElement.width = width
canvasElement.height = height
render(ctx, data, cluster, width, height, colorDots, showTime, maxY)
})
let timeoutId = null
function sizeChanged() {
if (!ctx)
return
if (timeoutId != null)
clearTimeout(timeoutId)
prevWidth = width
prevHeight = height
timeoutId = setTimeout(() => {
if (!canvasElement)
return
timeoutId = null
canvasElement.width = width
canvasElement.height = height
render(ctx, data, cluster, width, height, colorDots, showTime, maxY)
}, 250)
}
$: sizeChanged(width, height)
</script>

View File

@@ -1,234 +0,0 @@
<div class="cc-plot">
<canvas bind:this={canvasElement} width="{prevWidth}" height="{prevHeight}"></canvas>
</div>
<script context="module">
const axesColor = '#aaaaaa'
const tickFontSize = 10
const labelFontSize = 12
const fontFamily = 'system-ui, -apple-system, "Segoe UI", Roboto, "Helvetica Neue", Arial, "Noto Sans", sans-serif, "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji"'
const paddingLeft = 40,
paddingRight = 10,
paddingTop = 10,
paddingBottom = 50
// Intersection point of the (infinite) lines through (x1,y1)-(x2,y2) and
// (x3,y3)-(x4,y4), via the standard determinant formulation.
// Returns { x, y }; coordinates are non-finite when the lines are parallel.
function lineIntersect(x1, y1, x2, y2, x3, y3, x4, y4) {
    const dx12 = x2 - x1
    const dy12 = y2 - y1
    const denom = (y4 - y3) * dx12 - (x4 - x3) * dy12
    const t = ((x4 - x3) * (y1 - y3) - (y4 - y3) * (x1 - x3)) / denom
    return { x: x1 + t * dx12, y: y1 + t * dy12 }
}
// Multiplicative step between successive axis ticks. Small plots
// (size < 500) use decade steps; otherwise the ticks cycle through a
// x2, x2.5, x2 pattern driven by the tick index i.
function axisStepFactor(i, size) {
    if (size && size < 500) {
        return 10
    }
    return i % 3 === 1 ? 2.5 : 2
}
/*
 * Draw a complete roofline plot into a 2d canvas context: log-log axes with
 * tick marks and labels, an optional heat-map of tiles (data.tiles) and — if
 * a cluster is given — its scalar/SIMD performance roofs.
 *
 * ctx:          CanvasRenderingContext2D to draw into
 * data:         { tiles?: number[][], xLabel?: string, yLabel?: string }
 * cluster:      object with flopRateScalar/flopRateSimd/memoryBandwidth
 *               (each { value: number }), or null to skip the roofs
 * width/height: canvas size in px; no-op when width <= 0
 * defaultMaxY:  y-axis maximum used when cluster.flopRateSimd is unavailable
 */
function render(ctx, data, cluster, width, height, defaultMaxY) {
    if (width <= 0)
        return

    // Axis value ranges: x is intensity [FLOPS/byte], y is performance.
    const [minX, maxX, minY, maxY] = [0.01, 1000, 1., cluster?.flopRateSimd?.value || defaultMaxY]
    const w = width - paddingLeft - paddingRight
    const h = height - paddingTop - paddingBottom

    // Helpers:
    const [log10minX, log10maxX, log10minY, log10maxY] =
        [Math.log10(minX), Math.log10(maxX), Math.log10(minY), Math.log10(maxY)]

    /* Value -> Pixel-Coordinate */
    const getCanvasX = (x) => {
        x = Math.log10(x)
        x -= log10minX; x /= (log10maxX - log10minX)
        return Math.round((x * w) + paddingLeft)
    }
    const getCanvasY = (y) => {
        y = Math.log10(y)
        y -= log10minY
        y /= (log10maxY - log10minY)
        return Math.round((h - y * h) + paddingTop)
    }

    // Axes
    ctx.fillStyle = 'black'
    ctx.strokeStyle = axesColor
    ctx.font = `${tickFontSize}px ${fontFamily}`
    ctx.beginPath()
    // X ticks: multiplying by axisStepFactor walks the 1-2-5-10 log sequence.
    for (let x = minX, i = 0; x <= maxX; i++) {
        let px = getCanvasX(x)
        let text = formatNumber(x)
        let textWidth = ctx.measureText(text).width
        ctx.fillText(text,
            Math.floor(px - (textWidth / 2)),
            height - paddingBottom + tickFontSize + 5)
        ctx.moveTo(px, paddingTop - 5)
        ctx.lineTo(px, height - paddingBottom + 5)
        x *= axisStepFactor(i, w)
    }
    if (data.xLabel) {
        ctx.font = `${labelFontSize}px ${fontFamily}`
        let textWidth = ctx.measureText(data.xLabel).width
        ctx.fillText(data.xLabel, Math.floor((width / 2) - (textWidth / 2)), height - 20)
    }

    ctx.textAlign = 'center'
    ctx.font = `${tickFontSize}px ${fontFamily}`
    // Y ticks: labels drawn rotated 90° counter-clockwise left of the plot.
    for (let y = minY, i = 0; y <= maxY; i++) {
        let py = getCanvasY(y)
        ctx.moveTo(paddingLeft - 5, py)
        ctx.lineTo(width - paddingRight + 5, py)
        ctx.save()
        ctx.translate(paddingLeft - 10, py)
        ctx.rotate(-Math.PI / 2)
        ctx.fillText(formatNumber(y), 0, 0)
        ctx.restore()
        y *= axisStepFactor(i)
    }
    if (data.yLabel) {
        ctx.font = `${labelFontSize}px ${fontFamily}`
        ctx.save()
        ctx.translate(15, Math.floor(height / 2))
        ctx.rotate(-Math.PI / 2)
        ctx.fillText(data.yLabel, 0, 0)
        ctx.restore()
    }
    ctx.stroke()

    // Draw Data
    if (data.tiles) {
        const rows = data.tiles.length
        const cols = data.tiles[0].length
        const tileWidth = Math.ceil(w / cols)
        const tileHeight = Math.ceil(h / rows)
        // Overall maximum tile count, used to normalize the alpha channel.
        let max = data.tiles.reduce((max, row) =>
            Math.max(max, row.reduce((max, val) =>
                Math.max(max, val)), 0), 0)
        if (max == 0)
            max = 1
        const tileColor = val => `rgba(255, 0, 0, ${(val / max)})`
        for (let i = 0; i < rows; i++) {
            for (let j = 0; j < cols; j++) {
                // Row 0 is drawn at the bottom: canvas y grows downwards,
                // but the plot's y axis grows upwards.
                let px = paddingLeft + (j / cols) * w
                let py = paddingTop + (h - (i / rows) * h) - tileHeight
                ctx.fillStyle = tileColor(data.tiles[i][j])
                ctx.fillRect(px, py, tileWidth, tileHeight)
            }
        }
    }

    // Draw roofs
    ctx.strokeStyle = 'black'
    ctx.lineWidth = 2
    ctx.beginPath()
    if (cluster != null) {
        // ycut is where the memory roof enters the plot at minX = 0.01;
        // each "knee" is the intensity at which the diagonal memory roof
        // meets the respective flat flop-rate ceiling.
        const ycut = 0.01 * cluster.memoryBandwidth.value
        const scalarKnee = (cluster.flopRateScalar.value - ycut) / cluster.memoryBandwidth.value
        const simdKnee = (cluster.flopRateSimd.value - ycut) / cluster.memoryBandwidth.value
        const scalarKneeX = getCanvasX(scalarKnee),
            simdKneeX = getCanvasX(simdKnee),
            flopRateScalarY = getCanvasY(cluster.flopRateScalar.value),
            flopRateSimdY = getCanvasY(cluster.flopRateSimd.value)
        // Horizontal ceilings — only drawn if the knee lies inside the plot.
        if (scalarKneeX < width - paddingRight) {
            ctx.moveTo(scalarKneeX, flopRateScalarY)
            ctx.lineTo(width - paddingRight, flopRateScalarY)
        }
        if (simdKneeX < width - paddingRight) {
            ctx.moveTo(simdKneeX, flopRateSimdY)
            ctx.lineTo(width - paddingRight, flopRateSimdY)
        }
        // Diagonal memory-bandwidth roof, clipped where it crosses the x axis.
        let x1 = getCanvasX(0.01),
            y1 = getCanvasY(ycut),
            x2 = getCanvasX(simdKnee),
            y2 = flopRateSimdY
        let xAxisIntersect = lineIntersect(
            x1, y1, x2, y2,
            0, height - paddingBottom, width, height - paddingBottom)
        if (xAxisIntersect.x > x1) {
            x1 = xAxisIntersect.x
            y1 = xAxisIntersect.y
        }
        ctx.moveTo(x1, y1)
        ctx.lineTo(x2, y2)
    }
    ctx.stroke()
}
</script>
<script>
import { onMount } from 'svelte'
import { formatNumber } from '../units.js'
// Component props:
//   cluster — object providing flopRateScalar/flopRateSimd/memoryBandwidth
//             for the roofs; null draws only axes and tiles
//   tiles   — required 2d array of sample counts (the heat map)
//   maxY    — fallback y-axis maximum when cluster gives no flopRateSimd
//   width/height — canvas size in px
export let cluster = null
export let tiles = null
export let maxY = null
export let width = 500
export let height = 300

console.assert(tiles, "you must provide tiles!")

// prevWidth/prevHeight track the size actually rendered, so prop changes
// can be detected in onMount and in the debounced resize handler.
let ctx, canvasElement, prevWidth = width, prevHeight = height
// Static plot description handed to render().
const data = {
    tiles: tiles,
    xLabel: 'Intensity [FLOPS/byte]',
    yLabel: 'Performance [GFLOPS]'
}
onMount(() => {
    ctx = canvasElement.getContext('2d')
    // If the size props changed between initialization and mount, go
    // through the debounced resize path instead of rendering here.
    if (prevWidth != width || prevHeight != height) {
        sizeChanged()
        return
    }
    canvasElement.width = width
    canvasElement.height = height
    render(ctx, data, cluster, width, height, maxY)
})
let timeoutId = null
// Debounced resize handler: waits 250ms after the last size change before
// resizing the canvas (which clears it) and re-rendering. Does nothing
// before onMount has set up ctx.
function sizeChanged() {
    if (!ctx)
        return
    if (timeoutId != null)
        clearTimeout(timeoutId)
    prevWidth = width
    prevHeight = height
    timeoutId = setTimeout(() => {
        // The component may have been destroyed while the timer was pending.
        if (!canvasElement)
            return
        timeoutId = null
        canvasElement.width = width
        canvasElement.height = height
        render(ctx, data, cluster, width, height, maxY)
    }, 250)
}
// The arguments are unused; naming width/height here makes this reactive
// statement re-run whenever either prop changes.
$: sizeChanged(width, height)
</script>

View File

@@ -6,8 +6,8 @@ const power = [1, 1e3, 1e6, 1e9, 1e12, 1e15, 1e18, 1e21]
const prefix = ['', 'K', 'M', 'G', 'T', 'P', 'E'] const prefix = ['', 'K', 'M', 'G', 'T', 'P', 'E']
export function formatNumber(x) { export function formatNumber(x) {
if ( isNaN(x) || x == null) { if ( isNaN(x) ) {
return x // Return if String or Null return x // Return if String , used in Histograms
} else { } else {
for (let i = 0; i < prefix.length; i++) for (let i = 0; i < prefix.length; i++)
if (power[i] <= x && x < power[i+1]) if (power[i] <= x && x < power[i+1])

View File

@@ -6,7 +6,7 @@ import {
} from "@urql/svelte"; } from "@urql/svelte";
import { setContext, getContext, hasContext, onDestroy, tick } from "svelte"; import { setContext, getContext, hasContext, onDestroy, tick } from "svelte";
import { readable } from "svelte/store"; import { readable } from "svelte/store";
// import { formatNumber } from './units.js' import { formatNumber } from './units.js'
/* /*
* Call this function only at component initialization time! * Call this function only at component initialization time!
@@ -326,11 +326,8 @@ export function convert2uplot(canvasData) {
} }
export function binsFromFootprint(weights, scope, values, numBins) { export function binsFromFootprint(weights, scope, values, numBins) {
let min = 0, max = 0 //, median = 0 let min = 0, max = 0
if (values.length != 0) { if (values.length != 0) {
// Extreme, wrong peak vlaues: Filter here or backend?
// median = median(values)
for (let x of values) { for (let x of values) {
min = Math.min(min, x) min = Math.min(min, x)
max = Math.max(max, x) max = Math.max(max, x)
@@ -366,75 +363,3 @@ export function binsFromFootprint(weights, scope, values, numBins) {
bins: bins bins: bins
} }
} }
// Transform flops_any and mem_bw metric objects ({series: [{data: [...]}, ...],
// timestep, name}) into the roofline scatter format [null, [x, y], c], where
// x holds the operational intensity (flops / mem_bw), y the flops value, and
// c the normalized sample time (j / timesteps). Returns null when either
// metric is missing or no finite intensity sample exists.
export function transformDataForRoofline(flopsAny, memBw) { // Uses Metric Objects: {series:[{},{},...], timestep:60, name:$NAME}
    let data = null
    const x = [], y = [], c = []
    if (flopsAny && memBw) {
        // Bug fix: read .series only after the null guard — previously a
        // missing metric threw here instead of reaching the warning below.
        const nodes = flopsAny.series.length
        const timesteps = flopsAny.series[0].data.length
        /* c will contain values from 0 to 1 representing the time */
        for (let i = 0; i < nodes; i++) {
            const flopsData = flopsAny.series[i].data
            const memBwData = memBw.series[i].data
            for (let j = 0; j < timesteps; j++) {
                const f = flopsData[j], m = memBwData[j]
                const intensity = f / m
                // Skip samples with zero bandwidth or missing values.
                if (Number.isNaN(intensity) || !Number.isFinite(intensity))
                    continue
                x.push(intensity)
                y.push(f)
                c.push(j / timesteps)
            }
        }
    } else {
        console.warn("transformData: metrics for 'mem_bw' and/or 'flops_any' missing!")
    }
    if (x.length > 0 && y.length > 0 && c.length > 0) {
        data = [null, [x, y], c] // for dataformat see roofline.svelte
    }
    return data
}
// Return something to be plotted. The argument shall be the result of the
// `nodeMetrics` GraphQL query. Produces the roofline data format
// [null, [x, y], []] with one (intensity, flops) point per node, taken from
// the most recent sample of each node's node-scope flops_any/mem_bw series.
// Nodes with missing metrics or non-finite intensity are skipped; returns
// null when no node yields a point.
export function transformPerNodeDataForRoofline(nodes) {
    let data = null
    const x = [], y = []
    for (let node of nodes) {
        let flopsAny = node.metrics.find(m => m.name == 'flops_any' && m.scope == 'node')?.metric
        let memBw = node.metrics.find(m => m.name == 'mem_bw' && m.scope == 'node')?.metric
        if (!flopsAny || !memBw) {
            console.warn("transformPerNodeData: metrics for 'mem_bw' and/or 'flops_any' missing!")
            continue
        }
        let flopsData = flopsAny.series[0].data, memBwData = memBw.series[0].data
        // Bug fix: index each series by its OWN length — mem_bw was indexed
        // with flopsData.length, which is wrong when the series differ in length.
        const f = flopsData[flopsData.length - 1], m = memBwData[memBwData.length - 1]
        const intensity = f / m
        if (Number.isNaN(intensity) || !Number.isFinite(intensity))
            continue
        x.push(intensity)
        y.push(f)
    }
    if (x.length > 0 && y.length > 0) {
        data = [null, [x, y], []] // for dataformat see roofline.svelte
    }
    return data
}
// https://stackoverflow.com/questions/45309447/calculating-median-javascript
// function median(numbers) {
// const sorted = Array.from(numbers).sort((a, b) => a - b);
// const middle = Math.floor(sorted.length / 2);
// if (sorted.length % 2 === 0) {
// return (sorted[middle - 1] + sorted[middle]) / 2;
// }
// return sorted[middle];
// }