19 Commits

acabdb6a61  Refactored CreateJobMeta  (Bole Ma, 2023-10-26 17:43:54 +02:00)
6ac1182c06  Added SlurmDB Payload  (Bole Ma, 2023-10-26 16:22:26 +02:00)
9fe497173d  Added JobResource back to Job  (Bole Ma, 2023-10-26 03:21:51 +02:00)
a3fbdbcf90  Added JSON Payload Converter  (Bole Ma, 2023-10-25 23:27:06 +02:00)
59f6658344  Refactored queryAllJobs  (Bole Ma, 2023-09-26 17:01:59 +02:00)
36b0f33208  Added support for extracting Resources and MetaData  (Bole Ma, 2023-09-26 15:54:33 +02:00)
41cd1171fb  Removed deprecated libraries  (Bole Ma, 2023-09-26 02:58:34 +02:00)
4c16e44507  Cleanup  (2023-09-06 14:29:20 +02:00)
8377e51608  Add FindRunningJobs to job repository  (2023-09-06 14:28:29 +02:00)
e949f34e94  Merge branch '135-batch-scheduler-integration' of github.com:ClusterCockpit/cc-backend into 135-batch-scheduler-integration  (Bole Ma, 2023-09-05 23:26:38 +02:00)
feeb0231e7  Added code handling OpenAPI structs  (Bole Ma, 2023-09-05 23:26:34 +02:00)
d4f09b4679  Remove generated REST client  (2023-09-01 07:36:29 +02:00)
03175681b6  Merge branch 'master' into 135-batch-scheduler-integration  (2023-09-01 07:17:02 +02:00)
a43d03457b  Added function queryAllJobs (adapted from Python)  (Bole Ma, 2023-08-25 15:45:07 +02:00)
125f3cd254  Show JobId instead ID. Add output  (2023-08-21 09:19:18 +02:00)
d9e7a48e2f  Added slurmNats code that could not parse BaseJob correctly  (Bole Ma, 2023-08-18 15:38:02 +02:00)
c0f9eb7869  Merge branch 'master' of github.com:ClusterCockpit/cc-backend into 135-batch-scheduler-integration  (Bole Ma, 2023-08-04 14:26:42 +02:00)
7aa7750177  Added test implementation for Nats subscriber  (Bole Ma, 2023-07-07 16:15:05 +02:00)
e5f7ad8ac0  Introduce batch scheduler plugins  (2023-06-22 08:25:32 +02:00)
8 changed files with 1491 additions and 4 deletions

go.mod

@@ -18,6 +18,7 @@ require (
github.com/influxdata/influxdb-client-go/v2 v2.12.2
github.com/jmoiron/sqlx v1.3.5
github.com/mattn/go-sqlite3 v1.14.16
github.com/nats-io/nats.go v1.28.0
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.40.0
github.com/qustavo/sqlhooks/v2 v2.1.0
@@ -25,12 +26,14 @@ require (
github.com/swaggo/http-swagger v1.3.3
github.com/swaggo/swag v1.16.1
github.com/vektah/gqlparser/v2 v2.5.8
golang.org/x/crypto v0.12.0
golang.org/x/crypto v0.13.0
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea
)
require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac37420 // indirect
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
@@ -38,6 +41,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.6.18 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deepmap/oapi-codegen v1.12.4 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
@@ -56,6 +60,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
@@ -64,21 +69,26 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/nats-io/nats-server/v2 v2.9.21 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/swaggo/files v1.0.0 // indirect
github.com/urfave/cli/v2 v2.25.7 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/atomic v1.10.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.12.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.30.0 // indirect

go.sum

@@ -82,6 +82,12 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/ClusterCockpit/cc-units v0.4.0 h1:zP5DOu99GmErW0tCDf0gcLrlWt42RQ9dpoONEOh4cI0=
github.com/ClusterCockpit/cc-units v0.4.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw=
github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac37420 h1:AeQY40KrLQmSSyyHdbNdAqgln+0+p1dLag5yspE5M8A=
github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac37420/go.mod h1:oNgVG2puNj9cNw/KgqLbgE1pPOn8jXORX3ErP58LcAA=
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20230906120742-0f15562ea666 h1:8PofHcOwEMmeAFqJjvAEgnu7rbRHAwJhd2XJ9u/YxiU=
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20230906120742-0f15562ea666/go.mod h1:vxaj1my0GNoCXx4bYyOTA/IZP/IOZImtdOIn4T7pCa4=
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7 h1:YY/qDtFsp1DOJw/jyobiIBiIh1/yD2IVOdcK7EVEIKs=
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7/go.mod h1:vxaj1my0GNoCXx4bYyOTA/IZP/IOZImtdOIn4T7pCa4=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/squirrel v1.5.3 h1:YPpoceAcxuzIljlr5iWpNKaql7hLeG1KLSrhvdHpkZc=
@@ -461,6 +467,7 @@ github.com/go-ldap/ldap/v3 v3.4.4/go.mod h1:fe1MsuN5eJJ1FeLT/LEBVdWfNWKh459R7aXg
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
@@ -593,6 +600,7 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
@@ -816,6 +824,8 @@ github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdY
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -887,6 +897,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88Jz2VyhSmden33/aXg4oVIY=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
@@ -931,6 +942,15 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/nakagami/firebirdsql v0.0.0-20190310045651-3c02a58cfed8/go.mod h1:86wM1zFnC6/uDBfZGNwB65O+pR2OFi5q/YQaEUid1qA=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk=
github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU=
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/neo4j/neo4j-go-driver v1.8.1-0.20200803113522-b626aa943eba/go.mod h1:ncO5VaFWh0Nrt+4KT4mOZboaczBZcLuHrG+/sUeP8gI=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
@@ -1144,6 +1164,7 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/swaggo/files v1.0.0 h1:1gGXVIeUFCS/dta17rnP0iOpr6CXFwKD7EO5ID233e4=
github.com/swaggo/files v1.0.0/go.mod h1:N59U6URJLyU1PQgFqPM7wXLMhJx7QAolnvfQkqO13kc=
@@ -1167,6 +1188,7 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/vektah/gqlparser/v2 v2.5.8 h1:pm6WOnGdzFOCfcQo9L3+xzW51mKrlwTEg4Wr7AH1JW4=
github.com/vektah/gqlparser/v2 v2.5.8/go.mod h1:z8xXUff237NntSuH8mLFijZ+1tjV1swDbpDqjJmk6ME=
github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
@@ -1286,6 +1308,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -1409,6 +1433,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -1568,6 +1594,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -1586,6 +1614,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -1595,6 +1625,7 @@ golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=


@@ -290,6 +290,30 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
	return scanJob(q.RunWith(r.stmtCache).QueryRow())
}

func (r *JobRepository) FindRunningJobs(cluster string) (map[int64]*schema.Job, error) {
	jobs := map[int64]*schema.Job{}
	q := sq.Select(jobColumns...).From("job").Where("job.cluster = ?", cluster)
	q = q.Where("job.job_state = ?", "running")

	rows, err := q.RunWith(r.stmtCache).Query()
	if err != nil {
		log.Errorf("Error while querying running jobs: %v", err)
		return nil, err
	}

	for rows.Next() {
		job, err := scanJob(rows)
		if err != nil {
			rows.Close()
			log.Warn("Error while scanning rows (Jobs)")
			return nil, err
		}
		jobs[job.JobID] = job
	}

	return jobs, nil
}

func (r *JobRepository) FindConcurrentJobs(
	ctx context.Context,
	job *schema.Job) (*model.JobLinkResultList, error) {
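FindRunningJobs keys the result map by the scheduler's job ID, which is the shape the sync code needs when matching a slurmrestd job list against ClusterCockpit's database. A minimal usage sketch; the repository accessor and the cluster name "alex" are assumptions, not part of this hunk:

	repo := repository.GetJobRepository()
	runningJobsInCC, err := repo.FindRunningJobs("alex")
	if err != nil {
		log.Errorf("querying running jobs failed: %v", err)
		return
	}
	// Keyed by Slurm job ID, so each job reported by the scheduler can be
	// looked up directly without a second database round trip.
	if job, ok := runningJobsInCC[123456]; ok {
		log.Printf("job 123456 on cluster %s started at %s", job.Cluster, job.StartTime)
	}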


@@ -0,0 +1,479 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package scheduler

import (
	"encoding/json"
	"fmt"
	"os"

	"github.com/ClusterCockpit/cc-backend/pkg/log"
)
type MetricConfig struct {
	Name string `json:"name"`
	Unit struct {
		Base string `json:"base"`
	} `json:"unit"`
	Scope string `json:"scope"`
	Aggregation string `json:"aggregation"`
	Timestep int `json:"timestep"`
	Peak float64 `json:"peak"`
	Normal float64 `json:"normal"`
	Caution float64 `json:"caution"`
	Alert float64 `json:"alert"`
}

type SubCluster struct {
	Name string `json:"name"`
	Nodes string `json:"nodes"`
	ProcessorType string `json:"processorType"`
	SocketsPerNode int `json:"socketsPerNode"`
	CoresPerSocket int `json:"coresPerSocket"`
	ThreadsPerCore int `json:"threadsPerCore"`
	FlopRateScalar struct {
		Unit struct {
			Base string `json:"base"`
			Prefix string `json:"prefix"`
		} `json:"unit"`
		Value float64 `json:"value"`
	} `json:"flopRateScalar"`
	FlopRateSimd struct {
		Unit struct {
			Base string `json:"base"`
			Prefix string `json:"prefix"`
		} `json:"unit"`
		Value float64 `json:"value"`
	} `json:"flopRateSimd"`
	MemoryBandwidth struct {
		Unit struct {
			Base string `json:"base"`
			Prefix string `json:"prefix"`
		} `json:"unit"`
		Value float64 `json:"value"`
	} `json:"memoryBandwidth"`
	Topology struct {
		Node []int `json:"node"`
		Socket [][]int `json:"socket"`
		MemoryDomain [][]int `json:"memoryDomain"`
		Core [][]int `json:"core"`
		Accelerators []struct {
			ID string `json:"id"`
			Type string `json:"type"`
			Model string `json:"model"`
		} `json:"accelerators"`
	} `json:"topology"`
}

type ClusterConfig struct {
	Name string `json:"name"`
	MetricConfig []MetricConfig `json:"metricConfig"`
	SubClusters []SubCluster `json:"subClusters"`
}

type Metadata struct {
	Plugin struct {
		Type string `json:"type"`
		Name string `json:"name"`
	} `json:"plugin"`
	Slurm struct {
		Version struct {
			Major int `json:"major"`
			Micro int `json:"micro"`
			Minor int `json:"minor"`
		} `json:"version"`
		Release string `json:"release"`
	} `json:"Slurm"`
}

type JobResource struct {
	Nodes string `json:"nodes"`
	AllocatedCores int `json:"allocated_cores"`
	AllocatedHosts int `json:"allocated_hosts"`
	AllocatedNodes []AllocatedNode `json:"allocated_nodes"`
}

type AllocatedNode struct {
	Sockets map[string]Socket `json:"sockets"`
	Nodename string `json:"nodename"`
	CPUsUsed *int `json:"cpus_used"`
	MemoryUsed *int `json:"memory_used"`
	MemoryAllocated *int `json:"memory_allocated"`
}

type Socket struct {
	Cores map[string]string `json:"cores"`
}

type Job struct {
	Account string `json:"account"`
	AccrueTime int `json:"accrue_time"`
	AdminComment string `json:"admin_comment"`
	ArrayJobID int64 `json:"array_job_id"`
	ArrayTaskID interface{} `json:"array_task_id"`
	ArrayMaxTasks int `json:"array_max_tasks"`
	ArrayTaskString string `json:"array_task_string"`
	AssociationID int `json:"association_id"`
	BatchFeatures string `json:"batch_features"`
	BatchFlag bool `json:"batch_flag"`
	BatchHost string `json:"batch_host"`
	Flags []string `json:"flags"`
	BurstBuffer string `json:"burst_buffer"`
	BurstBufferState string `json:"burst_buffer_state"`
	Cluster string `json:"cluster"`
	ClusterFeatures string `json:"cluster_features"`
	Command string `json:"command"`
	Comment string `json:"comment"`
	Container string `json:"container"`
	Contiguous bool `json:"contiguous"`
	CoreSpec interface{} `json:"core_spec"`
	ThreadSpec interface{} `json:"thread_spec"`
	CoresPerSocket interface{} `json:"cores_per_socket"`
	BillableTres interface{} `json:"billable_tres"`
	CPUPerTask interface{} `json:"cpus_per_task"`
	CPUFrequencyMinimum interface{} `json:"cpu_frequency_minimum"`
	CPUFrequencyMaximum interface{} `json:"cpu_frequency_maximum"`
	CPUFrequencyGovernor interface{} `json:"cpu_frequency_governor"`
	CPUPerTres string `json:"cpus_per_tres"`
	Deadline int `json:"deadline"`
	DelayBoot int `json:"delay_boot"`
	Dependency string `json:"dependency"`
	DerivedExitCode int `json:"derived_exit_code"`
	EligibleTime int `json:"eligible_time"`
	EndTime int64 `json:"end_time"`
	ExcludedNodes string `json:"excluded_nodes"`
	ExitCode int `json:"exit_code"`
	Features string `json:"features"`
	FederationOrigin string `json:"federation_origin"`
	FederationSiblingsActive string `json:"federation_siblings_active"`
	FederationSiblingsViable string `json:"federation_siblings_viable"`
	GresDetail []string `json:"gres_detail"`
	GroupID int `json:"group_id"`
	GroupName string `json:"group_name"`
	JobID int64 `json:"job_id"`
	JobResources JobResource `json:"job_resources"`
	JobState string `json:"job_state"`
	LastSchedEvaluation int `json:"last_sched_evaluation"`
	Licenses string `json:"licenses"`
	MaxCPUs int `json:"max_cpus"`
	MaxNodes int `json:"max_nodes"`
	MCSLabel string `json:"mcs_label"`
	MemoryPerTres string `json:"memory_per_tres"`
	Name string `json:"name"`
	Nodes string `json:"nodes"`
	Nice interface{} `json:"nice"`
	TasksPerCore interface{} `json:"tasks_per_core"`
	TasksPerNode int `json:"tasks_per_node"`
	TasksPerSocket interface{} `json:"tasks_per_socket"`
	TasksPerBoard int `json:"tasks_per_board"`
	CPUs int32 `json:"cpus"`
	NodeCount int32 `json:"node_count"`
	Tasks int `json:"tasks"`
	HETJobID int `json:"het_job_id"`
	HETJobIDSet string `json:"het_job_id_set"`
	HETJobOffset int `json:"het_job_offset"`
	Partition string `json:"partition"`
	MemoryPerNode interface{} `json:"memory_per_node"`
	MemoryPerCPU int `json:"memory_per_cpu"`
	MinimumCPUsPerNode int `json:"minimum_cpus_per_node"`
	MinimumTmpDiskPerNode int `json:"minimum_tmp_disk_per_node"`
	PreemptTime int `json:"preempt_time"`
	PreSUSTime int `json:"pre_sus_time"`
	Priority int `json:"priority"`
	Profile interface{} `json:"profile"`
	QoS string `json:"qos"`
	Reboot bool `json:"reboot"`
	RequiredNodes string `json:"required_nodes"`
	Requeue bool `json:"requeue"`
	ResizeTime int `json:"resize_time"`
	RestartCnt int `json:"restart_cnt"`
	ResvName string `json:"resv_name"`
	Shared *string `json:"shared"`
	ShowFlags []string `json:"show_flags"`
	SocketsPerBoard int `json:"sockets_per_board"`
	SocketsPerNode interface{} `json:"sockets_per_node"`
	StartTime int64 `json:"start_time"`
	StateDescription string `json:"state_description"`
	StateReason string `json:"state_reason"`
	StandardError string `json:"standard_error"`
	StandardInput string `json:"standard_input"`
	StandardOutput string `json:"standard_output"`
	SubmitTime int `json:"submit_time"`
	SuspendTime int `json:"suspend_time"`
	SystemComment string `json:"system_comment"`
	TimeLimit int `json:"time_limit"`
	TimeMinimum int `json:"time_minimum"`
	ThreadsPerCore interface{} `json:"threads_per_core"`
	TresBind string `json:"tres_bind"`
	TresFreq string `json:"tres_freq"`
	TresPerJob string `json:"tres_per_job"`
	TresPerNode string `json:"tres_per_node"`
	TresPerSocket string `json:"tres_per_socket"`
	TresPerTask string `json:"tres_per_task"`
	TresReqStr string `json:"tres_req_str"`
	TresAllocStr string `json:"tres_alloc_str"`
	UserID int `json:"user_id"`
	UserName string `json:"user_name"`
	Wckey string `json:"wckey"`
	CurrentWorkingDirectory string `json:"current_working_directory"`
}

type SlurmPayload struct {
	Meta Metadata `json:"meta"`
	Errors []interface{} `json:"errors"`
	Jobs []Job `json:"jobs"`
}

type DumpedComment struct {
	Administrator interface{} `json:"administrator"`
	Job interface{} `json:"job"`
	System interface{} `json:"system"`
}

type MaxLimits struct {
	Running struct {
		Tasks int `json:"tasks"`
	} `json:"max"`
}

type ArrayInfo struct {
	JobID int `json:"job_id"`
	Limits MaxLimits `json:"limits"`
	Task interface{} `json:"task"`
	TaskID interface{} `json:"task_id"`
}

type Association struct {
	Account string `json:"account"`
	Cluster string `json:"cluster"`
	Partition interface{} `json:"partition"`
	User string `json:"user"`
}

type TimeInfo struct {
	Elapsed int64 `json:"elapsed"`
	Eligible int64 `json:"eligible"`
	End int64 `json:"end"`
	Start int64 `json:"start"`
	Submission int64 `json:"submission"`
	Suspended int64 `json:"suspended"`
	System struct {
		Seconds int `json:"seconds"`
		Microseconds int `json:"microseconds"`
	} `json:"system"`
	Limit int `json:"limit"`
	Total struct {
		Seconds int `json:"seconds"`
		Microseconds int `json:"microseconds"`
	} `json:"total"`
	User struct {
		Seconds int `json:"seconds"`
		Microseconds int `json:"microseconds"`
	} `json:"user"`
}

type ExitCode struct {
	Status string `json:"status"`
	ReturnCode int `json:"return_code"`
}

type DumpedJob struct {
	Account string `json:"account"`
	Comment DumpedComment `json:"comment"`
	AllocationNodes int `json:"allocation_nodes"`
	Array ArrayInfo `json:"array"`
	Association Association `json:"association"`
	Cluster string `json:"cluster"`
	Constraints string `json:"constraints"`
	Container interface{} `json:"container"`
	DerivedExitCode ExitCode `json:"derived_exit_code"`
	Time TimeInfo `json:"time"`
	ExitCode ExitCode `json:"exit_code"`
	Flags []string `json:"flags"`
	Group string `json:"group"`
	Het struct {
		JobID int `json:"job_id"`
		JobOffset interface{} `json:"job_offset"`
	} `json:"het"`
	JobID int64 `json:"job_id"`
	Name string `json:"name"`
	MCS struct {
		Label string `json:"label"`
	} `json:"mcs"`
	Nodes string `json:"nodes"`
	Partition string `json:"partition"`
	Priority int `json:"priority"`
	QoS string `json:"qos"`
	Required struct {
		CPUs int `json:"CPUs"`
		Memory int `json:"memory"`
	} `json:"required"`
	KillRequestUser interface{} `json:"kill_request_user"`
	Reservation struct {
		ID int `json:"id"`
		Name int `json:"name"`
	} `json:"reservation"`
	State struct {
		Current string `json:"current"`
		Reason string `json:"reason"`
	} `json:"state"`
	Steps []struct {
		Nodes struct {
			List []string `json:"list"`
			Count int `json:"count"`
			Range string `json:"range"`
		} `json:"nodes"`
		Tres struct {
			Requested struct {
				Max []interface{} `json:"max"`
				Min []interface{} `json:"min"`
				Average []interface{} `json:"average"`
				Total []interface{} `json:"total"`
			} `json:"requested"`
			Consumed struct {
				Max []interface{} `json:"max"`
				Min []interface{} `json:"min"`
				Average []interface{} `json:"average"`
				Total []interface{} `json:"total"`
			} `json:"consumed"`
			Allocated []struct {
				Type string `json:"type"`
				Name interface{} `json:"name"`
				ID int `json:"id"`
				Count int `json:"count"`
			} `json:"allocated"`
		} `json:"tres"`
		Time TimeInfo `json:"time"`
		ExitCode ExitCode `json:"exit_code"`
		Tasks struct {
			Count int `json:"count"`
		} `json:"tasks"`
		PID interface{} `json:"pid"`
		CPU struct {
			RequestedFrequency struct {
				Min int `json:"min"`
				Max int `json:"max"`
			} `json:"requested_frequency"`
			Governor []interface{} `json:"governor"`
		} `json:"CPU"`
		KillRequestUser interface{} `json:"kill_request_user"`
		State string `json:"state"`
		Statistics struct {
			CPU struct {
				ActualFrequency int `json:"actual_frequency"`
			} `json:"CPU"`
			Energy struct {
				Consumed int `json:"consumed"`
			} `json:"energy"`
		} `json:"statistics"`
		Step struct {
			JobID int `json:"job_id"`
			Het struct {
				Component interface{} `json:"component"`
			} `json:"het"`
			ID string `json:"id"`
			Name string `json:"name"`
		} `json:"step"`
		Task struct {
			Distribution string `json:"distribution"`
		} `json:"task"`
	} `json:"steps"`
	Tres struct {
		Allocated []struct {
			Type string `json:"type"`
			Name interface{} `json:"name"`
			ID int `json:"id"`
			Count int `json:"count"`
		} `json:"allocated"`
		Requested []struct {
			Type string `json:"type"`
			Name interface{} `json:"name"`
			ID int `json:"id"`
			Count int `json:"count"`
		} `json:"requested"`
	} `json:"tres"`
	User string `json:"user"`
	Wckey struct {
		Wckey string `json:"wckey"`
		Flags []string `json:"flags"`
	} `json:"wckey"`
	WorkingDirectory string `json:"working_directory"`
}

type SlurmDBPayload struct {
	Meta Metadata `json:"meta"`
	Errors []string `json:"errors"`
	Jobs []DumpedJob `json:"jobs"`
}
func DecodeClusterConfig(filename string) (ClusterConfig, error) {
	var clusterConfig ClusterConfig

	file, err := os.Open(filename)
	if err != nil {
		log.Errorf("Cluster config file could not be opened (no core/GPU IDs available): %v", err)
		return clusterConfig, err
	}
	defer file.Close()

	decoder := json.NewDecoder(file)
	if err := decoder.Decode(&clusterConfig); err != nil {
		// Return the error instead of continuing with a zero-value config
		log.Errorf("Error decoding cluster config file: %v", err)
		return clusterConfig, err
	}

	log.Printf("Name: %s\n", clusterConfig.Name)
	log.Printf("MetricConfig:\n")
	for _, metric := range clusterConfig.MetricConfig {
		log.Printf("  Name: %s\n", metric.Name)
		log.Printf("  Unit Base: %s\n", metric.Unit.Base)
		log.Printf("  Scope: %s\n", metric.Scope)
		log.Printf("  Aggregation: %s\n", metric.Aggregation)
		log.Printf("  Timestep: %d\n", metric.Timestep)
		log.Printf("  Peak: %f\n", metric.Peak)
		log.Printf("  Normal: %f\n", metric.Normal)
		log.Printf("  Caution: %f\n", metric.Caution)
		log.Printf("  Alert: %f\n", metric.Alert)
	}
	log.Printf("SubClusters:\n")
	for _, subCluster := range clusterConfig.SubClusters {
		log.Printf("  Name: %s\n", subCluster.Name)
		log.Printf("  Nodes: %s\n", subCluster.Nodes)
		log.Printf("  Processor Type: %s\n", subCluster.ProcessorType)
		log.Printf("  Sockets Per Node: %d\n", subCluster.SocketsPerNode)
		log.Printf("  Cores Per Socket: %d\n", subCluster.CoresPerSocket)
		log.Printf("  Threads Per Core: %d\n", subCluster.ThreadsPerCore)
		log.Printf("  Flop Rate Scalar Unit Base: %s\n", subCluster.FlopRateScalar.Unit.Base)
		log.Printf("  Flop Rate Scalar Unit Prefix: %s\n", subCluster.FlopRateScalar.Unit.Prefix)
		log.Printf("  Flop Rate Scalar Value: %f\n", subCluster.FlopRateScalar.Value)
		log.Printf("  Flop Rate Simd Unit Base: %s\n", subCluster.FlopRateSimd.Unit.Base)
		log.Printf("  Flop Rate Simd Unit Prefix: %s\n", subCluster.FlopRateSimd.Unit.Prefix)
		log.Printf("  Flop Rate Simd Value: %f\n", subCluster.FlopRateSimd.Value)
		log.Printf("  Memory Bandwidth Unit Base: %s\n", subCluster.MemoryBandwidth.Unit.Base)
		log.Printf("  Memory Bandwidth Unit Prefix: %s\n", subCluster.MemoryBandwidth.Unit.Prefix)
		log.Printf("  Memory Bandwidth Value: %f\n", subCluster.MemoryBandwidth.Value)
		log.Printf("  Topology Node: %v\n", subCluster.Topology.Node)
		log.Printf("  Topology Socket: %v\n", subCluster.Topology.Socket)
		log.Printf("  Topology Memory Domain: %v\n", subCluster.Topology.MemoryDomain)
		log.Printf("  Topology Core: %v\n", subCluster.Topology.Core)
		log.Printf("  Topology Accelerators:\n")
		for _, accelerator := range subCluster.Topology.Accelerators {
			log.Printf("    ID: %s\n", accelerator.ID)
			log.Printf("    Type: %s\n", accelerator.Type)
			log.Printf("    Model: %s\n", accelerator.Model)
		}
	}

	return clusterConfig, nil
}

func UnmarshalSlurmPayload(jsonPayload string) (SlurmPayload, error) {
	var slurmData SlurmPayload
	if err := json.Unmarshal([]byte(jsonPayload), &slurmData); err != nil {
		return slurmData, fmt.Errorf("failed to unmarshal JSON data: %v", err)
	}
	return slurmData, nil
}
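For reference, a hedged sketch of feeding a payload into UnmarshalSlurmPayload; the JSON fragment is illustrative and matches the struct tags above, not a captured slurmrestd response:

	payload := `{
		"meta": {"Slurm": {"release": "22.05.9", "version": {"major": 22, "minor": 5, "micro": 9}}},
		"errors": [],
		"jobs": [{"job_id": 123, "job_state": "RUNNING", "cluster": "alex", "node_count": 2}]
	}`
	slurmData, err := UnmarshalSlurmPayload(payload)
	if err != nil {
		log.Fatal(err.Error())
	}
	for _, job := range slurmData.Jobs {
		log.Printf("job %d on %s is %s", job.JobID, job.Cluster, job.JobState)
	}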


@@ -0,0 +1,27 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package scheduler

import "encoding/json"

type BatchScheduler interface {
	Init(rawConfig json.RawMessage) error

	Sync()
}

var sd BatchScheduler

func Init(rawConfig json.RawMessage) error {
	sd = &SlurmNatsScheduler{}
	// Propagate the scheduler's init error instead of swallowing it
	return sd.Init(rawConfig)
}

func GetHandle() BatchScheduler {
	return sd
}
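A short sketch of how a caller could wire up the plugin through this package-level API; the config shape is assumed from SlurmNatsConfig, and since the current SlurmNatsScheduler.Init blocks on its NATS subscriptions, it is run off the main goroutine here:

	go func() {
		if err := scheduler.Init(json.RawMessage(`{"url": "nats://localhost:4222"}`)); err != nil {
			log.Fatal(err.Error())
		}
	}()
	// ...later, from any component:
	if sched := scheduler.GetHandle(); sched != nil {
		sched.Sync() // one reconciliation pass against the batch system
	}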


@@ -0,0 +1,214 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package scheduler

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"strings"
	"sync"

	"github.com/ClusterCockpit/cc-backend/internal/importer"
	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
	"github.com/nats-io/nats.go"
)

type SlurmNatsConfig struct {
	URL string `json:"url"`
}

type SlurmNatsScheduler struct {
	url string

	RepositoryMutex sync.Mutex
	JobRepository   *repository.JobRepository
}

type StopJobRequest struct {
	// Stop Time of job as epoch
	StopTime  int64           `json:"stopTime" validate:"required" example:"1649763839"`
	State     schema.JobState `json:"jobState" validate:"required" example:"completed"` // Final job state
	JobId     *int64          `json:"jobId" example:"123000"`                           // Cluster Job ID of job
	Cluster   *string         `json:"cluster" example:"fritz"`                          // Cluster of job
	StartTime *int64          `json:"startTime" example:"1649723812"`                   // Start Time of job as epoch
}

func (sd *SlurmNatsScheduler) startJob(req *schema.JobMeta) {
	fmt.Printf("DEBUG: %+v\n", *req)
	log.Printf("Server Name: %s - Job ID: %v", req.BaseJob.Cluster, req.BaseJob.JobID)
	log.Printf("User: %s - Project: %s", req.BaseJob.User, req.BaseJob.Project)

	if req.State == "" {
		req.State = schema.JobStateRunning
	}
	if err := importer.SanityChecks(&req.BaseJob); err != nil {
		log.Errorf("Sanity checks failed: %s", err.Error())
		return
	}

	// Acquire lock to avoid race condition between API calls
	var unlockOnce sync.Once
	sd.RepositoryMutex.Lock()
	defer unlockOnce.Do(sd.RepositoryMutex.Unlock)

	// Check if combination of (job_id, cluster_id, start_time) already exists:
	jobs, err := sd.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
	if err != nil && err != sql.ErrNoRows {
		log.Errorf("checking for duplicate failed: %s", err.Error())
		return
	} else if err == nil {
		for _, job := range jobs {
			if (req.StartTime - job.StartTimeUnix) < 86400 {
				log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID)
				return
			}
		}
	}

	id, err := sd.JobRepository.Start(req)
	if err != nil {
		log.Errorf("insert into database failed: %s", err.Error())
		return
	}

	// Unlock here, adding tags can be async
	unlockOnce.Do(sd.RepositoryMutex.Unlock)

	for _, tag := range req.Tags {
		if _, err := sd.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
			log.Errorf("adding tag to new job %d failed: %s", id, err.Error())
			return
		}
	}

	log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
}

func (sd *SlurmNatsScheduler) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
	// Sanity checks
	if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
		log.Errorf("stopTime must be larger than startTime and only running jobs can be stopped")
		return
	}

	if req.State != "" && !req.State.Valid() {
		log.Errorf("invalid job state: %#v", req.State)
		return
	} else if req.State == "" {
		req.State = schema.JobStateCompleted
	}

	// Mark job as stopped in the database (update state and duration)
	job.Duration = int32(req.StopTime - job.StartTime.Unix())
	job.State = req.State
	if err := sd.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
		log.Errorf("marking job as stopped failed: %s", err.Error())
		return
	}

	log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)

	// Monitoring is disabled...
	if job.MonitoringStatus == schema.MonitoringStatusDisabled {
		return
	}

	// Trigger async archiving
	sd.JobRepository.TriggerArchiving(job)
}

func (sd *SlurmNatsScheduler) stopJob(req *StopJobRequest) {
	// if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
	// 	log.Errorf("missing role: %v", auth.GetRoleString(auth.RoleApi))
	// 	return
	// }

	// Fetch job (that will be stopped) from db
	var job *schema.Job
	var err error
	if req.JobId == nil {
		log.Errorf("the field 'jobId' is required")
		return
	}
	// Guard against a nil Cluster pointer before dereferencing it for logging
	if req.Cluster != nil {
		log.Printf("Server Name: %s - Job ID: %v", *req.Cluster, req.JobId)
	}

	job, err = sd.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
	if err != nil {
		log.Errorf("finding job failed: %s", err.Error())
		return
	}

	sd.checkAndHandleStopJob(job, req)
}

func (sd *SlurmNatsScheduler) Init(rawConfig json.RawMessage) error {
	servers := []string{"nats://127.0.0.1:4222", "nats://127.0.0.1:1223"}
	nc, err := nats.Connect(strings.Join(servers, ","))
	if err != nil {
		log.Fatal(err.Error())
	}
	defer nc.Close()

	getStatusTxt := func(nc *nats.Conn) string {
		switch nc.Status() {
		case nats.CONNECTED:
			return "Connected"
		case nats.CLOSED:
			return "Closed"
		default:
			return "Other"
		}
	}
	log.Printf("The connection status is %v\n", getStatusTxt(nc))

	ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
	if err != nil {
		log.Fatal(err.Error())
	}
	defer ec.Close()

	// Define the object
	type encodedMessage struct {
		ServerName   string
		ResponseCode int
	}

	wg := sync.WaitGroup{}
	wg.Add(1)

	// Subscribe; test scaffolding that keeps Init blocked until a "test"
	// message with ResponseCode 500 arrives
	if _, err := ec.Subscribe("test", func(s *encodedMessage) {
		log.Printf("Server Name: %s - Response Code: %v", s.ServerName, s.ResponseCode)
		if s.ResponseCode == 500 {
			wg.Done()
		}
	}); err != nil {
		log.Fatal(err.Error())
	}

	if _, err := ec.Subscribe("startJob", sd.startJob); err != nil {
		log.Fatal(err.Error())
	}

	if _, err := ec.Subscribe("stopJob", sd.stopJob); err != nil {
		log.Fatal(err.Error())
	}

	// Wait for a message to come in
	wg.Wait()

	return nil
}

func (sd *SlurmNatsScheduler) Sync() {
}
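The subscriber above decodes JSON from the "startJob" and "stopJob" subjects into schema.JobMeta and StopJobRequest. A hedged sketch of the matching producer side (subjects taken from this file, field values illustrative):

	nc, err := nats.Connect("nats://127.0.0.1:4222")
	if err != nil {
		log.Fatal(err.Error())
	}
	defer nc.Close()
	ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
	if err != nil {
		log.Fatal(err.Error())
	}
	defer ec.Close()

	jobId, cluster, startTime := int64(123000), "fritz", int64(1649723812)
	if err := ec.Publish("stopJob", &StopJobRequest{
		StopTime:  1649763839,
		State:     schema.JobStateCompleted,
		JobId:     &jobId,
		Cluster:   &cluster,
		StartTime: &startTime,
	}); err != nil {
		log.Fatal(err.Error())
	}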


@@ -0,0 +1,565 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package scheduler

import (
	"encoding/json"
	"fmt"
	"io" // needed for io.ReadAll in the HTTP fetchers below
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

type SlurmRestSchedulerConfig struct {
	URL string `json:"url"`

	JobRepository *repository.JobRepository

	clusterConfig ClusterConfig

	client *http.Client

	nodeClusterMap map[string]string

	clusterPCIEAddressMap map[string][]string
}
func (cfg *SlurmRestSchedulerConfig) queryDB(qtime int64, clusterName string) ([]interface{}, error) {
	// qtime and clusterName are not used yet; the query parameters below are hard-coded
	apiEndpoint := "/slurmdb/v0.0.38/jobs"

	// Construct the query parameters
	queryParams := url.Values{}
	queryParams.Set("users", "user1,user2")
	queryParams.Set("submit_time", "2023-01-01T00:00:00")

	// Add the query parameters to the API endpoint
	apiEndpoint += "?" + queryParams.Encode()

	// Create a new HTTP GET request
	req, err := http.NewRequest("GET", apiEndpoint, nil)
	if err != nil {
		log.Errorf("Error creating request: %v", err)
		return nil, err
	}

	// Send the request; return on error so resp is never dereferenced when nil
	resp, err := cfg.client.Do(req)
	if err != nil {
		log.Errorf("Error sending request: %v", err)
		return nil, err
	}
	defer resp.Body.Close()

	// Check the response status code
	if resp.StatusCode != http.StatusOK {
		log.Errorf("API request failed with status: %v", resp.Status)
		return nil, fmt.Errorf("API request failed with status: %v", resp.Status)
	}

	// Read the full response body; the previous resp.Body.Read into a nil
	// slice always read zero bytes
	dbOutput, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Errorf("Error reading response body: %v", err)
		return nil, err
	}
	log.Debugf("API response: %v", string(dbOutput))

	dataJobs := make(map[string]interface{})
	if err := json.Unmarshal(dbOutput, &dataJobs); err != nil {
		log.Errorf("Error parsing JSON response: %v", err)
		return nil, err
	}

	if _, ok := dataJobs["jobs"]; !ok {
		return nil, fmt.Errorf("jobs not found - response incomplete")
	}

	jobs, _ := dataJobs["jobs"].([]interface{})
	return jobs, nil
}
func (cfg *SlurmRestSchedulerConfig) fetchJobs() (SlurmPayload, error) {
	var jobsResponse SlurmPayload

	// Host assumed to be the local test server from tools/nats-manager;
	// the original URL ("http://:8080/...") had an empty host
	apiEndpoint := "http://localhost:8080/slurm/v0.0.38/jobs"

	// Create a new HTTP GET request
	req, err := http.NewRequest("GET", apiEndpoint, nil)
	if err != nil {
		log.Errorf("Error creating request: %v", err)
		return jobsResponse, err
	}

	// Send the request
	resp, err := cfg.client.Do(req)
	if err != nil {
		log.Errorf("Error sending request: %v", err)
		return jobsResponse, err
	}
	defer resp.Body.Close()

	// Check the response status code
	if resp.StatusCode != http.StatusOK {
		log.Errorf("API request failed with status: %v", resp.Status)
		return jobsResponse, fmt.Errorf("API request failed with status: %v", resp.Status)
	}

	// Read the full response body (resp.Body.Read into a nil slice reads nothing)
	ctlOutput, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Errorf("Error reading response body: %v", err)
		return jobsResponse, err
	}
	log.Printf("Received JSON data: %d bytes", len(ctlOutput))

	if err := json.Unmarshal(ctlOutput, &jobsResponse); err != nil {
		log.Errorf("Error parsing JSON response: %v", err)
		return jobsResponse, err
	}

	return jobsResponse, nil
}
func fetchJobsLocal() (SlurmPayload, error) {
	var jobsResponse SlurmPayload

	// Read the Slurm payload JSON file; return instead of unmarshaling nil data
	jobsData, err := os.ReadFile("slurm_0038.json")
	if err != nil {
		log.Errorf("Error reading Slurm payload JSON file: %v", err)
		return jobsResponse, err
	}

	if err := json.Unmarshal(jobsData, &jobsResponse); err != nil {
		log.Errorf("Error parsing Slurm payload JSON response: %v", err)
		return jobsResponse, err
	}

	return jobsResponse, nil
}

func fetchDumpedJobsLocal() (SlurmDBPayload, error) {
	var jobsResponse SlurmDBPayload

	// Read the SlurmDB payload JSON file; return instead of unmarshaling nil data
	jobsData, err := os.ReadFile("slurmdb_0038-large.json")
	if err != nil {
		log.Errorf("Error reading SlurmDB payload JSON file: %v", err)
		return jobsResponse, err
	}

	if err := json.Unmarshal(jobsData, &jobsResponse); err != nil {
		log.Errorf("Error parsing SlurmDB payload JSON response: %v", err)
		return jobsResponse, err
	}

	return jobsResponse, nil
}
func printSlurmInfo(job Job) string {
	text := fmt.Sprintf(`
JobId=%v JobName=%v
UserId=%v(%v) GroupId=%v
Account=%v QOS=%v
Requeue=%v Restarts=%v BatchFlag=%v
TimeLimit=%v
SubmitTime=%v
Partition=%v
NodeList=%v
NumNodes=%v NumCPUs=%v NumTasks=%v CPUs/Task=%v
NTasksPerNode:Socket:Core=%v:%v:%v
TRES_req=%v
TRES_alloc=%v
Command=%v
WorkDir=%v
StdErr=%v
StdOut=%v`,
		job.JobID, job.Name,
		job.UserName, job.UserID, job.GroupID,
		job.Account, job.QoS,
		job.Requeue, job.RestartCnt, job.BatchFlag,
		job.TimeLimit, job.SubmitTime,
		job.Partition,
		job.Nodes,
		job.NodeCount, job.CPUs, job.Tasks, job.CPUPerTask,
		job.TasksPerBoard, job.TasksPerSocket, job.TasksPerCore,
		job.TresReqStr, // was job.TresAllocStr twice; TRES_req must use the requested TRES string
		job.TresAllocStr,
		job.Command,
		job.CurrentWorkingDirectory,
		job.StandardError,
		job.StandardOutput,
	)

	return text
}
func exitWithError(err error, output []byte) {
	if exitError, ok := err.(*exec.ExitError); ok {
		if exitError.ExitCode() == 28 {
			fmt.Fprintf(os.Stderr, "ERROR: API call failed with timeout; check slurmrestd.\nOutput:\n%s\n", output)
		} else {
			fmt.Fprintf(os.Stderr, "ERROR: API call failed with code %d;\nOutput:\n%s\n", exitError.ExitCode(), output)
		}
	} else {
		log.Errorf("ERROR: %v", err)
	}
	os.Exit(1)
}
func (cfg *SlurmRestSchedulerConfig) Init() error {
	var err error
	cfg.clusterConfig, err = DecodeClusterConfig("cluster-alex.json")
	if err != nil {
		return err
	}

	cfg.nodeClusterMap = make(map[string]string)
	cfg.clusterPCIEAddressMap = make(map[string][]string)

	for _, subCluster := range cfg.clusterConfig.SubClusters {
		cfg.ConstructNodeClusterMap(subCluster.Nodes, subCluster.Name)

		// Size the slice to the accelerator count; indexing into a
		// zero-length slice (the original make([]string, 0, 32)) panics
		pcieAddresses := make([]string, len(subCluster.Topology.Accelerators))
		for idx, accelerator := range subCluster.Topology.Accelerators {
			pcieAddresses[idx] = accelerator.ID
		}
		cfg.clusterPCIEAddressMap[subCluster.Name] = pcieAddresses
	}

	// Create an HTTP client
	cfg.client = &http.Client{}

	return nil
}
func (cfg *SlurmRestSchedulerConfig) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
	// Sanity checks
	if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
		log.Errorf("stopTime must be larger than startTime and only running jobs can be stopped")
		return
	}

	if req.State != "" && !req.State.Valid() {
		log.Errorf("invalid job state: %#v", req.State)
		return
	} else if req.State == "" {
		req.State = schema.JobStateCompleted
	}

	// Mark job as stopped in the database (update state and duration)
	job.Duration = int32(req.StopTime - job.StartTime.Unix())
	job.State = req.State
	if err := cfg.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
		log.Errorf("marking job as stopped failed: %s", err.Error())
		return
	}

	log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)

	// Monitoring is disabled...
	if job.MonitoringStatus == schema.MonitoringStatusDisabled {
		return
	}

	// Trigger async archiving
	cfg.JobRepository.TriggerArchiving(job)
}
func (cfg *SlurmRestSchedulerConfig) ConstructNodeClusterMap(nodes string, cluster string) {
	if cfg.nodeClusterMap == nil {
		cfg.nodeClusterMap = make(map[string]string)
	}

	// Split the input by commas
	groups := strings.Split(nodes, ",")

	for _, group := range groups {
		// Use regular expressions to match numbers and ranges
		numberRangeRegex := regexp.MustCompile(`a\[(\d+)-(\d+)\]`)
		numberRegex := regexp.MustCompile(`a(\d+)`)

		if numberRangeRegex.MatchString(group) {
			// Extract nodes from ranges
			matches := numberRangeRegex.FindStringSubmatch(group)
			if len(matches) == 3 {
				start, _ := strconv.Atoi(matches[1])
				end, _ := strconv.Atoi(matches[2])
				for i := start; i <= end; i++ {
					// Build the host name from the "a" prefix plus the padded
					// number; the original used matches[0] (the whole
					// "a[start-end]" expression) and produced names like
					// "a[0001-0010]0001" that never match real hosts
					cfg.nodeClusterMap[fmt.Sprintf("a%04d", i)] = cluster
				}
			}
		} else if numberRegex.MatchString(group) {
			// Extract individual node
			matches := numberRegex.FindStringSubmatch(group)
			if len(matches) == 2 {
				cfg.nodeClusterMap[group] = cluster
			}
		}
	}
}
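An illustration of the expansion, assuming the "a" + zero-padded-number host naming scheme that is hard-coded in the regular expressions above:

	cfg.ConstructNodeClusterMap("a[0001-0003],a0042", "alex")
	// cfg.nodeClusterMap now contains:
	//   a0001 -> alex, a0002 -> alex, a0003 -> alex, a0042 -> alex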
func extractElements(indicesStr string, addresses []string) ([]string, error) {
	// Split the input string by commas to get individual index ranges
	indexRanges := strings.Split(indicesStr, ",")

	var selected []string
	for _, indexRange := range indexRanges {
		// Split each index range by hyphen to separate start and end indices
		rangeParts := strings.Split(indexRange, "-")

		if len(rangeParts) == 1 {
			// If there's only one part, it's a single index
			index, err := strconv.Atoi(rangeParts[0])
			if err != nil {
				return nil, err
			}
			selected = append(selected, addresses[index])
		} else if len(rangeParts) == 2 {
			// If there are two parts, it's a range
			start, err := strconv.Atoi(rangeParts[0])
			if err != nil {
				return nil, err
			}
			end, err := strconv.Atoi(rangeParts[1])
			if err != nil {
				return nil, err
			}
			// Add all indices in the range to the result
			for i := start; i <= end; i++ {
				selected = append(selected, addresses[i])
			}
		} else {
			// Invalid format
			return nil, fmt.Errorf("invalid index range: %s", indexRange)
		}
	}

	return selected, nil
}
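Example of the index-string format extractElements handles, with a made-up PCIe address table; Slurm reports such index lists in gres_detail strings like "gpu:a100:3(IDX:0,2-3)", which CreateJobMeta below extracts from the brackets:

	addresses := []string{"00000000:03:00.0", "00000000:44:00.0", "00000000:84:00.0", "00000000:C4:00.0"}
	selected, err := extractElements("0,2-3", addresses)
	if err != nil {
		log.Fatal(err.Error())
	}
	log.Printf("%v", selected) // [00000000:03:00.0 00000000:84:00.0 00000000:C4:00.0]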
func (cfg *SlurmRestSchedulerConfig) CreateJobMeta(job Job) (*schema.JobMeta, error) {
	// A nil "shared" field is treated as an exclusive allocation
	var exclusive int32
	if job.Shared == nil {
		exclusive = 1
	} else {
		exclusive = 0
	}

	totalGPUs := 0

	var resources []*schema.Resource
	for nodeIndex, node := range job.JobResources.AllocatedNodes {
		var res schema.Resource
		res.Hostname = node.Nodename

		log.Debugf("Node %s Cores map size: %d\n", node.Nodename, len(node.Sockets))

		if node.CPUsUsed == nil || node.MemoryAllocated == nil {
			log.Fatalf("Either node.CPUsUsed or node.MemoryAllocated is nil\n")
		}

		for k, v := range node.Sockets {
			fmt.Printf("core id[%s] value[%v]\n", k, v)
			threadID, _ := strconv.Atoi(k)
			res.HWThreads = append(res.HWThreads, threadID)
		}

		re := regexp.MustCompile(`\(([^)]*)\)`)
		matches := re.FindStringSubmatch(job.GresDetail[nodeIndex])
		if len(matches) < 2 {
			return nil, fmt.Errorf("no substring found in brackets")
		}

		nodePCIEAddresses := cfg.clusterPCIEAddressMap[cfg.nodeClusterMap[node.Nodename]]

		selectedPCIEAddresses, err := extractElements(matches[1], nodePCIEAddresses)
		if err != nil {
			return nil, err
		}
		totalGPUs += len(selectedPCIEAddresses)

		// For core/GPU id mapping, need to query from cluster config file
		res.Accelerators = selectedPCIEAddresses
		resources = append(resources, &res)
	}

	metaData := make(map[string]string)
	metaData["jobName"] = job.Name
	metaData["slurmInfo"] = printSlurmInfo(job)

	metaDataInBytes, err := json.Marshal(metaData)
	if err != nil {
		log.Fatalf("metaData JSON marshaling failed: %s", err)
	}

	var defaultJob schema.BaseJob = schema.BaseJob{
		JobID:     job.JobID,
		User:      job.UserName,
		Project:   job.Account,
		Cluster:   job.Cluster,
		Partition: job.Partition,
		// check nil
		ArrayJobId:   job.ArrayJobID,
		NumNodes:     job.NodeCount,
		NumHWThreads: job.CPUs,
		NumAcc:       int32(totalGPUs),
		Exclusive:    exclusive,
		// MonitoringStatus: job.MonitoringStatus,
		// SMT:              job.TasksPerCore,
		State: schema.JobState(job.JobState),
		// ignore this for start job
		// Duration: int32(time.Now().Unix() - job.StartTime), // or SubmitTime?
		Walltime: time.Now().Unix(), // max duration requested by the job
		// Tags: job.Tags,
		// ignore this!
		// RawResources: jobResourcesInBytes,
		// "job_resources": "allocated_nodes" "sockets":
		// very important; has to be right
		Resources:   resources,
		RawMetaData: metaDataInBytes,
		// optional metadata with 'jobScript', 'jobName', 'slurmInfo'
		MetaData: metaData,
		// ConcurrentJobs: job.ConcurrentJobs,
	}
	log.Debugf("Generated BaseJob with Resources=%v", defaultJob.Resources[0])

	meta := &schema.JobMeta{
		BaseJob:    defaultJob,
		StartTime:  job.StartTime,
		Statistics: make(map[string]schema.JobStatistics),
	}
	// log.Debugf("Generated JobMeta %v", req.BaseJob.JobID)

	return meta, nil
}
func (cfg *SlurmRestSchedulerConfig) HandleJobs(jobs []Job) error {
	// runningJobsInCC, err := cfg.JobRepository.FindRunningJobs("alex")

	// Iterate over the Jobs slice
	for _, job := range jobs {
		// Process each job from Slurm
		fmt.Printf("Job ID: %d\n", job.JobID)
		fmt.Printf("Job Name: %s\n", job.Name)
		fmt.Printf("Job State: %s\n", job.JobState)
		fmt.Println("Job StartTime:", job.StartTime)
		fmt.Println("Job Cluster:", job.Cluster)

		if job.JobState == "RUNNING" {
			meta, err := cfg.CreateJobMeta(job)
			if err != nil {
				// The original discarded this error and could pass a nil
				// JobMeta to Start below
				log.Errorf("creating job meta for job %d failed: %v", job.JobID, err)
				continue
			}

			// For all running jobs from Slurm
			_, notFoundError := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.StartTime)
			if notFoundError != nil {
				// If it does not exist in CC, create a new entry
				log.Print("Job does not exist in CC, will create a new entry:", job.JobID)
				id, startJobError := cfg.JobRepository.Start(meta)
				if startJobError != nil {
					return startJobError
				}
				log.Debug("Added job", id)
			}
			// Running on both sides: nothing needs to be done
		} else if job.JobState == "COMPLETED" {
			// Check if a completed job with the combination of (job_id, cluster_id, start_time) already exists
			log.Debugf("Processing completed job ID: %v Cluster: %v StartTime: %v", job.JobID, job.Cluster, job.StartTime)
			existingJob, err := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.StartTime)
			if err == nil && existingJob.State != schema.JobStateCompleted {
				// For jobs completed in Slurm (either in Slurm or maybe SlurmDB),
				// update the job in CC with new info (final status, duration, end timestamp)
				existingJob.BaseJob.Duration = int32(job.EndTime - job.StartTime)
				existingJob.BaseJob.State = schema.JobState(job.JobState)
				existingJob.BaseJob.Walltime = job.EndTime

				req := &StopJobRequest{
					Cluster:   &job.Cluster,
					JobId:     &job.JobID,
					State:     schema.JobState(job.JobState),
					StartTime: &job.StartTime,
					StopTime:  job.EndTime,
				}
				cfg.checkAndHandleStopJob(existingJob, req)
			}
		}
	}
	return nil
}
func (cfg *SlurmRestSchedulerConfig) HandleDumpedJobs(jobs []DumpedJob) error {
	// Iterate over the Jobs slice
	for _, job := range jobs {
		// Process each job from SlurmDB
		fmt.Printf("Job ID: %d\n", job.JobID)
		fmt.Printf("Job Name: %s\n", job.Name)
		fmt.Printf("Job State: %s\n", job.State.Current)
		fmt.Println("Job EndTime:", job.Time.End)
		fmt.Println("Job Cluster:", job.Cluster)

		// Check if a completed job with the combination of (job_id, cluster_id, start_time) already exists
		log.Debugf("Processing completed dumped job ID: %v Cluster: %v StartTime: %v", job.JobID, job.Cluster, job.Time.Start)
		existingJob, err := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.Time.Start)
		if err == nil && existingJob.State != schema.JobStateCompleted {
			// For jobs completed in Slurm (either in Slurm or maybe SlurmDB),
			// update the job in CC with new info (final status, duration, end timestamp)
			existingJob.BaseJob.Duration = int32(job.Time.End - job.Time.Start)
			existingJob.BaseJob.State = schema.JobState(job.State.Current)
			existingJob.BaseJob.Walltime = job.Time.End

			req := &StopJobRequest{
				Cluster:   &job.Cluster,
				JobId:     &job.JobID,
				State:     schema.JobState(job.State.Current),
				StartTime: &job.Time.Start,
				StopTime:  job.Time.End,
			}
			cfg.checkAndHandleStopJob(existingJob, req)
		}
	}
	return nil
}
func (cfg *SlurmRestSchedulerConfig) Sync() {
	// Fetch an instance of the Slurm JobsResponse
	jobsResponse, err := fetchJobsLocal()
	if err != nil {
		log.Fatal(err.Error())
	}
	if err := cfg.HandleJobs(jobsResponse.Jobs); err != nil {
		log.Errorf("handling Slurm jobs failed: %v", err)
	}

	// Fetch an instance of the SlurmDB JobsResponse
	dumpedJobsResponse, err := fetchDumpedJobsLocal()
	if err != nil {
		log.Fatal(err.Error())
	}
	if err := cfg.HandleDumpedJobs(dumpedJobsResponse.Jobs); err != nil {
		log.Errorf("handling SlurmDB jobs failed: %v", err)
	}
}
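Sync performs a single reconciliation pass; nothing in this change schedules it periodically. One possible driver, with an assumed interval (not part of this changeset):

	func runSyncLoop(cfg *SlurmRestSchedulerConfig, interval time.Duration, stop <-chan struct{}) {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				cfg.Sync() // reconcile CC's job table with the scheduler's view
			case <-stop:
				return
			}
		}
	}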

tools/nats-manager/main.go

@@ -0,0 +1,137 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main

import (
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"

	"github.com/ClusterCockpit/cc-backend/internal/scheduler"
	"github.com/nats-io/nats.go"
)
func usage() {
	log.Printf("Usage: nats-pub [-s server] [-creds file] <subject> <msg>\n")
	flag.PrintDefaults()
}

func showUsageAndExit(exitcode int) {
	usage()
	os.Exit(exitcode)
}

func setupPublisher() {
	var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
	var userCreds = flag.String("creds", "", "User Credentials File")
	var showHelp = flag.Bool("h", false, "Show help message")

	log.SetFlags(0)
	flag.Usage = usage
	flag.Parse()

	if *showHelp {
		showUsageAndExit(0)
	}

	args := flag.Args()
	if len(args) != 2 {
		showUsageAndExit(1)
	}

	fmt.Printf("Hello Nats\n")

	// Connect Options.
	opts := []nats.Option{nats.Name("NATS Sample Publisher")}

	// Use UserCredentials
	if *userCreds != "" {
		opts = append(opts, nats.UserCredentials(*userCreds))
	}

	// Connect to NATS
	nc, err := nats.Connect(*urls, opts...)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	subj, msg := args[0], []byte(args[1])

	nc.Publish(subj, msg)
	nc.Flush()

	if err := nc.LastError(); err != nil {
		log.Fatal(err)
	} else {
		log.Printf("Published [%s] : '%s'\n", subj, msg)
	}
	os.Exit(0)
}
func injectPayload() {
	// Read the JSON files; check each read separately so a failure of the
	// first is not masked by the second (the original overwrote err before
	// checking it)
	jobsData, err := os.ReadFile("slurm_0038.json")
	if err != nil {
		fmt.Println("Error reading Slurm JSON file:", err)
		return
	}
	dbData, err := os.ReadFile("slurmdb_0038-large.json")
	if err != nil {
		fmt.Println("Error reading SlurmDB JSON file:", err)
		return
	}

	// Create an HTTP handler function
	http.HandleFunc("/slurm/v0.0.38/jobs", func(w http.ResponseWriter, r *http.Request) {
		// Set the response content type to JSON
		w.Header().Set("Content-Type", "application/json")

		// Write the raw JSON data to the response writer
		if _, err := w.Write(jobsData); err != nil {
			http.Error(w, "Error writing jobsData payload", http.StatusInternalServerError)
			return
		}
	})

	http.HandleFunc("/slurmdb/v0.0.38/jobs", func(w http.ResponseWriter, r *http.Request) {
		// Set the response content type to JSON
		w.Header().Set("Content-Type", "application/json")

		// Write the raw JSON data to the response writer
		if _, err := w.Write(dbData); err != nil {
			http.Error(w, "Error writing dbData payload", http.StatusInternalServerError)
			return
		}
	})

	// Start the HTTP server on port 8080
	fmt.Println("Listening on :8080...")
	go http.ListenAndServe(":8080", nil)
}
func loadSlurmNatsScheduler() {
	cfgData := []byte(`{"target": "localhost"}`)
	var sch scheduler.SlurmNatsScheduler
	// sch.URL = "nats://127.0.0.1:1223"
	if err := sch.Init(cfgData); err != nil {
		log.Fatal(err)
	}
	// go injectPayload()
}

func main() {
	var sch scheduler.SlurmRestSchedulerConfig
	if err := sch.Init(); err != nil {
		log.Fatal(err)
	}
	// injectPayload()
	sch.Sync()
	os.Exit(0)
}