mirror of https://github.com/ClusterCockpit/cc-backend
synced 2025-07-30 16:16:07 +02:00

Compare commits: v1.4.4 ... 135-batch-
19 Commits (SHA1):
acabdb6a61
6ac1182c06
9fe497173d
a3fbdbcf90
59f6658344
36b0f33208
41cd1171fb
4c16e44507
8377e51608
e949f34e94
feeb0231e7
d4f09b4679
03175681b6
a43d03457b
125f3cd254
d9e7a48e2f
c0f9eb7869
7aa7750177
e5f7ad8ac0
go.mod (18 lines changed)
@@ -18,6 +18,7 @@ require (
github.com/influxdata/influxdb-client-go/v2 v2.12.2
github.com/jmoiron/sqlx v1.3.5
github.com/mattn/go-sqlite3 v1.14.16
github.com/nats-io/nats.go v1.28.0
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.40.0
github.com/qustavo/sqlhooks/v2 v2.1.0
@@ -25,12 +26,14 @@ require (
github.com/swaggo/http-swagger v1.3.3
github.com/swaggo/swag v1.16.1
github.com/vektah/gqlparser/v2 v2.5.8
golang.org/x/crypto v0.12.0
golang.org/x/crypto v0.13.0
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea
)

require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac37420 // indirect
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
@@ -38,6 +41,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/containerd v1.6.18 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deepmap/oapi-codegen v1.12.4 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
@@ -56,6 +60,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
@@ -64,21 +69,26 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/nats-io/nats-server/v2 v2.9.21 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/swaggo/files v1.0.0 // indirect
github.com/urfave/cli/v2 v2.25.7 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/atomic v1.10.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.12.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.30.0 // indirect
go.sum (31 lines changed)
@@ -82,6 +82,12 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
|
||||
github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
|
||||
github.com/ClusterCockpit/cc-units v0.4.0 h1:zP5DOu99GmErW0tCDf0gcLrlWt42RQ9dpoONEOh4cI0=
|
||||
github.com/ClusterCockpit/cc-units v0.4.0/go.mod h1:3S3PAhAayS3pbgcT4q9Vn9VJw22Op51X0YimtG77zBw=
|
||||
github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac37420 h1:AeQY40KrLQmSSyyHdbNdAqgln+0+p1dLag5yspE5M8A=
|
||||
github.com/ClusterCockpit/slurm-rest-client-0_0_37 v0.0.0-20230901125459-dc653ac37420/go.mod h1:oNgVG2puNj9cNw/KgqLbgE1pPOn8jXORX3ErP58LcAA=
|
||||
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20230906120742-0f15562ea666 h1:8PofHcOwEMmeAFqJjvAEgnu7rbRHAwJhd2XJ9u/YxiU=
|
||||
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20230906120742-0f15562ea666/go.mod h1:vxaj1my0GNoCXx4bYyOTA/IZP/IOZImtdOIn4T7pCa4=
|
||||
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7 h1:YY/qDtFsp1DOJw/jyobiIBiIh1/yD2IVOdcK7EVEIKs=
|
||||
github.com/ClusterCockpit/slurm-rest-client-0_0_38 v0.0.0-20231010134848-707e8b20bde7/go.mod h1:vxaj1my0GNoCXx4bYyOTA/IZP/IOZImtdOIn4T7pCa4=
|
||||
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
|
||||
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
|
||||
github.com/Masterminds/squirrel v1.5.3 h1:YPpoceAcxuzIljlr5iWpNKaql7hLeG1KLSrhvdHpkZc=
|
||||
@@ -461,6 +467,7 @@ github.com/go-ldap/ldap/v3 v3.4.4/go.mod h1:fe1MsuN5eJJ1FeLT/LEBVdWfNWKh459R7aXg
|
||||
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
|
||||
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
|
||||
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
|
||||
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
|
||||
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
|
||||
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
|
||||
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
|
||||
@@ -593,6 +600,7 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l
|
||||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
|
||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
|
||||
@@ -816,6 +824,8 @@ github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdY
|
||||
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
|
||||
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
|
||||
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
@@ -887,6 +897,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
|
||||
github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88Jz2VyhSmden33/aXg4oVIY=
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs=
|
||||
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
|
||||
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4=
|
||||
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
|
||||
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||
@@ -931,6 +942,15 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
|
||||
github.com/nakagami/firebirdsql v0.0.0-20190310045651-3c02a58cfed8/go.mod h1:86wM1zFnC6/uDBfZGNwB65O+pR2OFi5q/YQaEUid1qA=
|
||||
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
|
||||
github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk=
|
||||
github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU=
|
||||
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
|
||||
github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
|
||||
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
|
||||
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
|
||||
github.com/neo4j/neo4j-go-driver v1.8.1-0.20200803113522-b626aa943eba/go.mod h1:ncO5VaFWh0Nrt+4KT4mOZboaczBZcLuHrG+/sUeP8gI=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||
@@ -1144,6 +1164,7 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
|
||||
github.com/swaggo/files v1.0.0 h1:1gGXVIeUFCS/dta17rnP0iOpr6CXFwKD7EO5ID233e4=
|
||||
github.com/swaggo/files v1.0.0/go.mod h1:N59U6URJLyU1PQgFqPM7wXLMhJx7QAolnvfQkqO13kc=
|
||||
@@ -1167,6 +1188,7 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
|
||||
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
||||
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
|
||||
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
|
||||
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||
github.com/vektah/gqlparser/v2 v2.5.8 h1:pm6WOnGdzFOCfcQo9L3+xzW51mKrlwTEg4Wr7AH1JW4=
|
||||
github.com/vektah/gqlparser/v2 v2.5.8/go.mod h1:z8xXUff237NntSuH8mLFijZ+1tjV1swDbpDqjJmk6ME=
|
||||
github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
|
||||
@@ -1286,6 +1308,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
|
||||
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
|
||||
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
|
||||
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
|
||||
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
|
||||
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
@@ -1409,6 +1433,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
|
||||
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
|
||||
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
|
||||
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
|
||||
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
|
||||
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
|
||||
golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
@@ -1568,6 +1594,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
|
||||
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
|
||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
@@ -1586,6 +1614,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
|
||||
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
|
||||
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
@@ -1595,6 +1625,7 @@ golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxb
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
|
||||
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
@@ -290,6 +290,30 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
	return scanJob(q.RunWith(r.stmtCache).QueryRow())
}

func (r *JobRepository) FindRunningJobs(cluster string) (map[int64]*schema.Job, error) {
	jobs := map[int64]*schema.Job{}
	q := sq.Select(jobColumns...).From("job").Where("job.cluster = ?", cluster)
	q = q.Where("job.job_state = ?", "running")

	rows, err := q.RunWith(r.stmtCache).Query()
	if err != nil {
		log.Errorf("Error while querying running jobs: %v", err)
		return nil, err
	}

	for rows.Next() {
		job, err := scanJob(rows)
		if err != nil {
			rows.Close()
			log.Warn("Error while scanning rows (Jobs)")
			return nil, err
		}
		jobs[job.JobID] = job
	}

	return jobs, nil
}

func (r *JobRepository) FindConcurrentJobs(
	ctx context.Context,
	job *schema.Job) (*model.JobLinkResultList, error) {
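Illustrative only, not part of the diff: a minimal caller of the new FindRunningJobs helper. The repository accessor and the cluster name "alex" are assumptions for the example.

// Hypothetical caller: list the running jobs cc-backend knows for one cluster.
repo := repository.GetJobRepository()
running, err := repo.FindRunningJobs("alex")
if err != nil {
	log.Errorf("could not load running jobs: %v", err)
	return
}
for jobID, job := range running {
	log.Printf("running in CC: jobId=%d user=%s startTime=%s", jobID, job.User, job.StartTime)
}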
internal/scheduler/payloadConverter.go (new file, 479 lines)
@@ -0,0 +1,479 @@
|
||||
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
)
|
||||
|
||||
type MetricConfig struct {
|
||||
Name string `json:"name"`
|
||||
Unit struct {
|
||||
Base string `json:"base"`
|
||||
} `json:"unit"`
|
||||
Scope string `json:"scope"`
|
||||
Aggregation string `json:"aggregation"`
|
||||
Timestep int `json:"timestep"`
|
||||
Peak float64 `json:"peak"`
|
||||
Normal float64 `json:"normal"`
|
||||
Caution float64 `json:"caution"`
|
||||
Alert float64 `json:"alert"`
|
||||
}
|
||||
type SubCluster struct {
|
||||
Name string `json:"name"`
|
||||
Nodes string `json:"nodes"`
|
||||
ProcessorType string `json:"processorType"`
|
||||
SocketsPerNode int `json:"socketsPerNode"`
|
||||
CoresPerSocket int `json:"coresPerSocket"`
|
||||
ThreadsPerCore int `json:"threadsPerCore"`
|
||||
FlopRateScalar struct {
|
||||
Unit struct {
|
||||
Base string `json:"base"`
|
||||
Prefix string `json:"prefix"`
|
||||
} `json:"unit"`
|
||||
Value float64 `json:"value"`
|
||||
} `json:"flopRateScalar"`
|
||||
FlopRateSimd struct {
|
||||
Unit struct {
|
||||
Base string `json:"base"`
|
||||
Prefix string `json:"prefix"`
|
||||
} `json:"unit"`
|
||||
Value float64 `json:"value"`
|
||||
} `json:"flopRateSimd"`
|
||||
MemoryBandwidth struct {
|
||||
Unit struct {
|
||||
Base string `json:"base"`
|
||||
Prefix string `json:"prefix"`
|
||||
} `json:"unit"`
|
||||
Value float64 `json:"value"`
|
||||
} `json:"memoryBandwidth"`
|
||||
Topology struct {
|
||||
Node []int `json:"node"`
|
||||
Socket [][]int `json:"socket"`
|
||||
MemoryDomain [][]int `json:"memoryDomain"`
|
||||
Core [][]int `json:"core"`
|
||||
Accelerators []struct {
|
||||
ID string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
Model string `json:"model"`
|
||||
} `json:"accelerators"`
|
||||
} `json:"topology"`
|
||||
}
|
||||
|
||||
type ClusterConfig struct {
|
||||
Name string `json:"name"`
|
||||
MetricConfig []MetricConfig `json:"metricConfig"`
|
||||
SubClusters []SubCluster `json:"subClusters"`
|
||||
}
|
||||
|
||||
type Metadata struct {
|
||||
Plugin struct {
|
||||
Type string `json:"type"`
|
||||
Name string `json:"name"`
|
||||
} `json:"plugin"`
|
||||
Slurm struct {
|
||||
Version struct {
|
||||
Major int `json:"major"`
|
||||
Micro int `json:"micro"`
|
||||
Minor int `json:"minor"`
|
||||
} `json:"version"`
|
||||
Release string `json:"release"`
|
||||
} `json:"Slurm"`
|
||||
}
|
||||
|
||||
type JobResource struct {
|
||||
Nodes string `json:"nodes"`
|
||||
AllocatedCores int `json:"allocated_cores"`
|
||||
AllocatedHosts int `json:"allocated_hosts"`
|
||||
AllocatedNodes []AllocatedNode `json:"allocated_nodes"`
|
||||
}
|
||||
|
||||
type AllocatedNode struct {
|
||||
Sockets map[string]Socket `json:"sockets"`
|
||||
Nodename string `json:"nodename"`
|
||||
CPUsUsed *int `json:"cpus_used"`
|
||||
MemoryUsed *int `json:"memory_used"`
|
||||
MemoryAllocated *int `json:"memory_allocated"`
|
||||
}
|
||||
|
||||
type Socket struct {
|
||||
Cores map[string]string `json:"cores"`
|
||||
}
|
||||
|
||||
type Job struct {
|
||||
Account string `json:"account"`
|
||||
AccrueTime int `json:"accrue_time"`
|
||||
AdminComment string `json:"admin_comment"`
|
||||
ArrayJobID int64 `json:"array_job_id"`
|
||||
ArrayTaskID interface{} `json:"array_task_id"`
|
||||
ArrayMaxTasks int `json:"array_max_tasks"`
|
||||
ArrayTaskString string `json:"array_task_string"`
|
||||
AssociationID int `json:"association_id"`
|
||||
BatchFeatures string `json:"batch_features"`
|
||||
BatchFlag bool `json:"batch_flag"`
|
||||
BatchHost string `json:"batch_host"`
|
||||
Flags []string `json:"flags"`
|
||||
BurstBuffer string `json:"burst_buffer"`
|
||||
BurstBufferState string `json:"burst_buffer_state"`
|
||||
Cluster string `json:"cluster"`
|
||||
ClusterFeatures string `json:"cluster_features"`
|
||||
Command string `json:"command"`
|
||||
Comment string `json:"comment"`
|
||||
Container string `json:"container"`
|
||||
Contiguous bool `json:"contiguous"`
|
||||
CoreSpec interface{} `json:"core_spec"`
|
||||
ThreadSpec interface{} `json:"thread_spec"`
|
||||
CoresPerSocket interface{} `json:"cores_per_socket"`
|
||||
BillableTres interface{} `json:"billable_tres"`
|
||||
CPUPerTask interface{} `json:"cpus_per_task"`
|
||||
CPUFrequencyMinimum interface{} `json:"cpu_frequency_minimum"`
|
||||
CPUFrequencyMaximum interface{} `json:"cpu_frequency_maximum"`
|
||||
CPUFrequencyGovernor interface{} `json:"cpu_frequency_governor"`
|
||||
CPUPerTres string `json:"cpus_per_tres"`
|
||||
Deadline int `json:"deadline"`
|
||||
DelayBoot int `json:"delay_boot"`
|
||||
Dependency string `json:"dependency"`
|
||||
DerivedExitCode int `json:"derived_exit_code"`
|
||||
EligibleTime int `json:"eligible_time"`
|
||||
EndTime int64 `json:"end_time"`
|
||||
ExcludedNodes string `json:"excluded_nodes"`
|
||||
ExitCode int `json:"exit_code"`
|
||||
Features string `json:"features"`
|
||||
FederationOrigin string `json:"federation_origin"`
|
||||
FederationSiblingsActive string `json:"federation_siblings_active"`
|
||||
FederationSiblingsViable string `json:"federation_siblings_viable"`
|
||||
GresDetail []string `json:"gres_detail"`
|
||||
GroupID int `json:"group_id"`
|
||||
GroupName string `json:"group_name"`
|
||||
JobID int64 `json:"job_id"`
|
||||
JobResources JobResource `json:"job_resources"`
|
||||
JobState string `json:"job_state"`
|
||||
LastSchedEvaluation int `json:"last_sched_evaluation"`
|
||||
Licenses string `json:"licenses"`
|
||||
MaxCPUs int `json:"max_cpus"`
|
||||
MaxNodes int `json:"max_nodes"`
|
||||
MCSLabel string `json:"mcs_label"`
|
||||
MemoryPerTres string `json:"memory_per_tres"`
|
||||
Name string `json:"name"`
|
||||
Nodes string `json:"nodes"`
|
||||
Nice interface{} `json:"nice"`
|
||||
TasksPerCore interface{} `json:"tasks_per_core"`
|
||||
TasksPerNode int `json:"tasks_per_node"`
|
||||
TasksPerSocket interface{} `json:"tasks_per_socket"`
|
||||
TasksPerBoard int `json:"tasks_per_board"`
|
||||
CPUs int32 `json:"cpus"`
|
||||
NodeCount int32 `json:"node_count"`
|
||||
Tasks int `json:"tasks"`
|
||||
HETJobID int `json:"het_job_id"`
|
||||
HETJobIDSet string `json:"het_job_id_set"`
|
||||
HETJobOffset int `json:"het_job_offset"`
|
||||
Partition string `json:"partition"`
|
||||
MemoryPerNode interface{} `json:"memory_per_node"`
|
||||
MemoryPerCPU int `json:"memory_per_cpu"`
|
||||
MinimumCPUsPerNode int `json:"minimum_cpus_per_node"`
|
||||
MinimumTmpDiskPerNode int `json:"minimum_tmp_disk_per_node"`
|
||||
PreemptTime int `json:"preempt_time"`
|
||||
PreSUSTime int `json:"pre_sus_time"`
|
||||
Priority int `json:"priority"`
|
||||
Profile interface{} `json:"profile"`
|
||||
QoS string `json:"qos"`
|
||||
Reboot bool `json:"reboot"`
|
||||
RequiredNodes string `json:"required_nodes"`
|
||||
Requeue bool `json:"requeue"`
|
||||
ResizeTime int `json:"resize_time"`
|
||||
RestartCnt int `json:"restart_cnt"`
|
||||
ResvName string `json:"resv_name"`
|
||||
Shared *string `json:"shared"`
|
||||
ShowFlags []string `json:"show_flags"`
|
||||
SocketsPerBoard int `json:"sockets_per_board"`
|
||||
SocketsPerNode interface{} `json:"sockets_per_node"`
|
||||
StartTime int64 `json:"start_time"`
|
||||
StateDescription string `json:"state_description"`
|
||||
StateReason string `json:"state_reason"`
|
||||
StandardError string `json:"standard_error"`
|
||||
StandardInput string `json:"standard_input"`
|
||||
StandardOutput string `json:"standard_output"`
|
||||
SubmitTime int `json:"submit_time"`
|
||||
SuspendTime int `json:"suspend_time"`
|
||||
SystemComment string `json:"system_comment"`
|
||||
TimeLimit int `json:"time_limit"`
|
||||
TimeMinimum int `json:"time_minimum"`
|
||||
ThreadsPerCore interface{} `json:"threads_per_core"`
|
||||
TresBind string `json:"tres_bind"`
|
||||
TresFreq string `json:"tres_freq"`
|
||||
TresPerJob string `json:"tres_per_job"`
|
||||
TresPerNode string `json:"tres_per_node"`
|
||||
TresPerSocket string `json:"tres_per_socket"`
|
||||
TresPerTask string `json:"tres_per_task"`
|
||||
TresReqStr string `json:"tres_req_str"`
|
||||
TresAllocStr string `json:"tres_alloc_str"`
|
||||
UserID int `json:"user_id"`
|
||||
UserName string `json:"user_name"`
|
||||
Wckey string `json:"wckey"`
|
||||
CurrentWorkingDirectory string `json:"current_working_directory"`
|
||||
}
|
||||
|
||||
type SlurmPayload struct {
|
||||
Meta Metadata `json:"meta"`
|
||||
Errors []interface{} `json:"errors"`
|
||||
Jobs []Job `json:"jobs"`
|
||||
}
|
||||
|
||||
type DumpedComment struct {
|
||||
Administrator interface{} `json:"administrator"`
|
||||
Job interface{} `json:"job"`
|
||||
System interface{} `json:"system"`
|
||||
}
|
||||
|
||||
type MaxLimits struct {
|
||||
Running struct {
|
||||
Tasks int `json:"tasks"`
|
||||
} `json:"max"`
|
||||
}
|
||||
|
||||
type ArrayInfo struct {
|
||||
JobID int `json:"job_id"`
|
||||
Limits MaxLimits `json:"limits"`
|
||||
Task interface{} `json:"task"`
|
||||
TaskID interface{} `json:"task_id"`
|
||||
}
|
||||
|
||||
type Association struct {
|
||||
Account string `json:"account"`
|
||||
Cluster string `json:"cluster"`
|
||||
Partition interface{} `json:"partition"`
|
||||
User string `json:"user"`
|
||||
}
|
||||
|
||||
type TimeInfo struct {
|
||||
Elapsed int64 `json:"elapsed"`
|
||||
Eligible int64 `json:"eligible"`
|
||||
End int64 `json:"end"`
|
||||
Start int64 `json:"start"`
|
||||
Submission int64 `json:"submission"`
|
||||
Suspended int64 `json:"suspended"`
|
||||
System struct {
|
||||
Seconds int `json:"seconds"`
|
||||
Microseconds int `json:"microseconds"`
|
||||
} `json:"system"`
|
||||
Limit int `json:"limit"`
|
||||
Total struct {
|
||||
Seconds int `json:"seconds"`
|
||||
Microseconds int `json:"microseconds"`
|
||||
} `json:"total"`
|
||||
User struct {
|
||||
Seconds int `json:"seconds"`
|
||||
Microseconds int `json:"microseconds"`
|
||||
} `json:"user"`
|
||||
}
|
||||
|
||||
type ExitCode struct {
|
||||
Status string `json:"status"`
|
||||
ReturnCode int `json:"return_code"`
|
||||
}
|
||||
|
||||
type DumpedJob struct {
|
||||
Account string `json:"account"`
|
||||
Comment DumpedComment `json:"comment"`
|
||||
AllocationNodes int `json:"allocation_nodes"`
|
||||
Array ArrayInfo `json:"array"`
|
||||
Association Association `json:"association"`
|
||||
Cluster string `json:"cluster"`
|
||||
Constraints string `json:"constraints"`
|
||||
Container interface{} `json:"container"`
|
||||
DerivedExitCode ExitCode `json:"derived_exit_code"`
|
||||
Time TimeInfo `json:"time"`
|
||||
ExitCode ExitCode `json:"exit_code"`
|
||||
Flags []string `json:"flags"`
|
||||
Group string `json:"group"`
|
||||
Het struct {
|
||||
JobID int `json:"job_id"`
|
||||
JobOffset interface{} `json:"job_offset"`
|
||||
} `json:"het"`
|
||||
JobID int64 `json:"job_id"`
|
||||
Name string `json:"name"`
|
||||
MCS struct {
|
||||
Label string `json:"label"`
|
||||
} `json:"mcs"`
|
||||
Nodes string `json:"nodes"`
|
||||
Partition string `json:"partition"`
|
||||
Priority int `json:"priority"`
|
||||
QoS string `json:"qos"`
|
||||
Required struct {
|
||||
CPUs int `json:"CPUs"`
|
||||
Memory int `json:"memory"`
|
||||
} `json:"required"`
|
||||
KillRequestUser interface{} `json:"kill_request_user"`
|
||||
Reservation struct {
|
||||
ID int `json:"id"`
|
||||
Name int `json:"name"`
|
||||
} `json:"reservation"`
|
||||
State struct {
|
||||
Current string `json:"current"`
|
||||
Reason string `json:"reason"`
|
||||
} `json:"state"`
|
||||
Steps []struct {
|
||||
Nodes struct {
|
||||
List []string `json:"list"`
|
||||
Count int `json:"count"`
|
||||
Range string `json:"range"`
|
||||
} `json:"nodes"`
|
||||
Tres struct {
|
||||
Requested struct {
|
||||
Max []interface{} `json:"max"`
|
||||
Min []interface{} `json:"min"`
|
||||
Average []interface{} `json:"average"`
|
||||
Total []interface{} `json:"total"`
|
||||
} `json:"requested"`
|
||||
Consumed struct {
|
||||
Max []interface{} `json:"max"`
|
||||
Min []interface{} `json:"min"`
|
||||
Average []interface{} `json:"average"`
|
||||
Total []interface{} `json:"total"`
|
||||
} `json:"consumed"`
|
||||
Allocated []struct {
|
||||
Type string `json:"type"`
|
||||
Name interface{} `json:"name"`
|
||||
ID int `json:"id"`
|
||||
Count int `json:"count"`
|
||||
} `json:"allocated"`
|
||||
} `json:"tres"`
|
||||
Time TimeInfo `json:"time"`
|
||||
ExitCode ExitCode `json:"exit_code"`
|
||||
Tasks struct {
|
||||
Count int `json:"count"`
|
||||
} `json:"tasks"`
|
||||
PID interface{} `json:"pid"`
|
||||
CPU struct {
|
||||
RequestedFrequency struct {
|
||||
Min int `json:"min"`
|
||||
Max int `json:"max"`
|
||||
} `json:"requested_frequency"`
|
||||
Governor []interface{} `json:"governor"`
|
||||
} `json:"CPU"`
|
||||
KillRequestUser interface{} `json:"kill_request_user"`
|
||||
State string `json:"state"`
|
||||
Statistics struct {
|
||||
CPU struct {
|
||||
ActualFrequency int `json:"actual_frequency"`
|
||||
} `json:"CPU"`
|
||||
Energy struct {
|
||||
Consumed int `json:"consumed"`
|
||||
} `json:"energy"`
|
||||
} `json:"statistics"`
|
||||
Step struct {
|
||||
JobID int `json:"job_id"`
|
||||
Het struct {
|
||||
Component interface{} `json:"component"`
|
||||
} `json:"het"`
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
} `json:"step"`
|
||||
Task struct {
|
||||
Distribution string `json:"distribution"`
|
||||
} `json:"task"`
|
||||
} `json:"steps"`
|
||||
Tres struct {
|
||||
Allocated []struct {
|
||||
Type string `json:"type"`
|
||||
Name interface{} `json:"name"`
|
||||
ID int `json:"id"`
|
||||
Count int `json:"count"`
|
||||
} `json:"allocated"`
|
||||
Requested []struct {
|
||||
Type string `json:"type"`
|
||||
Name interface{} `json:"name"`
|
||||
ID int `json:"id"`
|
||||
Count int `json:"count"`
|
||||
} `json:"requested"`
|
||||
} `json:"tres"`
|
||||
User string `json:"user"`
|
||||
Wckey struct {
|
||||
Wckey string `json:"wckey"`
|
||||
Flags []string `json:"flags"`
|
||||
} `json:"wckey"`
|
||||
WorkingDirectory string `json:"working_directory"`
|
||||
}
|
||||
|
||||
type SlurmDBPayload struct {
|
||||
Meta Metadata `json:"meta"`
|
||||
Errors []string `json:"errors"`
|
||||
Jobs []DumpedJob `json:"jobs"`
|
||||
}
|
||||
|
||||
func DecodeClusterConfig(filename string) (ClusterConfig, error) {
|
||||
var clusterConfig ClusterConfig
|
||||
|
||||
file, err := os.Open(filename)
|
||||
if err != nil {
|
||||
log.Errorf("Cluster config file not found. No cores/GPU ids available.")
|
||||
return clusterConfig, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
decoder := json.NewDecoder(file)
|
||||
err = decoder.Decode(&clusterConfig)
|
||||
if err != nil {
log.Errorf("Error decoding cluster config file: %v", err)
return clusterConfig, err
}
|
||||
|
||||
log.Printf("Name: %s\n", clusterConfig.Name)
|
||||
log.Printf("MetricConfig:\n")
|
||||
for _, metric := range clusterConfig.MetricConfig {
|
||||
log.Printf(" Name: %s\n", metric.Name)
|
||||
log.Printf(" Unit Base: %s\n", metric.Unit.Base)
|
||||
log.Printf(" Scope: %s\n", metric.Scope)
|
||||
log.Printf(" Aggregation: %s\n", metric.Aggregation)
|
||||
log.Printf(" Timestep: %d\n", metric.Timestep)
|
||||
log.Printf(" Peak: %f\n", metric.Peak)
|
||||
log.Printf(" Normal: %f\n", metric.Normal)
|
||||
log.Printf(" Caution: %f\n", metric.Caution)
|
||||
log.Printf(" Alert: %f\n", metric.Alert)
|
||||
}
|
||||
log.Printf("SubClusters:\n")
|
||||
for _, subCluster := range clusterConfig.SubClusters {
|
||||
log.Printf(" Name: %s\n", subCluster.Name)
|
||||
log.Printf(" Nodes: %s\n", subCluster.Nodes)
|
||||
log.Printf(" Processor Type: %s\n", subCluster.ProcessorType)
|
||||
log.Printf(" Sockets Per Node: %d\n", subCluster.SocketsPerNode)
|
||||
log.Printf(" Cores Per Socket: %d\n", subCluster.CoresPerSocket)
|
||||
log.Printf(" Threads Per Core: %d\n", subCluster.ThreadsPerCore)
|
||||
log.Printf(" Flop Rate Scalar Unit Base: %s\n", subCluster.FlopRateScalar.Unit.Base)
|
||||
log.Printf(" Flop Rate Scalar Unit Prefix: %s\n", subCluster.FlopRateScalar.Unit.Prefix)
|
||||
log.Printf(" Flop Rate Scalar Value: %f\n", subCluster.FlopRateScalar.Value)
|
||||
log.Printf(" Flop Rate Simd Unit Base: %s\n", subCluster.FlopRateSimd.Unit.Base)
|
||||
log.Printf(" Flop Rate Simd Unit Prefix: %s\n", subCluster.FlopRateSimd.Unit.Prefix)
|
||||
log.Printf(" Flop Rate Simd Value: %f\n", subCluster.FlopRateSimd.Value)
|
||||
log.Printf(" Memory Bandwidth Unit Base: %s\n", subCluster.MemoryBandwidth.Unit.Base)
|
||||
log.Printf(" Memory Bandwidth Unit Prefix: %s\n", subCluster.MemoryBandwidth.Unit.Prefix)
|
||||
log.Printf(" Memory Bandwidth Value: %f\n", subCluster.MemoryBandwidth.Value)
|
||||
log.Printf(" Topology Node: %v\n", subCluster.Topology.Node)
|
||||
log.Printf(" Topology Socket: %v\n", subCluster.Topology.Socket)
|
||||
log.Printf(" Topology Memory Domain: %v\n", subCluster.Topology.MemoryDomain)
|
||||
log.Printf(" Topology Core: %v\n", subCluster.Topology.Core)
|
||||
log.Printf(" Topology Accelerators:\n")
|
||||
for _, accelerator := range subCluster.Topology.Accelerators {
|
||||
log.Printf(" ID: %s\n", accelerator.ID)
|
||||
log.Printf(" Type: %s\n", accelerator.Type)
|
||||
log.Printf(" Model: %s\n", accelerator.Model)
|
||||
}
|
||||
}
|
||||
|
||||
return clusterConfig, nil
|
||||
}
|
||||
|
||||
func UnmarshalSlurmPayload(jsonPayload string) (SlurmPayload, error) {
|
||||
var slurmData SlurmPayload
|
||||
err := json.Unmarshal([]byte(jsonPayload), &slurmData)
|
||||
if err != nil {
|
||||
return slurmData, fmt.Errorf("failed to unmarshal JSON data: %v", err)
|
||||
}
|
||||
return slurmData, nil
|
||||
}
|
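Illustrative only, not part of the diff: one way the two entry points above could be combined. The file names are taken from elsewhere in this changeset; the rest is an assumption.

// Sketch: decode the cluster topology, then parse a slurmrestd jobs dump.
clusterCfg, err := DecodeClusterConfig("cluster-alex.json")
if err != nil {
	log.Fatal(err.Error())
}

raw, err := os.ReadFile("slurm_0038.json")
if err != nil {
	log.Fatal(err.Error())
}
payload, err := UnmarshalSlurmPayload(string(raw))
if err != nil {
	log.Fatal(err.Error())
}
log.Printf("cluster %s: %d jobs in payload", clusterCfg.Name, len(payload.Jobs))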
internal/scheduler/scheduler.go (new file, 27 lines)
@@ -0,0 +1,27 @@
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package scheduler

import "encoding/json"

type BatchScheduler interface {
	Init(rawConfig json.RawMessage) error

	Sync()
}

var sd BatchScheduler

func Init(rawConfig json.RawMessage) error {

	sd = &SlurmNatsScheduler{}
	sd.Init(rawConfig)

	return nil
}

func GetHandle() BatchScheduler {
	return sd
}
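A hedged sketch, not part of the diff, of how this package-level handle is meant to be used. The config literal is an assumption; note that SlurmNatsScheduler.Init blocks in wg.Wait() while subscribed, so a real caller would likely run Init in a goroutine.

go func() {
	rawCfg := json.RawMessage(`{"url": "nats://127.0.0.1:4222"}`)
	if err := scheduler.Init(rawCfg); err != nil {
		log.Fatal(err.Error())
	}
}()

// Once initialized, other components can fetch the active scheduler and sync.
scheduler.GetHandle().Sync()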
internal/scheduler/slurmNats.go (new file, 214 lines)
@@ -0,0 +1,214 @@
|
||||
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type SlurmNatsConfig struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
type SlurmNatsScheduler struct {
|
||||
url string
|
||||
|
||||
RepositoryMutex sync.Mutex
|
||||
JobRepository *repository.JobRepository
|
||||
}
|
||||
|
||||
type StopJobRequest struct {
|
||||
// Stop Time of job as epoch
|
||||
StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"`
|
||||
State schema.JobState `json:"jobState" validate:"required" example:"completed"` // Final job state
|
||||
JobId *int64 `json:"jobId" example:"123000"` // Cluster Job ID of job
|
||||
Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
|
||||
StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
|
||||
}
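For orientation, not part of the diff: a publisher-side sketch producing a message that stopJob() below can decode. The concrete values are the examples from the struct tags above, and ec stands for a *nats.EncodedConn as created in Init().

jobID := int64(123000)
cluster := "fritz"
startTime := int64(1649723812)
req := StopJobRequest{
	JobId:     &jobID,
	Cluster:   &cluster,
	StartTime: &startTime,
	StopTime:  1649763839,
	State:     schema.JobStateCompleted,
}
if err := ec.Publish("stopJob", &req); err != nil {
	log.Errorf("publishing stopJob failed: %v", err)
}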
|
||||
|
||||
func (sd *SlurmNatsScheduler) startJob(req *schema.JobMeta) {
|
||||
fmt.Printf("DEBUG: %+v\n", *req)
|
||||
|
||||
log.Printf("Server Name: %s - Job ID: %v", req.BaseJob.Cluster, req.BaseJob.JobID)
|
||||
log.Printf("User: %s - Project: %s", req.BaseJob.User, req.BaseJob.Project)
|
||||
|
||||
if req.State == "" {
|
||||
req.State = schema.JobStateRunning
|
||||
}
|
||||
if err := importer.SanityChecks(&req.BaseJob); err != nil {
|
||||
log.Errorf("Sanity checks failed: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// acquire lock to avoid race condition between API calls
|
||||
var unlockOnce sync.Once
|
||||
sd.RepositoryMutex.Lock()
|
||||
defer unlockOnce.Do(sd.RepositoryMutex.Unlock)
|
||||
|
||||
// Check if combination of (job_id, cluster_id, start_time) already exists:
|
||||
jobs, err := sd.JobRepository.FindAll(&req.JobID, &req.Cluster, nil)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
log.Errorf("checking for duplicate failed: %s", err.Error())
|
||||
return
|
||||
} else if err == nil {
|
||||
for _, job := range jobs {
|
||||
if (req.StartTime - job.StartTimeUnix) < 86400 {
|
||||
log.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d", job.ID)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
id, err := sd.JobRepository.Start(req)
|
||||
if err != nil {
|
||||
log.Errorf("insert into database failed: %s", err.Error())
|
||||
return
|
||||
}
|
||||
// unlock here, adding Tags can be async
|
||||
unlockOnce.Do(sd.RepositoryMutex.Unlock)
|
||||
|
||||
for _, tag := range req.Tags {
|
||||
if _, err := sd.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
|
||||
log.Errorf("adding tag to new job %d failed: %s", id, err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime)
|
||||
}
|
||||
|
||||
func (sd *SlurmNatsScheduler) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
|
||||
|
||||
// Sanity checks
|
||||
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
|
||||
log.Errorf("stopTime must be larger than startTime and only running jobs can be stopped")
|
||||
return
|
||||
}
|
||||
|
||||
if req.State != "" && !req.State.Valid() {
|
||||
log.Errorf("invalid job state: %#v", req.State)
|
||||
return
|
||||
} else if req.State == "" {
|
||||
req.State = schema.JobStateCompleted
|
||||
}
|
||||
|
||||
// Mark job as stopped in the database (update state and duration)
|
||||
job.Duration = int32(req.StopTime - job.StartTime.Unix())
|
||||
job.State = req.State
|
||||
if err := sd.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||
log.Errorf("marking job as stopped failed: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
|
||||
|
||||
// Monitoring is disabled...
|
||||
if job.MonitoringStatus == schema.MonitoringStatusDisabled {
|
||||
return
|
||||
}
|
||||
|
||||
// Trigger async archiving
|
||||
sd.JobRepository.TriggerArchiving(job)
|
||||
}
|
||||
|
||||
func (sd *SlurmNatsScheduler) stopJob(req *StopJobRequest) {
|
||||
// if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleApi) {
|
||||
// log.Errorf("missing role: %v", auth.GetRoleString(auth.RoleApi))
|
||||
// return
|
||||
// }
|
||||
|
||||
log.Printf("Server Name: %s - Job ID: %v", *req.Cluster, req.JobId)
|
||||
|
||||
// Fetch job (that will be stopped) from db
|
||||
var job *schema.Job
|
||||
var err error
|
||||
if req.JobId == nil {
|
||||
log.Errorf("the field 'jobId' is required")
|
||||
return
|
||||
}
|
||||
|
||||
job, err = sd.JobRepository.Find(req.JobId, req.Cluster, req.StartTime)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("finding job failed: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
sd.checkAndHandleStopJob(job, req)
|
||||
}
|
||||
|
||||
func (sd *SlurmNatsScheduler) Init(rawConfig json.RawMessage) error {
|
||||
servers := []string{"nats://127.0.0.1:4222", "nats://127.0.0.1:1223"}
|
||||
|
||||
nc, err := nats.Connect(strings.Join(servers, ","))
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
getStatusTxt := func(nc *nats.Conn) string {
|
||||
switch nc.Status() {
|
||||
case nats.CONNECTED:
|
||||
return "Connected"
|
||||
case nats.CLOSED:
|
||||
return "Closed"
|
||||
default:
|
||||
return "Other"
|
||||
}
|
||||
}
|
||||
log.Printf("The connection status is %v\n", getStatusTxt(nc))
|
||||
|
||||
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
defer ec.Close()
|
||||
|
||||
// Define the object
|
||||
type encodedMessage struct {
|
||||
ServerName string
|
||||
ResponseCode int
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
|
||||
// Subscribe
|
||||
if _, err := ec.Subscribe("test", func(s *encodedMessage) {
|
||||
log.Printf("Server Name: %s - Response Code: %v", s.ServerName, s.ResponseCode)
|
||||
if s.ResponseCode == 500 {
|
||||
wg.Done()
|
||||
}
|
||||
}); err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
if _, err := ec.Subscribe("startJob", sd.startJob); err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
if _, err := ec.Subscribe("stopJob", sd.stopJob); err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
// Wait for a message to come in
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sd *SlurmNatsScheduler) Sync() {
|
||||
|
||||
}
|
internal/scheduler/slurmRest.go (new file, 565 lines)
@@ -0,0 +1,565 @@
|
||||
// Copyright (C) 2023 NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
)
|
||||
|
||||
type SlurmRestSchedulerConfig struct {
|
||||
URL string `json:"url"`
|
||||
|
||||
JobRepository *repository.JobRepository
|
||||
|
||||
clusterConfig ClusterConfig
|
||||
|
||||
client *http.Client
|
||||
|
||||
nodeClusterMap map[string]string
|
||||
|
||||
clusterPCIEAddressMap map[string][]string
|
||||
}
|
||||
|
||||
func (cfg *SlurmRestSchedulerConfig) queryDB(qtime int64, clusterName string) ([]interface{}, error) {
|
||||
|
||||
apiEndpoint := "/slurmdb/v0.0.38/jobs"
|
||||
|
||||
// Construct the query parameters
|
||||
queryParams := url.Values{}
|
||||
queryParams.Set("users", "user1,user2")
|
||||
queryParams.Set("submit_time", "2023-01-01T00:00:00")
|
||||
|
||||
// Add the query parameters to the API endpoint
|
||||
apiEndpoint += "?" + queryParams.Encode()
|
||||
|
||||
// Create a new HTTP GET request
|
||||
req, err := http.NewRequest("GET", apiEndpoint, nil)
|
||||
if err != nil {
|
||||
log.Errorf("Error creating request: %v", err)
|
||||
}
|
||||
|
||||
// Send the request
|
||||
resp, err := cfg.client.Do(req)
|
||||
if err != nil {
|
||||
log.Errorf("Error sending request: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Check the response status code
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.Errorf("API request failed with status: %v", resp.Status)
|
||||
}
|
||||
|
||||
// Read the response body
|
||||
// Here you can parse the response body as needed
|
||||
// For simplicity, let's just print the response body
|
||||
dbOutput, err := io.ReadAll(resp.Body) // read the full body; Read() into a nil slice would return zero bytes
if err != nil {
log.Errorf("Error reading response body: %v", err)
}
|
||||
|
||||
log.Errorf("API response: %v", string(dbOutput))
|
||||
|
||||
dataJobs := make(map[string]interface{})
|
||||
err = json.Unmarshal(dbOutput, &dataJobs)
|
||||
if err != nil {
|
||||
log.Errorf("Error parsing JSON response: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if _, ok := dataJobs["jobs"]; !ok {
|
||||
log.Errorf("ERROR: jobs not found - response incomplete")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
jobs, _ := dataJobs["jobs"].([]interface{})
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
func (cfg *SlurmRestSchedulerConfig) fetchJobs() (SlurmPayload, error) {
|
||||
var ctlOutput []byte
|
||||
|
||||
apiEndpoint := "http://:8080/slurm/v0.0.38/jobs"
|
||||
// Create a new HTTP GET request with query parameters
|
||||
req, err := http.NewRequest("GET", apiEndpoint, nil)
|
||||
if err != nil {
|
||||
log.Errorf("Error creating request: %v", err)
|
||||
}
|
||||
|
||||
// Send the request
|
||||
resp, err := cfg.client.Do(req)
|
||||
if err != nil {
|
||||
log.Errorf("Error sending request: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Check the response status code
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.Errorf("API request failed with status: %v", resp.Status)
|
||||
}
|
||||
|
||||
ctlOutput, err = io.ReadAll(resp.Body) // read the full body; Read() into a nil slice would return zero bytes
if err != nil {
log.Errorf("Error reading response body: %v", err)
}
log.Printf("Received JSON Data: %v", ctlOutput)
|
||||
|
||||
var jobsResponse SlurmPayload
|
||||
err = json.Unmarshal(ctlOutput, &jobsResponse)
|
||||
if err != nil {
|
||||
log.Errorf("Error parsing JSON response: %v", err)
|
||||
return jobsResponse, err
|
||||
}
|
||||
|
||||
return jobsResponse, nil
|
||||
}
|
||||
|
||||
func fetchJobsLocal() (SlurmPayload, error) {
|
||||
// Read the Slurm Payload JSON file
|
||||
jobsData, err := os.ReadFile("slurm_0038.json")
|
||||
|
||||
if err != nil {
|
||||
fmt.Println("Error reading Slurm Payload JSON file:", err)
|
||||
}
|
||||
|
||||
var jobsResponse SlurmPayload
|
||||
err = json.Unmarshal(jobsData, &jobsResponse)
|
||||
if err != nil {
|
||||
log.Errorf("Error parsing Slurm Payload JSON response: %v", err)
|
||||
return jobsResponse, err
|
||||
}
|
||||
|
||||
return jobsResponse, nil
|
||||
}
|
||||
|
||||
func fetchDumpedJobsLocal() (SlurmDBPayload, error) {
|
||||
// Read the SlurmDB Payload JSON file
|
||||
jobsData, err := os.ReadFile("slurmdb_0038-large.json")
|
||||
|
||||
if err != nil {
|
||||
fmt.Println("Error reading SlurmDB Payload JSON file:", err)
|
||||
}
|
||||
|
||||
var jobsResponse SlurmDBPayload
|
||||
err = json.Unmarshal(jobsData, &jobsResponse)
|
||||
if err != nil {
|
||||
log.Errorf("Error parsing SlurmDB Payload JSON response: %v", err)
|
||||
return jobsResponse, err
|
||||
}
|
||||
|
||||
return jobsResponse, nil
|
||||
}
|
||||
|
||||
func printSlurmInfo(job Job) string {
|
||||
|
||||
text := fmt.Sprintf(`
|
||||
JobId=%v JobName=%v
|
||||
UserId=%v(%v) GroupId=%v
|
||||
Account=%v QOS=%v
|
||||
Requeue=%v Restarts=%v BatchFlag=%v
|
||||
TimeLimit=%v
|
||||
SubmitTime=%v
|
||||
Partition=%v
|
||||
NodeList=%v
|
||||
NumNodes=%v NumCPUs=%v NumTasks=%v CPUs/Task=%v
|
||||
NTasksPerNode:Socket:Core=%v:%v:%v
|
||||
TRES_req=%v
|
||||
TRES_alloc=%v
|
||||
Command=%v
|
||||
WorkDir=%v
|
||||
StdErr=%v
|
||||
StdOut=%v`,
|
||||
job.JobID, job.Name,
|
||||
job.UserName, job.UserID, job.GroupID,
|
||||
job.Account, job.QoS,
|
||||
job.Requeue, job.RestartCnt, job.BatchFlag,
|
||||
job.TimeLimit, job.SubmitTime,
|
||||
job.Partition,
|
||||
job.Nodes,
|
||||
job.NodeCount, job.CPUs, job.Tasks, job.CPUPerTask,
|
||||
job.TasksPerBoard, job.TasksPerSocket, job.TasksPerCore,
|
||||
job.TresReqStr,
|
||||
job.TresAllocStr,
|
||||
job.Command,
|
||||
job.CurrentWorkingDirectory,
|
||||
job.StandardError,
|
||||
job.StandardOutput,
|
||||
)
|
||||
|
||||
return text
|
||||
}
|
||||
|
||||
func exitWithError(err error, output []byte) {
|
||||
if exitError, ok := err.(*exec.ExitError); ok {
|
||||
if exitError.ExitCode() == 28 {
|
||||
fmt.Fprintf(os.Stderr, "ERROR: API call failed with timeout; check slurmrestd.\nOutput:\n%s\n", output)
|
||||
} else {
|
||||
fmt.Fprintf(os.Stderr, "ERROR: API call failed with code %d;\nOutput:\n%s\n", exitError.ExitCode(), output)
|
||||
}
|
||||
} else {
|
||||
log.Errorf("ERROR: %v", err)
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
func (cfg *SlurmRestSchedulerConfig) Init() error {
|
||||
var err error
|
||||
|
||||
cfg.clusterConfig, err = DecodeClusterConfig("cluster-alex.json")
|
||||
|
||||
cfg.nodeClusterMap = make(map[string]string)
|
||||
cfg.clusterPCIEAddressMap = make(map[string][]string)
|
||||
|
||||
for _, subCluster := range cfg.clusterConfig.SubClusters {
|
||||
|
||||
cfg.ConstructNodeClusterMap(subCluster.Nodes, subCluster.Name)
|
||||
|
||||
// Collect the PCIe addresses of all accelerators in this subcluster.
// The slice must be allocated with the correct length (not capacity 0),
// otherwise indexing by idx would panic.
pcieAddresses := make([]string, len(subCluster.Topology.Accelerators))

for idx, accelerator := range subCluster.Topology.Accelerators {
pcieAddresses[idx] = accelerator.ID
}
|
||||
|
||||
cfg.clusterPCIEAddressMap[subCluster.Name] = pcieAddresses
|
||||
}
|
||||
// Create an HTTP client
|
||||
cfg.client = &http.Client{}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (cfg *SlurmRestSchedulerConfig) checkAndHandleStopJob(job *schema.Job, req *StopJobRequest) {
|
||||
|
||||
// Sanity checks
|
||||
if job == nil || job.StartTime.Unix() >= req.StopTime || job.State != schema.JobStateRunning {
|
||||
log.Errorf("stopTime must be larger than startTime and only running jobs can be stopped")
|
||||
return
|
||||
}
|
||||
|
||||
if req.State != "" && !req.State.Valid() {
|
||||
log.Errorf("invalid job state: %#v", req.State)
|
||||
return
|
||||
} else if req.State == "" {
|
||||
req.State = schema.JobStateCompleted
|
||||
}
|
||||
|
||||
// Mark job as stopped in the database (update state and duration)
|
||||
job.Duration = int32(req.StopTime - job.StartTime.Unix())
|
||||
job.State = req.State
|
||||
if err := cfg.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
|
||||
log.Errorf("marking job as stopped failed: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime)
|
||||
|
||||
// Monitoring is disabled...
|
||||
if job.MonitoringStatus == schema.MonitoringStatusDisabled {
|
||||
return
|
||||
}
|
||||
|
||||
// Trigger async archiving
|
||||
cfg.JobRepository.TriggerArchiving(job)
|
||||
}
|
||||
|
||||
func (cfg *SlurmRestSchedulerConfig) ConstructNodeClusterMap(nodes string, cluster string) {
|
||||
|
||||
if cfg.nodeClusterMap == nil {
|
||||
cfg.nodeClusterMap = make(map[string]string)
|
||||
}
|
||||
|
||||
// Split the input by commas
|
||||
groups := strings.Split(nodes, ",")
|
||||
|
||||
for _, group := range groups {
|
||||
// Use regular expressions to match numbers and ranges
|
||||
numberRangeRegex := regexp.MustCompile(`a\[(\d+)-(\d+)\]`)
|
||||
numberRegex := regexp.MustCompile(`a(\d+)`)
|
||||
|
||||
if numberRangeRegex.MatchString(group) {
|
||||
// Extract nodes from ranges
|
||||
matches := numberRangeRegex.FindStringSubmatch(group)
|
||||
if len(matches) == 3 {
|
||||
start, _ := strconv.Atoi(matches[1])
|
||||
end, _ := strconv.Atoi(matches[2])
|
||||
for i := start; i <= end; i++ {
|
||||
// matches[0] is the full match including brackets (e.g. "a[0001-0010]");
// node names use the bare "a" prefix plus a zero-padded number.
cfg.nodeClusterMap["a"+fmt.Sprintf("%04d", i)] = cluster
|
||||
}
|
||||
}
|
||||
} else if numberRegex.MatchString(group) {
|
||||
// Extract individual node
|
||||
matches := numberRegex.FindStringSubmatch(group)
|
||||
if len(matches) == 2 {
|
||||
cfg.nodeClusterMap[group] = cluster
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func extractElements(indicesStr string, addresses []string) ([]string, error) {
|
||||
// Split the input string by commas to get individual index ranges
|
||||
indexRanges := strings.Split(indicesStr, ",")
|
||||
|
||||
var selected []string
|
||||
for _, indexRange := range indexRanges {
|
||||
// Split each index range by hyphen to separate start and end indices
|
||||
rangeParts := strings.Split(indexRange, "-")
|
||||
|
||||
if len(rangeParts) == 1 {
|
||||
// If there's only one part, it's a single index
|
||||
index, err := strconv.Atoi(rangeParts[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
selected = append(selected, addresses[index])
|
||||
} else if len(rangeParts) == 2 {
|
||||
// If there are two parts, it's a range
|
||||
start, err := strconv.Atoi(rangeParts[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
end, err := strconv.Atoi(rangeParts[1])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Add all indices in the range to the result
|
||||
for i := start; i <= end; i++ {
|
||||
selected = append(selected, addresses[i])
|
||||
}
|
||||
} else {
|
||||
// Invalid format
|
||||
return nil, fmt.Errorf("invalid index range: %s", indexRange)
|
||||
}
|
||||
}
|
||||
|
||||
return selected, nil
|
||||
}
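A small illustrative call of extractElements (the index string and address list are made-up values in the format the GRES detail string and the PCIe address list use):

// "0-1,3" selects addresses[0], addresses[1] and addresses[3].
addresses := []string{"00000000:03:00.0", "00000000:44:00.0", "00000000:84:00.0", "00000000:C4:00.0"}
selected, err := extractElements("0-1,3", addresses)
if err != nil {
	log.Errorf("parsing GRES index string failed: %v", err)
	return
}
log.Debugf("selected accelerators: %v", selected)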
|
||||
|
||||
func (cfg *SlurmRestSchedulerConfig) CreateJobMeta(job Job) (*schema.JobMeta, error) {
|
||||
|
||||
var exclusive int32
|
||||
if job.Shared == nil {
|
||||
exclusive = 1
|
||||
} else {
|
||||
exclusive = 0
|
||||
}
|
||||
|
||||
totalGPUs := 0
|
||||
|
||||
var resources []*schema.Resource
|
||||
|
||||
for nodeIndex, node := range job.JobResources.AllocatedNodes {
|
||||
var res schema.Resource
|
||||
res.Hostname = node.Nodename
|
||||
|
||||
log.Debugf("Node %s Cores map size: %d\n", node.Nodename, len(node.Sockets))
|
||||
|
||||
if node.CPUsUsed == nil || node.MemoryAllocated == nil {
|
||||
log.Fatalf("Either node.Cpus or node.Memory is nil\n")
|
||||
}
|
||||
|
||||
for k, v := range node.Sockets {
|
||||
fmt.Printf("core id[%s] value[%s]\n", k, v)
|
||||
threadID, _ := strconv.Atoi(k)
|
||||
res.HWThreads = append(res.HWThreads, threadID)
|
||||
}
|
||||
|
||||
re := regexp.MustCompile(`\(([^)]*)\)`)
|
||||
matches := re.FindStringSubmatch(job.GresDetail[nodeIndex])
|
||||
|
||||
if len(matches) < 2 {
|
||||
return nil, fmt.Errorf("no substring found in brackets")
|
||||
}
|
||||
|
||||
nodePCIEAddresses := cfg.clusterPCIEAddressMap[cfg.nodeClusterMap[node.Nodename]]
|
||||
|
||||
selectedPCIEAddresses, err := extractElements(matches[1], nodePCIEAddresses)
|
||||
|
||||
totalGPUs += len(selectedPCIEAddresses)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// For core/GPU id mapping, need to query from cluster config file
|
||||
res.Accelerators = selectedPCIEAddresses
|
||||
resources = append(resources, &res)
|
||||
}
|
||||
|
||||
metaData := make(map[string]string)
|
||||
metaData["jobName"] = job.Name
|
||||
metaData["slurmInfo"] = printSlurmInfo(job)
|
||||
|
||||
metaDataInBytes, err := json.Marshal(metaData)
|
||||
if err != nil {
|
||||
log.Fatalf("metaData JSON marshaling failed: %s", err)
|
||||
}
|
||||
|
||||
var defaultJob schema.BaseJob = schema.BaseJob{
|
||||
JobID: job.JobID,
|
||||
User: job.UserName,
|
||||
Project: job.Account,
|
||||
Cluster: job.Cluster,
|
||||
Partition: job.Partition,
|
||||
// check nil
|
||||
ArrayJobId: job.ArrayJobID,
|
||||
NumNodes: job.NodeCount,
|
||||
NumHWThreads: job.CPUs,
|
||||
NumAcc: int32(totalGPUs),
|
||||
Exclusive: exclusive,
|
||||
// MonitoringStatus: job.MonitoringStatus,
|
||||
// SMT: job.TasksPerCore,
|
||||
State: schema.JobState(job.JobState),
|
||||
// ignore this for start job
|
||||
// Duration: int32(time.Now().Unix() - job.StartTime), // or SubmitTime?
|
||||
Walltime: time.Now().Unix(), // max duration requested by the job
|
||||
// Tags: job.Tags,
|
||||
// ignore this!
|
||||
// RawResources: jobResourcesInBytes,
|
||||
// "job_resources": "allocated_nodes" "sockets":
|
||||
// very important; has to be right
|
||||
Resources: resources,
|
||||
RawMetaData: metaDataInBytes,
|
||||
// optional metadata with'jobScript 'jobName': 'slurmInfo':
|
||||
MetaData: metaData,
|
||||
// ConcurrentJobs: job.ConcurrentJobs,
|
||||
}
|
||||
log.Debugf("Generated BaseJob with Resources=%v", defaultJob.Resources[0])
|
||||
|
||||
meta := &schema.JobMeta{
|
||||
BaseJob: defaultJob,
|
||||
StartTime: job.StartTime,
|
||||
Statistics: make(map[string]schema.JobStatistics),
|
||||
}
|
||||
// log.Debugf("Generated JobMeta %v", req.BaseJob.JobID)
|
||||
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
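The bracket regexp in CreateJobMeta only extracts whatever Slurm places between the parentheses of a gres_detail entry; the gres string below is illustrative and assumes the bracketed part is already a plain index list that extractElements can resolve:

	re := regexp.MustCompile(`\(([^)]*)\)`)
	m := re.FindStringSubmatch("gpu:a100:4(0,2-3)")
	// m[1] == "0,2-3", which extractElements then maps to PCIe addresses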
func (cfg *SlurmRestSchedulerConfig) HandleJobs(jobs []Job) error {

	// runningJobsInCC, err := cfg.JobRepository.FindRunningJobs("alex")

	// Iterate over the Jobs slice
	for _, job := range jobs {
		// Process each job from Slurm
		fmt.Printf("Job ID: %d\n", job.JobID)
		fmt.Printf("Job Name: %s\n", job.Name)
		fmt.Printf("Job State: %s\n", job.JobState)
		fmt.Println("Job StartTime:", job.StartTime)
		fmt.Println("Job Cluster:", job.Cluster)

		if job.JobState == "RUNNING" {

			meta, err := cfg.CreateJobMeta(job)
			if err != nil {
				log.Errorf("Could not create job meta for job %d: %s", job.JobID, err.Error())
				continue
			}

			// For all running jobs from Slurm
			_, notFoundError := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.StartTime)

			if notFoundError != nil {
				// if it does not exist in CC, create a new entry
				log.Print("Job does not exist in CC, will create a new entry:", job.JobID)
				id, startJobError := cfg.JobRepository.Start(meta)
				if startJobError != nil {
					return startJobError
				}
				log.Debug("Added job", id)
			}

			// Running on both sides: nothing needs to be done
		} else if job.JobState == "COMPLETED" {
			// Check if a completed job with the combination of (job_id, cluster_id, start_time) already exists
			log.Debugf("Processing completed job ID: %v Cluster: %v StartTime: %v", job.JobID, job.Cluster, job.StartTime)
			existingJob, err := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.StartTime)

			if err == nil && existingJob.State != schema.JobStateCompleted {
				// for jobs completed in Slurm (either in Slurm or maybe SlurmDB)
				// update the job in CC with the new info (final status, duration, end timestamp)

				existingJob.BaseJob.Duration = int32(job.EndTime - job.StartTime)
				existingJob.BaseJob.State = schema.JobState(job.JobState)
				existingJob.BaseJob.Walltime = job.EndTime

				req := &StopJobRequest{
					Cluster:   &job.Cluster,
					JobId:     &job.JobID,
					State:     schema.JobState(job.JobState),
					StartTime: &job.StartTime,
					StopTime:  job.EndTime,
				}
				cfg.checkAndHandleStopJob(existingJob, req)
			}
		}
	}
	return nil
}
func (cfg *SlurmRestSchedulerConfig) HandleDumpedJobs(jobs []DumpedJob) error {

	// Iterate over the Jobs slice
	for _, job := range jobs {
		// Process each job from Slurm
		fmt.Printf("Job ID: %d\n", job.JobID)
		fmt.Printf("Job Name: %s\n", job.Name)
		fmt.Printf("Job State: %s\n", job.State.Current)
		fmt.Println("Job EndTime:", job.Time.End)
		fmt.Println("Job Cluster:", job.Cluster)

		// Check if a completed job with the combination of (job_id, cluster_id, start_time) already exists
		log.Debugf("Processing completed dumped job ID: %v Cluster: %v StartTime: %v", job.JobID, job.Cluster, job.Time.Start)
		existingJob, err := cfg.JobRepository.Find(&job.JobID, &job.Cluster, &job.Time.Start)

		if err == nil && existingJob.State != schema.JobStateCompleted {
			// for jobs completed in Slurm (either in Slurm or maybe SlurmDB)
			// update the job in CC with the new info (final status, duration, end timestamp)

			existingJob.BaseJob.Duration = int32(job.Time.End - job.Time.Start)
			existingJob.BaseJob.State = schema.JobState(job.State.Current)
			existingJob.BaseJob.Walltime = job.Time.End

			req := &StopJobRequest{
				Cluster:   &job.Cluster,
				JobId:     &job.JobID,
				State:     schema.JobState(job.State.Current),
				StartTime: &job.Time.Start,
				StopTime:  job.Time.End,
			}
			cfg.checkAndHandleStopJob(existingJob, req)
		}
	}
	return nil
}
func (cfg *SlurmRestSchedulerConfig) Sync() {

	// Fetch an instance of Slurm JobsResponse
	jobsResponse, err := fetchJobsLocal()
	if err != nil {
		log.Fatal(err.Error())
	}
	if err := cfg.HandleJobs(jobsResponse.Jobs); err != nil {
		log.Fatal(err.Error())
	}

	// Fetch an instance of Slurm DB JobsResponse
	dumpedJobsResponse, err := fetchDumpedJobsLocal()
	if err != nil {
		log.Fatal(err.Error())
	}
	if err := cfg.HandleDumpedJobs(dumpedJobsResponse.Jobs); err != nil {
		log.Fatal(err.Error())
	}
}
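Sync performs a single reconciliation pass. A caller that wants continuous synchronization could drive it with a ticker, for example (sketch only; the interval is chosen arbitrarily and cfg is assumed to be an initialized SlurmRestSchedulerConfig):

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for range ticker.C {
		cfg.Sync()
	}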
137
tools/nats-manager/main.go
Normal file
@@ -0,0 +1,137 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package main

import (
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"

	"github.com/ClusterCockpit/cc-backend/internal/scheduler"
	"github.com/nats-io/nats.go"
)

func usage() {
	log.Printf("Usage: nats-manager [-s server] [-creds file] <subject> <msg>\n")
	flag.PrintDefaults()
}

func showUsageAndExit(exitcode int) {
	usage()
	os.Exit(exitcode)
}
func setupPublisher() {
	var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
	var userCreds = flag.String("creds", "", "User Credentials File")
	var showHelp = flag.Bool("h", false, "Show help message")

	log.SetFlags(0)
	flag.Usage = usage
	flag.Parse()

	if *showHelp {
		showUsageAndExit(0)
	}

	args := flag.Args()
	if len(args) != 2 {
		showUsageAndExit(1)
	}

	fmt.Printf("Hello Nats\n")

	// Connect Options.
	opts := []nats.Option{nats.Name("NATS Sample Publisher")}

	// Use UserCredentials
	if *userCreds != "" {
		opts = append(opts, nats.UserCredentials(*userCreds))
	}

	// Connect to NATS
	nc, err := nats.Connect(*urls, opts...)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	subj, msg := args[0], []byte(args[1])

	nc.Publish(subj, msg)
	nc.Flush()

	if err := nc.LastError(); err != nil {
		log.Fatal(err)
	} else {
		log.Printf("Published [%s] : '%s'\n", subj, msg)
	}

	os.Exit(0)
}
func injectPayload() {
	// Read the JSON test payloads
	jobsData, err := os.ReadFile("slurm_0038.json")
	if err != nil {
		fmt.Println("Error reading JSON file:", err)
		return
	}
	dbData, err := os.ReadFile("slurmdb_0038-large.json")
	if err != nil {
		fmt.Println("Error reading JSON file:", err)
		return
	}

	// Create an HTTP handler function
	http.HandleFunc("/slurm/v0.0.38/jobs", func(w http.ResponseWriter, r *http.Request) {
		// Set the response content type to JSON
		w.Header().Set("Content-Type", "application/json")

		// Write the raw JSON data to the response writer
		_, err := w.Write(jobsData)
		if err != nil {
			http.Error(w, "Error writing jobsData payload", http.StatusInternalServerError)
			return
		}
	})

	http.HandleFunc("/slurmdb/v0.0.38/jobs", func(w http.ResponseWriter, r *http.Request) {
		// Set the response content type to JSON
		w.Header().Set("Content-Type", "application/json")

		// Write the raw JSON data to the response writer
		_, err := w.Write(dbData)
		if err != nil {
			http.Error(w, "Error writing dbData payload", http.StatusInternalServerError)
			return
		}
	})

	// Start the HTTP server on port 8080
	fmt.Println("Listening on :8080...")
	go http.ListenAndServe(":8080", nil)
}
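With injectPayload running, the two mock endpoints can be exercised from any HTTP client; a minimal check from Go might look like the sketch below (it additionally assumes an io import):

	resp, err := http.Get("http://localhost:8080/slurm/v0.0.38/jobs")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("received %d bytes of mocked job data\n", len(body))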
func loadSlurmNatsScheduler() {
	cfgData := []byte(`{"target": "localhost"}`)

	var sch scheduler.SlurmNatsScheduler
	// sch.URL = "nats://127.0.0.1:1223"
	sch.Init(cfgData)

	// go injectPayload()
}
func main() {

	var sch scheduler.SlurmRestSchedulerConfig
	sch.Init()

	// injectPayload()

	sch.Sync()

	os.Exit(0)
}
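setupPublisher only covers the publishing side; a matching subscriber built on the same nats.go client could look like the sketch below (the subject name "batch.jobs" is hypothetical):

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	sub, err := nc.Subscribe("batch.jobs", func(m *nats.Msg) {
		log.Printf("received on [%s]: %s", m.Subject, string(m.Data))
	})
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	select {} // block and keep receiving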