From af43901ca3ddd54cdbb5377682fb4abb3bcf1bbc Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Mon, 8 Sep 2025 22:54:13 +0200 Subject: [PATCH] Trial and Test MetricStore components --- .gitignore | 5 + api/schema.graphqls | 4 +- api/swagger.json | 30 +++-- api/swagger.yaml | 25 ++-- cmd/cc-backend/main.go | 6 +- configs/config-demo.json | 18 ++- configs/config.json | 18 ++- go.mod | 7 +- go.sum | 19 +++ internal/api/api_test.go | 4 +- internal/api/docs.go | 30 +++-- internal/api/rest.go | 1 - internal/avro/avroHelper.go | 2 +- internal/config/memorystore.go | 8 +- internal/graph/generated/generated.go | 97 +++++---------- internal/graph/model/models_gen.go | 2 +- internal/graph/schema.resolvers.go | 5 - internal/memorystore/checkpoint.go | 2 +- internal/memorystore/lineprotocol.go | 110 +++++++++--------- internal/memorystore/memorystore.go | 16 +-- internal/repository/job.go | 14 +-- internal/repository/jobCreate.go | 2 +- internal/repository/jobFind.go | 10 +- internal/repository/jobQuery.go | 6 +- internal/repository/stats.go | 2 +- internal/tagger/jobclasses/highload.json | 2 +- .../tagger/jobclasses/lowUtilization.json | 2 +- internal/tagger/jobclasses/lowload.json | 2 +- internal/taskManager/commitJobService.go | 4 +- internal/taskManager/taskManager.go | 5 - internal/taskManager/updateDurationService.go | 4 +- .../taskManager/updateFootprintService.go | 4 +- startDemo.sh | 37 +++++- test_ccms_write_api.sh.bak | 110 ++++++++++++++++++ 34 files changed, 394 insertions(+), 219 deletions(-) create mode 100755 test_ccms_write_api.sh.bak diff --git a/.gitignore b/.gitignore index 75cc004..963073d 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,11 @@ /var/*.db /var/*.txt +/var/checkpoints* + +migrateTimestamps.pl +test_ccms_write_api.sh + /web/frontend/public/build /web/frontend/node_modules diff --git a/api/schema.graphqls b/api/schema.graphqls index 794c630..070b5b7 100644 --- a/api/schema.graphqls +++ b/api/schema.graphqls @@ -37,7 +37,7 @@ type Job { numAcc: Int! energy: Float! SMT: Int! - exclusive: Int! + shared: String! partition: String! arrayJobId: Int! monitoringStatus: Int! @@ -419,7 +419,7 @@ input JobFilter { startTime: TimeRange state: [JobState!] metricStats: [MetricStatItem!] 
-  exclusive: Int
+  shared: StringInput
   node: StringInput
 }

diff --git a/api/swagger.json b/api/swagger.json
index 87bf3ed..c60810a 100644
--- a/api/swagger.json
+++ b/api/swagger.json
@@ -1394,12 +1394,6 @@
                         "format": "float64"
                     }
                 },
-                "exclusive": {
-                    "type": "integer",
-                    "maximum": 2,
-                    "minimum": 0,
-                    "example": 1
-                },
                 "footprint": {
                     "type": "object",
                     "additionalProperties": {
@@ -1416,12 +1410,18 @@
                 },
                 "jobState": {
                     "enum": [
-                        "completed",
-                        "failed",
+                        "boot_fail",
                         "cancelled",
-                        "stopped",
-                        "timeout",
-                        "out_of_memory"
+                        "completed",
+                        "deadline",
+                        "failed",
+                        "node_fail",
+                        "out-of-memory",
+                        "pending",
+                        "preempted",
+                        "running",
+                        "suspended",
+                        "timeout"
                     ],
                     "allOf": [
                         {
@@ -1477,6 +1477,14 @@
                         "$ref": "#/definitions/schema.Resource"
                     }
                 },
+                "shared": {
+                    "type": "string",
+                    "enum": [
+                        "none",
+                        "single_user",
+                        "multi_user"
+                    ]
+                },
                 "smt": {
                     "type": "integer",
                     "example": 4
diff --git a/api/swagger.yaml b/api/swagger.yaml
index 06caa56..6a4adbd 100644
--- a/api/swagger.yaml
+++ b/api/swagger.yaml
@@ -207,11 +207,6 @@ definitions:
           format: float64
         type: number
       type: object
-    exclusive:
-      example: 1
-      maximum: 2
-      minimum: 0
-      type: integer
     footprint:
       additionalProperties:
         format: float64
@@ -226,12 +221,18 @@
       allOf:
      - $ref: '#/definitions/schema.JobState'
       enum:
-      - completed
-      - failed
+      - boot_fail
       - cancelled
-      - stopped
+      - completed
+      - deadline
+      - failed
+      - node_fail
+      - out-of-memory
+      - pending
+      - preempted
+      - running
+      - suspended
       - timeout
-      - out_of_memory
       example: completed
     metaData:
       additionalProperties:
         type: string
@@ -269,6 +270,12 @@
       items:
         $ref: '#/definitions/schema.Resource'
       type: array
+    shared:
+      enum:
+      - none
+      - single_user
+      - multi_user
+      type: string
     smt:
       example: 4
       type: integer
diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go
index 9c7ad1f..0146118 100644
--- a/cmd/cc-backend/main.go
+++ b/cmd/cc-backend/main.go
@@ -251,13 +251,11 @@ func main() {

 	var wg sync.WaitGroup

-	//Metric Store starts after all flags have been processes
-	memorystore.Init(wg)
+	// Metric Store starts after all flags have been processed
+	memorystore.Init(&wg)
 	archiver.Start(repository.GetJobRepository())

-	//
-	// Comment out
-	// taskManager.Start(ccconf.GetPackageConfig("cron"),
-	// 	ccconf.GetPackageConfig("archive"))
+	taskManager.Start(ccconf.GetPackageConfig("cron"),
+		ccconf.GetPackageConfig("archive"))

 	serverInit()

diff --git a/configs/config-demo.json b/configs/config-demo.json
index a31d65d..3c0d858 100644
--- a/configs/config-demo.json
+++ b/configs/config-demo.json
@@ -4,11 +4,23 @@
     "short-running-jobs-duration": 300,
     "resampling": {
       "trigger": 30,
-      "resolutions": [600, 300, 120, 60]
+      "resolutions": [
+        600,
+        300,
+        120,
+        60
+      ]
     },
-    "apiAllowedIPs": ["*"],
+    "apiAllowedIPs": [
+      "*"
+    ],
     "emission-constant": 317
   },
+  "cron": {
+    "commit-job-worker": "2m",
+    "duration-worker": "5m",
+    "footprint-worker": "10m"
+  },
   "archive": {
     "kind": "file",
     "path": "./var/job-archive"
@@ -73,4 +85,4 @@
     },
     "retention-in-memory": "48h"
   }
-}
+}
\ No newline at end of file
diff --git a/configs/config.json b/configs/config.json
index ed7d546..505e446 100644
--- a/configs/config.json
+++ b/configs/config.json
@@ -6,13 +6,25 @@
     "user": "clustercockpit",
     "group": "clustercockpit",
     "validate": false,
-    "apiAllowedIPs": ["*"],
+    "apiAllowedIPs": [
+      "*"
+    ],
     "short-running-jobs-duration": 300,
     "resampling": {
       "trigger": 30,
-      "resolutions": [600, 300, 120, 60]
+      "resolutions": [
+        600,
+        300,
+        120,
+        60
+      ]
     }
   },
+  "cron": {
+    "commit-job-worker": "2m",
+    "duration-worker": "5m",
+    "footprint-worker": "10m"
+  },
   "archive": {
"kind": "file", "path": "./var/job-archive" @@ -41,4 +53,4 @@ } } ] -} +} \ No newline at end of file diff --git a/go.mod b/go.mod index 5858cff..e0add97 100644 --- a/go.mod +++ b/go.mod @@ -19,9 +19,12 @@ require ( github.com/gorilla/handlers v1.5.2 github.com/gorilla/mux v1.8.1 github.com/gorilla/sessions v1.4.0 + github.com/influxdata/line-protocol/v2 v2.2.1 github.com/jmoiron/sqlx v1.4.0 github.com/joho/godotenv v1.5.1 + github.com/linkedin/goavro/v2 v2.14.0 github.com/mattn/go-sqlite3 v1.14.24 + github.com/nats-io/nats.go v1.44.0 github.com/prometheus/client_golang v1.23.0 github.com/prometheus/common v0.65.0 github.com/qustavo/sqlhooks/v2 v2.1.0 @@ -62,14 +65,16 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect - github.com/linkedin/goavro/v2 v2.14.0 // indirect github.com/mailru/easyjson v0.9.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect + github.com/nats-io/nkeys v0.4.11 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 3c51770..792ec1c 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,7 @@ github.com/coreos/go-oidc/v3 v3.12.0 h1:sJk+8G2qq94rDI6ehZ71Bol3oUHy63qNYmkiSjrc github.com/coreos/go-oidc/v3 v3.12.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0= github.com/cpuguy83/go-md2man/v2 v2.0.7 h1:zbFlGlXEAKlwXpmvle3d8Oe3YnkKIK4xSRTd3sHPnBo= github.com/cpuguy83/go-md2man/v2 v2.0.7/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -57,6 +58,10 @@ github.com/expr-lang/expr v1.17.5 h1:i1WrMvcdLF249nSNlpQZN1S6NXuW9WaOfF5tPi3aw3k github.com/expr-lang/expr v1.17.5/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk= +github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-asn1-ber/asn1-ber v1.5.7 h1:DTX+lbVTWaTw1hQ+PbZPlnDZPEIs0SS/GCZAl535dDk= @@ -94,6 +99,8 @@ github.com/golang-migrate/migrate/v4 v4.18.2/go.mod h1:2CM6tJvn2kqPXwnXO/d3rAQYi 
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= @@ -130,6 +137,11 @@ github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjw github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI= github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf h1:7JTmneyiNEwVBOHSjoMxiWAqB992atOeepeFYegn5RU= github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937 h1:MHJNQ+p99hFATQm6ORoLmpUCF7ovjwEFshs/NHzAbig= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo= +github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY= +github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY= github.com/influxdata/line-protocol/v2 v2.2.1 h1:EAPkqJ9Km4uAxtMRgUubJyqAr6zgWM0dznKMLRauQRE= github.com/influxdata/line-protocol/v2 v2.2.1/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -158,8 +170,11 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= @@ -198,6 +213,7 @@ github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod 
h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro= github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -358,15 +374,18 @@ golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxb golang.org/x/tools v0.35.0 h1:mBffYraMEf7aa0sB+NuKnuCy8qI/9Bughn8dC2Gu5r0= golang.org/x/tools v0.35.0/go.mod h1:NKdj5HkL/73byiZSJjqJgKn3ep7KjFkBOkR/Hps3VPw= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs= diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 9f47a1f..1c81fc9 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -241,7 +241,7 @@ func TestRestApi(t *testing.T) { "numNodes": 1, "numHwthreads": 8, "numAcc": 0, - "exclusive": 1, + "shared": "none", "monitoringStatus": 1, "smt": 1, "resources": [ @@ -396,7 +396,7 @@ func TestRestApi(t *testing.T) { "partition": "default", "walltime": 3600, "numNodes": 1, - "exclusive": 1, + "shared": "none", "monitoringStatus": 1, "smt": 1, "resources": [ diff --git a/internal/api/docs.go b/internal/api/docs.go index 50cab92..c10745c 100644 --- a/internal/api/docs.go +++ b/internal/api/docs.go @@ -1401,12 +1401,6 @@ const docTemplate = `{ "format": "float64" } }, - "exclusive": { - "type": "integer", - "maximum": 2, - "minimum": 0, - "example": 1 - }, "footprint": { "type": "object", "additionalProperties": { @@ -1423,12 +1417,18 @@ const docTemplate = `{ }, "jobState": { "enum": [ - "completed", - "failed", + "boot_fail", "cancelled", - "stopped", - "timeout", - "out_of_memory" + "completed", + "deadline", + "failed", + "node_fail", + "out-of-memory", + "pending", + "preempted", + "running", + "suspended", + "timeout" ], "allOf": [ { @@ -1484,6 +1484,14 @@ const docTemplate = `{ "$ref": "#/definitions/schema.Resource" } }, + "shared": { + "type": "string", + "enum": [ + "none", + "single_user", + "multi_user" + ] + }, "smt": { "type": "integer", "example": 4 diff --git 
a/internal/api/rest.go b/internal/api/rest.go index 8cefe48..fcadc90 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -97,7 +97,6 @@ func (api *RestApi) MountUserApiRoutes(r *mux.Router) { } func (api *RestApi) MountMetricStoreApiRoutes(r *mux.Router) { - r.StrictSlash(true) // REST API Uses TokenAuth r.HandleFunc("/api/free", memorystore.HandleFree).Methods(http.MethodPost) r.HandleFunc("/api/write", memorystore.HandleWrite).Methods(http.MethodPost) diff --git a/internal/avro/avroHelper.go b/internal/avro/avroHelper.go index ea733cd..21a5617 100644 --- a/internal/avro/avroHelper.go +++ b/internal/avro/avroHelper.go @@ -29,7 +29,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { return case val := <-LineProtocolMessages: //Fetch the frequency of the metric from the global configuration - freq, err := config.MetricStoreKeys.GetMetricFrequency(val.MetricName) + freq, err := config.GetMetricFrequency(val.MetricName) if err != nil { fmt.Printf("Error fetching metric frequency: %s\n", err) continue diff --git a/internal/config/memorystore.go b/internal/config/memorystore.go index b9273b4..c277045 100644 --- a/internal/config/memorystore.go +++ b/internal/config/memorystore.go @@ -97,10 +97,10 @@ func InitMetricStore(msConfig json.RawMessage) { } } -func (c *MetricStoreConfig) GetMetricFrequency(metricName string) (int64, error) { - // if metric, ok := c.Metrics[metricName]; ok { - // return metric.Frequency, nil - // } +func GetMetricFrequency(metricName string) (int64, error) { + if metric, ok := Metrics[metricName]; ok { + return metric.Frequency, nil + } return 0, fmt.Errorf("[METRICSTORE]> metric %s not found", metricName) } diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index 3a85858..eed946d 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -118,7 +118,6 @@ type ComplexityRoot struct { Duration func(childComplexity int) int Energy func(childComplexity int) int EnergyFootprint func(childComplexity int) int - Exclusive func(childComplexity int) int Footprint func(childComplexity int) int ID func(childComplexity int) int JobID func(childComplexity int) int @@ -131,6 +130,7 @@ type ComplexityRoot struct { Project func(childComplexity int) int Resources func(childComplexity int) int SMT func(childComplexity int) int + Shared func(childComplexity int) int StartTime func(childComplexity int) int State func(childComplexity int) int SubCluster func(childComplexity int) int @@ -425,8 +425,6 @@ type ClusterResolver interface { type JobResolver interface { StartTime(ctx context.Context, obj *schema.Job) (*time.Time, error) - Exclusive(ctx context.Context, obj *schema.Job) (int, error) - Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) @@ -726,13 +724,6 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin return e.complexity.Job.EnergyFootprint(childComplexity), true - case "Job.exclusive": - if e.complexity.Job.Exclusive == nil { - break - } - - return e.complexity.Job.Exclusive(childComplexity), true - case "Job.footprint": if e.complexity.Job.Footprint == nil { break @@ -817,6 +808,13 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin return e.complexity.Job.SMT(childComplexity), true + case "Job.shared": + if e.complexity.Job.Shared == nil { + break + } + + return e.complexity.Job.Shared(childComplexity), true + 
case "Job.startTime": if e.complexity.Job.StartTime == nil { break @@ -2361,7 +2359,7 @@ type Job { numAcc: Int! energy: Float! SMT: Int! - exclusive: Int! + shared: String! partition: String! arrayJobId: Int! monitoringStatus: Int! @@ -2743,7 +2741,7 @@ input JobFilter { startTime: TimeRange state: [JobState!] metricStats: [MetricStatItem!] - exclusive: Int + shared: StringInput node: StringInput } @@ -5217,8 +5215,8 @@ func (ec *executionContext) fieldContext_Job_SMT(_ context.Context, field graphq return fc, nil } -func (ec *executionContext) _Job_exclusive(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Job_exclusive(ctx, field) +func (ec *executionContext) _Job_shared(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Job_shared(ctx, field) if err != nil { return graphql.Null } @@ -5231,7 +5229,7 @@ func (ec *executionContext) _Job_exclusive(ctx context.Context, field graphql.Co }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Job().Exclusive(rctx, obj) + return obj.Shared, nil }) if err != nil { ec.Error(ctx, err) @@ -5243,19 +5241,19 @@ func (ec *executionContext) _Job_exclusive(ctx context.Context, field graphql.Co } return graphql.Null } - res := resTmp.(int) + res := resTmp.(string) fc.Result = res - return ec.marshalNInt2int(ctx, field.Selections, res) + return ec.marshalNString2string(ctx, field.Selections, res) } -func (ec *executionContext) fieldContext_Job_exclusive(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_Job_shared(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ Object: "Job", Field: field, - IsMethod: true, - IsResolver: true, + IsMethod: false, + IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("field of type Int does not have child fields") + return nil, errors.New("field of type String does not have child fields") }, } return fc, nil @@ -6404,8 +6402,8 @@ func (ec *executionContext) fieldContext_JobResultList_items(_ context.Context, return ec.fieldContext_Job_energy(ctx, field) case "SMT": return ec.fieldContext_Job_SMT(ctx, field) - case "exclusive": - return ec.fieldContext_Job_exclusive(ctx, field) + case "shared": + return ec.fieldContext_Job_shared(ctx, field) case "partition": return ec.fieldContext_Job_partition(ctx, field) case "arrayJobId": @@ -11042,8 +11040,8 @@ func (ec *executionContext) fieldContext_Query_job(ctx context.Context, field gr return ec.fieldContext_Job_energy(ctx, field) case "SMT": return ec.fieldContext_Job_SMT(ctx, field) - case "exclusive": - return ec.fieldContext_Job_exclusive(ctx, field) + case "shared": + return ec.fieldContext_Job_shared(ctx, field) case "partition": return ec.fieldContext_Job_partition(ctx, field) case "arrayJobId": @@ -16357,7 +16355,7 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj any asMap[k] = v } - fieldsInOrder := [...]string{"tags", "dbId", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "energy", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "metricStats", "exclusive", "node"} + 
fieldsInOrder := [...]string{"tags", "dbId", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "energy", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "metricStats", "shared", "node"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -16490,13 +16488,13 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj any return it, err } it.MetricStats = data - case "exclusive": - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("exclusive")) - data, err := ec.unmarshalOInt2ᚖint(ctx, v) + case "shared": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("shared")) + data, err := ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v) if err != nil { return it, err } - it.Exclusive = data + it.Shared = data case "node": ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("node")) data, err := ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v) @@ -17397,42 +17395,11 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj if out.Values[i] == graphql.Null { atomic.AddUint32(&out.Invalids, 1) } - case "exclusive": - field := field - - innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - } - }() - res = ec._Job_exclusive(ctx, field, obj) - if res == graphql.Null { - atomic.AddUint32(&fs.Invalids, 1) - } - return res + case "shared": + out.Values[i] = ec._Job_shared(ctx, field, obj) + if out.Values[i] == graphql.Null { + atomic.AddUint32(&out.Invalids, 1) } - - if field.Deferrable != nil { - dfs, ok := deferred[field.Deferrable.Label] - di := 0 - if ok { - dfs.AddField(field) - di = len(dfs.Values) - 1 - } else { - dfs = graphql.NewFieldSet([]graphql.CollectedField{field}) - deferred[field.Deferrable.Label] = dfs - } - dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler { - return innerFunc(ctx, dfs) - }) - - // don't run the out.Concurrently() call below - out.Values[i] = graphql.Null - continue - } - - out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) case "partition": out.Values[i] = ec._Job_partition(ctx, field, obj) if out.Values[i] == graphql.Null { diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index e9abf0d..accc344 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -69,7 +69,7 @@ type JobFilter struct { StartTime *config.TimeRange `json:"startTime,omitempty"` State []schema.JobState `json:"state,omitempty"` MetricStats []*MetricStatItem `json:"metricStats,omitempty"` - Exclusive *int `json:"exclusive,omitempty"` + Shared *StringInput `json:"shared,omitempty"` Node *StringInput `json:"node,omitempty"` } diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 8868497..315f1a3 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -35,11 +35,6 @@ func (r *jobResolver) StartTime(ctx context.Context, obj *schema.Job) (*time.Tim return ×tamp, nil } -// Exclusive is the resolver for the exclusive field. -func (r *jobResolver) Exclusive(ctx context.Context, obj *schema.Job) (int, error) { - panic(fmt.Errorf("not implemented: Exclusive - exclusive")) -} - // Tags is the resolver for the tags field. 
func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) {
	return r.Repo.GetTags(repository.GetUserFromContext(ctx), obj.ID)
diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go
index 76a5472..adee443 100644
--- a/internal/memorystore/checkpoint.go
+++ b/internal/memorystore/checkpoint.go
@@ -380,7 +380,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
 		if err != nil {
 			log.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err)
 		}
-		fmt.Printf("[METRICSTORE]> %#v Directory created successfully.\n", dir)
+		log.Printf("[METRICSTORE]> %#v Directory created successfully.\n", dir)
 	}

 	// Config read (replace with your actual config read)
diff --git a/internal/memorystore/lineprotocol.go b/internal/memorystore/lineprotocol.go
index e12b9e2..495197d 100644
--- a/internal/memorystore/lineprotocol.go
+++ b/internal/memorystore/lineprotocol.go
@@ -2,10 +2,8 @@ package memorystore

 import (
 	"context"
-	"errors"
 	"fmt"
 	"log"
-	"net"
 	"sync"
 	"time"

@@ -17,67 +15,67 @@ import (
 )

-// Each connection is handled in it's own goroutine. This is a blocking function.
-func ReceiveRaw(ctx context.Context,
-	listener net.Listener,
-	handleLine func(*lineprotocol.Decoder, string) error,
-) error {
-	var wg sync.WaitGroup
+// Each connection is handled in its own goroutine. This is a blocking function.
+// func ReceiveRaw(ctx context.Context,
+// 	listener net.Listener,
+// 	handleLine func(*lineprotocol.Decoder, string) error,
+// ) error {
+// 	var wg sync.WaitGroup

-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		<-ctx.Done()
-		if err := listener.Close(); err != nil {
-			log.Printf("listener.Close(): %s", err.Error())
-		}
-	}()
+// 	wg.Add(1)
+// 	go func() {
+// 		defer wg.Done()
+// 		<-ctx.Done()
+// 		if err := listener.Close(); err != nil {
+// 			log.Printf("listener.Close(): %s", err.Error())
+// 		}
+// 	}()

-	for {
-		conn, err := listener.Accept()
-		if err != nil {
-			if errors.Is(err, net.ErrClosed) {
-				break
-			}
+// 	for {
+// 		conn, err := listener.Accept()
+// 		if err != nil {
+// 			if errors.Is(err, net.ErrClosed) {
+// 				break
+// 			}

-			log.Printf("listener.Accept(): %s", err.Error())
-		}
+// 			log.Printf("listener.Accept(): %s", err.Error())
+// 		}

-		wg.Add(2)
-		go func() {
-			defer wg.Done()
-			defer conn.Close()
+// 		wg.Add(2)
+// 		go func() {
+// 			defer wg.Done()
+// 			defer conn.Close()

-			dec := lineprotocol.NewDecoder(conn)
-			connctx, cancel := context.WithCancel(context.Background())
-			defer cancel()
-			go func() {
-				defer wg.Done()
-				select {
-				case <-connctx.Done():
-					conn.Close()
-				case <-ctx.Done():
-					conn.Close()
-				}
-			}()
+// 			dec := lineprotocol.NewDecoder(conn)
+// 			connctx, cancel := context.WithCancel(context.Background())
+// 			defer cancel()
+// 			go func() {
+// 				defer wg.Done()
+// 				select {
+// 				case <-connctx.Done():
+// 					conn.Close()
+// 				case <-ctx.Done():
+// 					conn.Close()
+// 				}
+// 			}()

-			if err := handleLine(dec, "default"); err != nil {
-				if errors.Is(err, net.ErrClosed) {
-					return
-				}
+// 			if err := handleLine(dec, "default"); err != nil {
+// 				if errors.Is(err, net.ErrClosed) {
+// 					return
+// 				}

-				log.Printf("%s: %s", conn.RemoteAddr().String(), err.Error())
-				errmsg := make([]byte, 128)
-				errmsg = append(errmsg, `error: `...)
-				errmsg = append(errmsg, err.Error()...)
-				errmsg = append(errmsg, '\n')
-				conn.Write(errmsg)
-			}
-		}()
-	}
+// 				log.Printf("%s: %s", conn.RemoteAddr().String(), err.Error())
+// 				errmsg := make([]byte, 128)
+// 				errmsg = append(errmsg, `error: `...)
+// 				errmsg = append(errmsg, err.Error()...)
+// 				errmsg = append(errmsg, '\n')
+// 				conn.Write(errmsg)
+// 			}
+// 		}()
+// 	}

-	wg.Wait()
-	return nil
-}
+// 	wg.Wait()
+// 	return nil
+// }

 // Connect to a nats server and subscribe to "updates". This is a blocking
-// function. handleLine will be called for each line recieved via nats.
+// function. handleLine will be called for each line received via nats.
@@ -113,7 +111,7 @@ func ReceiveNats(conf *(config.NatsConfig),

 	if workers > 1 {
 		wg.Add(workers)

-		for i := 0; i < workers; i++ {
+		for range workers {
 			go func() {
 				for m := range msgs {
 					dec := lineprotocol.NewDecoderWithBytes(m.Data)
diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go
index efa4065..4a631c2 100644
--- a/internal/memorystore/memorystore.go
+++ b/internal/memorystore/memorystore.go
@@ -47,7 +47,7 @@ type MemoryStore struct {
 	root Level
 }

-func Init(wg sync.WaitGroup) {
+func Init(wg *sync.WaitGroup) {
 	startupTime := time.Now()

 	//Pass the config.MetricStoreKeys
@@ -82,10 +82,10 @@ func Init(wg sync.WaitGroup) {

 	wg.Add(4)

-	Retention(&wg, ctx)
-	Checkpointing(&wg, ctx)
-	Archiving(&wg, ctx)
-	avro.DataStaging(&wg, ctx)
+	Retention(wg, ctx)
+	Checkpointing(wg, ctx)
+	Archiving(wg, ctx)
+	avro.DataStaging(wg, ctx)

 	wg.Add(1)
 	sigs := make(chan os.Signal, 1)
@@ -337,12 +337,12 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric
 // the range asked for if no data was available.
 func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]schema.Float, int64, int64, int64, error) {
 	if from > to {
-		return nil, 0, 0, 0, errors.New("[METRICSTORE]> invalid time range")
+		return nil, 0, 0, 0, errors.New("[METRICSTORE]> invalid time range\n")
 	}

 	minfo, ok := m.Metrics[metric]
 	if !ok {
-		return nil, 0, 0, 0, errors.New("[METRICSTORE]> unkown metric: " + metric)
+		return nil, 0, 0, 0, errors.New("[METRICSTORE]> unknown metric: " + metric + "\n")
 	}

 	n, data := 0, make([]schema.Float, (to-from)/minfo.Frequency+1)
@@ -381,7 +381,7 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
 	if err != nil {
 		return nil, 0, 0, 0, err
 	} else if n == 0 {
-		return nil, 0, 0, 0, errors.New("[METRICSTORE]> metric or host not found")
+		return nil, 0, 0, 0, errors.New("[METRICSTORE]> metric or host not found\n")
 	} else if n > 1 {
 		if minfo.Aggregation == config.AvgAggregation {
 			normalize := 1. / schema.Float(n)
diff --git a/internal/repository/job.go b/internal/repository/job.go
index dd40ebc..68778e1 100644
--- a/internal/repository/job.go
+++ b/internal/repository/job.go
@@ -52,18 +52,18 @@ func GetJobRepository() *JobRepository {
 }

 var jobColumns []string = []string{
-	"job.id", "job.job_id", "job.hpc_user", "job.project", "job.cluster", "job.subcluster",
+	"job.id", "job.job_id", "job.hpc_user", "job.project", "job.hpc_cluster", "job.subcluster",
 	"job.start_time", "job.cluster_partition", "job.array_job_id", "job.num_nodes",
-	"job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status",
+	"job.num_hwthreads", "job.num_acc", "job.shared", "job.monitoring_status",
 	"job.smt", "job.job_state", "job.duration", "job.walltime", "job.resources",
 	"job.footprint", "job.energy",
 }

 var jobCacheColumns []string = []string{
-	"job_cache.id", "job_cache.job_id", "job_cache.hpc_user", "job_cache.project", "job_cache.cluster",
+	"job_cache.id", "job_cache.job_id", "job_cache.hpc_user", "job_cache.project", "job_cache.hpc_cluster",
 	"job_cache.subcluster", "job_cache.start_time", "job_cache.cluster_partition",
 	"job_cache.array_job_id", "job_cache.num_nodes", "job_cache.num_hwthreads",
-	"job_cache.num_acc", "job_cache.exclusive", "job_cache.monitoring_status", "job_cache.smt",
+	"job_cache.num_acc", "job_cache.shared", "job_cache.monitoring_status", "job_cache.smt",
 	"job_cache.job_state", "job_cache.duration", "job_cache.walltime", "job_cache.resources",
 	"job_cache.footprint", "job_cache.energy",
 }
@@ -390,7 +390,7 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
 	start := time.Now()
 	partitions := r.cache.Get("partitions:"+cluster, func() (any, time.Duration, int) {
 		parts := []string{}
-		if err = r.DB.Select(&parts, `SELECT DISTINCT job.cluster_partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
+		if err = r.DB.Select(&parts, `SELECT DISTINCT job.cluster_partition FROM job WHERE job.hpc_cluster = ?;`, cluster); err != nil {
 			return nil, 0, 1000
 		}

@@ -410,7 +410,7 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
 	subclusters := make(map[string]map[string]int)
 	rows, err := sq.Select("resources", "subcluster").From("job").
 		Where("job.job_state = 'running'").
-		Where("job.cluster = ?", cluster).
+		Where("job.hpc_cluster = ?", cluster).
 		RunWith(r.stmtCache).Query()
 	if err != nil {
 		cclog.Error("Error while running query")
@@ -505,7 +505,7 @@ func (r *JobRepository) FindJobIdsByTag(tagId int64) ([]int64, error) {
-// FIXME: Reconsider filtering short jobs with harcoded threshold
+// FIXME: Reconsider filtering short jobs with hardcoded threshold
 func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
 	query := sq.Select(jobColumns...).From("job").
-		Where(fmt.Sprintf("job.cluster = '%s'", cluster)).
+		Where(fmt.Sprintf("job.hpc_cluster = '%s'", cluster)).
 		Where("job.job_state = 'running'").
Where("job.duration > 600") diff --git a/internal/repository/jobCreate.go b/internal/repository/jobCreate.go index 666313f..f43be58 100644 --- a/internal/repository/jobCreate.go +++ b/internal/repository/jobCreate.go @@ -70,7 +70,7 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) { } _, err = r.DB.Exec( - "INSERT INTO job (job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache") + "INSERT INTO job (job_id, hpc_cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data) SELECT job_id, hpc_cluster, subcluster, start_time, hpc_user, project, cluster_partition, array_job_id, num_nodes, num_hwthreads, num_acc, shared, monitoring_status, smt, job_state, duration, walltime, footprint, energy, energy_footprint, resources, meta_data FROM job_cache") if err != nil { cclog.Warnf("Error while Job sync: %v", err) return nil, err diff --git a/internal/repository/jobFind.go b/internal/repository/jobFind.go index 39519d5..3abce8c 100644 --- a/internal/repository/jobFind.go +++ b/internal/repository/jobFind.go @@ -31,7 +31,7 @@ func (r *JobRepository) Find( Where("job.job_id = ?", *jobId) if cluster != nil { - q = q.Where("job.cluster = ?", *cluster) + q = q.Where("job.hpc_cluster = ?", *cluster) } if startTime != nil { q = q.Where("job.start_time = ?", *startTime) @@ -52,7 +52,7 @@ func (r *JobRepository) FindCached( Where("job_cache.job_id = ?", *jobId) if cluster != nil { - q = q.Where("job_cache.cluster = ?", *cluster) + q = q.Where("job_cache.hpc_cluster = ?", *cluster) } if startTime != nil { q = q.Where("job_cache.start_time = ?", *startTime) @@ -78,7 +78,7 @@ func (r *JobRepository) FindAll( Where("job.job_id = ?", *jobId) if cluster != nil { - q = q.Where("job.cluster = ?", *cluster) + q = q.Where("job.hpc_cluster = ?", *cluster) } if startTime != nil { q = q.Where("job.start_time = ?", *startTime) @@ -183,7 +183,7 @@ func (r *JobRepository) FindByJobId(ctx context.Context, jobId int64, startTime q := sq.Select(jobColumns...). From("job"). Where("job.job_id = ?", jobId). - Where("job.cluster = ?", cluster). + Where("job.hpc_cluster = ?", cluster). Where("job.start_time = ?", startTime) q, qerr := SecurityCheck(ctx, q) @@ -203,7 +203,7 @@ func (r *JobRepository) IsJobOwner(jobId int64, startTime int64, user string, cl From("job"). Where("job.job_id = ?", jobId). Where("job.hpc_user = ?", user). - Where("job.cluster = ?", cluster). + Where("job.hpc_cluster = ?", cluster). 
 		Where("job.start_time = ?", startTime)

 	_, err := scanJob(q.RunWith(r.stmtCache).QueryRow())
diff --git a/internal/repository/jobQuery.go b/internal/repository/jobQuery.go
index fdcc904..19cdd9a 100644
--- a/internal/repository/jobQuery.go
+++ b/internal/repository/jobQuery.go
@@ -168,7 +168,7 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
 		query = buildMetaJsonCondition("jobName", filter.JobName, query)
 	}
 	if filter.Cluster != nil {
-		query = buildStringCondition("job.cluster", filter.Cluster, query)
+		query = buildStringCondition("job.hpc_cluster", filter.Cluster, query)
 	}
 	if filter.Partition != nil {
 		query = buildStringCondition("job.cluster_partition", filter.Partition, query)
@@ -183,8 +183,8 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
-		now := time.Now().Unix() // There does not seem to be a portable way to get the current unix timestamp accross different DBs.
+		now := time.Now().Unix() // There does not seem to be a portable way to get the current unix timestamp across different DBs.
 		query = query.Where("(job.job_state != 'running' OR (? - job.start_time) > ?)", now, *filter.MinRunningFor)
 	}
-	if filter.Exclusive != nil {
-		query = query.Where("job.exclusive = ?", *filter.Exclusive)
+	if filter.Shared != nil {
+		query = buildStringCondition("job.shared", filter.Shared, query)
 	}
 	if filter.State != nil {
 		states := make([]string, len(filter.State))
diff --git a/internal/repository/stats.go b/internal/repository/stats.go
index 7beb674..25c862f 100644
--- a/internal/repository/stats.go
+++ b/internal/repository/stats.go
@@ -23,7 +23,7 @@ import (
 var groupBy2column = map[model.Aggregate]string{
 	model.AggregateUser:    "job.hpc_user",
 	model.AggregateProject: "job.project",
-	model.AggregateCluster: "job.cluster",
+	model.AggregateCluster: "job.hpc_cluster",
 }

 var sortBy2column = map[model.SortByAggregate]string{
diff --git a/internal/tagger/jobclasses/highload.json b/internal/tagger/jobclasses/highload.json
index 0d16b45..9667011 100644
--- a/internal/tagger/jobclasses/highload.json
+++ b/internal/tagger/jobclasses/highload.json
@@ -8,7 +8,7 @@
   ],
   "metrics": ["cpu_load"],
   "requirements": [
-    "job.exclusive == 1",
+    "job.shared == \"none\"",
     "job.duration > job_min_duration_seconds"
   ],
   "variables": [
diff --git a/internal/tagger/jobclasses/lowUtilization.json b/internal/tagger/jobclasses/lowUtilization.json
index 9613b06..e84b81d 100644
--- a/internal/tagger/jobclasses/lowUtilization.json
+++ b/internal/tagger/jobclasses/lowUtilization.json
@@ -4,7 +4,7 @@
   "parameters": ["job_min_duration_seconds"],
   "metrics": ["flops_any", "mem_bw"],
   "requirements": [
-    "job.exclusive == 1",
+    "job.shared == \"none\"",
     "job.duration > job_min_duration_seconds"
   ],
   "variables": [
diff --git a/internal/tagger/jobclasses/lowload.json b/internal/tagger/jobclasses/lowload.json
index 2212bd1..f952da5 100644
--- a/internal/tagger/jobclasses/lowload.json
+++ b/internal/tagger/jobclasses/lowload.json
@@ -8,7 +8,7 @@
   ],
   "metrics": ["cpu_load"],
   "requirements": [
-    "job.exclusive == 1",
+    "job.shared == \"none\"",
     "job.duration > job_min_duration_seconds"
   ],
   "variables": [
diff --git a/internal/taskManager/commitJobService.go b/internal/taskManager/commitJobService.go
index e7c169a..88c2708 100644
--- a/internal/taskManager/commitJobService.go
+++ b/internal/taskManager/commitJobService.go
@@ -26,9 +26,9 @@ func RegisterCommitJobService() {
 		gocron.NewTask(
 			func() {
 				start := time.Now()
-				cclog.Printf("Jobcache sync started at %s", start.Format(time.RFC3339))
+				cclog.Printf("Jobcache sync started at %s\n", start.Format(time.RFC3339))
 				jobs, _ := jobRepo.SyncJobs()
repository.CallJobStartHooks(jobs) - cclog.Printf("Jobcache sync and job callbacks are done and took %s", time.Since(start)) + cclog.Printf("Jobcache sync and job callbacks are done and took %s\n", time.Since(start)) })) } diff --git a/internal/taskManager/taskManager.go b/internal/taskManager/taskManager.go index df6c4d0..35d6ea5 100644 --- a/internal/taskManager/taskManager.go +++ b/internal/taskManager/taskManager.go @@ -7,7 +7,6 @@ package taskManager import ( "bytes" "encoding/json" - "fmt" "time" "github.com/ClusterCockpit/cc-backend/internal/auth" @@ -66,10 +65,6 @@ func Start(cronCfg, archiveConfig json.RawMessage) { RegisterStopJobsExceedTime() } - fmt.Printf("Keys : %#v\n", Keys) - fmt.Printf("cronCfg : %#v\n", cronCfg) - fmt.Printf("archiveConfig : %#v\n", archiveConfig) - dec := json.NewDecoder(bytes.NewReader(cronCfg)) dec.DisallowUnknownFields() if err := dec.Decode(&Keys); err != nil { diff --git a/internal/taskManager/updateDurationService.go b/internal/taskManager/updateDurationService.go index d650afb..53882f0 100644 --- a/internal/taskManager/updateDurationService.go +++ b/internal/taskManager/updateDurationService.go @@ -25,8 +25,8 @@ func RegisterUpdateDurationWorker() { gocron.NewTask( func() { start := time.Now() - cclog.Printf("Update duration started at %s", start.Format(time.RFC3339)) + cclog.Printf("Update duration started at %s\n", start.Format(time.RFC3339)) jobRepo.UpdateDuration() - cclog.Printf("Update duration is done and took %s", time.Since(start)) + cclog.Printf("Update duration is done and took %s\n", time.Since(start)) })) } diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 4025849..2ce9901 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -134,8 +134,8 @@ func RegisterFootprintWorker() { } jobRepo.TransactionEnd(t) } - cclog.Debugf("Finish Cluster %s, took %s", cluster.Name, time.Since(s_cluster)) + cclog.Debugf("Finish Cluster %s, took %s\n", cluster.Name, time.Since(s_cluster)) } - cclog.Printf("Updating %d (of %d; Skipped %d) Footprints is done and took %s", c, cl, ce, time.Since(s)) + cclog.Printf("Updating %d (of %d; Skipped %d) Footprints is done and took %s\n", c, cl, ce, time.Since(s)) })) } diff --git a/startDemo.sh b/startDemo.sh index faf6d35..b027bf5 100755 --- a/startDemo.sh +++ b/startDemo.sh @@ -12,6 +12,41 @@ else cp ./configs/env-template.txt .env cp ./configs/config-demo.json config.json + # mkdir -p ./var/checkpoints + # cp -rf ~/cc-metric-store/var/checkpoints ~/cc-backend/var + ./cc-backend -migrate-db - ./cc-backend -server -dev -init-db -add-user demo:admin:demo + ./cc-backend -dev -init-db -add-user demo:admin,api:demo + + # --- begin: generate JWT for demo and update test_ccms_write_api.sh --- + CC_BIN="./cc-backend" + TEST_FILE="./test_ccms_write_api.sh" + BACKUP_FILE="${TEST_FILE}.bak" + + if [ -x "$CC_BIN" ]; then + echo "Generating JWT for user 'demo'..." + output="$($CC_BIN -jwt demo 2>&1 || true)" + token="$(printf '%s\n' "$output" | grep -oE '[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+' | head -n1 || true)" + + if [ -z "$token" ]; then + echo "Warning: could not extract JWT from output:" >&2 + printf '%s\n' "$output" >&2 + else + if [ -f "$TEST_FILE" ]; then + cp -a "$TEST_FILE" "$BACKUP_FILE" + # replace first line with JWT="..." 
+                sed -i "1s#.*#JWT=\"$token\"#" "$TEST_FILE"
+                echo "Updated JWT in $TEST_FILE (backup at $BACKUP_FILE)"
+            else
+                echo "Warning: $TEST_FILE not found; JWT not written."
+            fi
+        fi
+    else
+        echo "Warning: $CC_BIN not found or not executable; skipping JWT generation."
+    fi
+    # --- end: generate JWT for demo and update test_ccms_write_api.sh ---
+
+
+    ./cc-backend -server -dev
+
 fi
diff --git a/test_ccms_write_api.sh.bak b/test_ccms_write_api.sh.bak
new file mode 100755
index 0000000..f76322f
--- /dev/null
+++ b/test_ccms_write_api.sh.bak
@@ -0,0 +1,110 @@
+JWT="eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NjQ1NjMzOTUsImlhdCI6MTc1NzM2MzM5NSwicm9sZXMiOlsiYWRtaW4iLCJhcGkiXSwic3ViIjoiZGVtbyJ9.uhtEbS-ty4xNc8GWTKjyh1b06j6b3vtEw7lzQy0Eht5LtISZwRfyRBfdKjbm_t25xGrNH9sxINq4qiYKBjAaDQ"
+
+# curl -X 'POST' 'http://localhost:8080/metricstore/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" -d $'cpu_load,cluster=alex,hostname=a042,type=hwthread,type-id=0 value=35.0 1725827464642231296'
+
+rm -f sample_fritz.txt
+rm -f sample_alex.txt
+
+while true; do
+    echo "Alex Metrics for hwthread types and type-ids"
+    timestamp="$(date '+%s')"
+    echo "Timestamp : $timestamp"
+    for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
+        for hostname in a0603 a0903 a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do
+            for id in {0..127}; do
+                echo "$metric,cluster=alex,hostname=$hostname,type=hwthread,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_alex.txt
+            done
+        done
+    done
+
+    curl -X 'POST' 'http://localhost:8080/metricstore/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" --data-binary @sample_alex.txt
+
+    echo "Fritz Metrics for hwthread types and type-ids"
+    for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
+        for hostname in f0201 f0202 f0203 f0204 f0205 f0206 f0207 f0208 f0209 f0210 f0211 f0212 f0213 f0214 f0215 f0217 f0218 f0219 f0220 f0221 f0222 f0223 f0224 f0225 f0226 f0227 f0228 f0229 f0230 f0231 f0232 f0233 f0234 f0235 f0236 f0237 f0238 f0239 f0240 f0241 f0242 f0243 f0244 f0245 f0246 f0247 f0248 f0249 f0250 f0251 f0252 f0253 f0254 f0255 f0256 f0257 f0258 f0259 f0260 f0261 f0262 f0263 f0264 f0378; do
+            for id in {0..71}; do
+                echo "$metric,cluster=fritz,hostname=$hostname,type=hwthread,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_fritz.txt
+            done
+        done
+    done
+
+    curl -X 'POST' 'http://localhost:8080/metricstore/api/write/?cluster=fritz' -H "Authorization: Bearer $JWT" --data-binary @sample_fritz.txt
+
+    rm sample_fritz.txt
+    rm sample_alex.txt
+
+    echo "Alex Metrics for accelerator types and type-ids"
+    for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
+        for hostname in a0603 a0903 a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128
a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do + for id in 00000000:49:00.0 00000000:0E:00.0 00000000:D1:00.0 00000000:90:00.0 00000000:13:00.0 00000000:96:00.0 00000000:CC:00.0 00000000:4F:00.0; do + echo "$metric,cluster=alex,hostname=$hostname,type=accelerator,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_alex.txt + done + done + done + + curl -X 'POST' 'http://localhost:8080/metricstore/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" --data-binary @sample_alex.txt + + rm sample_alex.txt + + echo "Alex Metrics for memoryDomain types and type-ids" + for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do + for hostname in a0603 a0903 a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do + for id in {0..7}; do + echo "$metric,cluster=alex,hostname=$hostname,type=memoryDomain,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_alex.txt + done + done + done + + curl -X 'POST' 'http://localhost:8080/metricstore/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" --data-binary @sample_alex.txt + + rm sample_alex.txt + + echo "Alex Metrics for socket types and type-ids" + for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do + for hostname in a0603 a0903 a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do + for id in {0..1}; do + echo "$metric,cluster=alex,hostname=$hostname,type=socket,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_alex.txt + done + done + done + + curl -X 'POST' 'http://localhost:8080/metricstore/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" --data-binary @sample_alex.txt + + echo "Fritz Metrics for socket types and type-ids" + for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do + for hostname in f0201 f0202 f0203 f0204 f0205 f0206 f0207 f0208 f0209 f0210 f0211 f0212 f0213 f0214 f0215 f0217 f0218 f0219 f0220 f0221 f0222 f0223 f0224 f0225 f0226 f0227 f0228 f0229 f0230 f0231 f0232 f0233 f0234 f0235 f0236 f0237 f0238 f0239 f0240 f0241 f0242 f0243 f0244 f0245 f0246 f0247 f0248 f0249 f0250 f0251 f0252 f0253 f0254 f0255 f0256 f0257 f0258 f0259 f0260 f0261 f0262 f0263 f0264 f0378; do + for id in {0..1}; do + echo "$metric,cluster=fritz,hostname=$hostname,type=socket,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_fritz.txt + done + done + done + + curl -X 'POST' 'http://localhost:8080/metricstore/api/write/?cluster=fritz' -H "Authorization: Bearer $JWT" 
--data-binary @sample_fritz.txt + + rm sample_fritz.txt + rm sample_alex.txt + + echo "Alex Metrics for nodes" + for metric in cpu_irq cpu_load mem_cached net_bytes_in cpu_user cpu_idle nfs4_read mem_used nfs4_write nfs4_total ib_xmit ib_xmit_pkts net_bytes_out cpu_iowait ib_recv cpu_system ib_recv_pkts; do + for hostname in a0603 a0903 a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do + echo "$metric,cluster=alex,hostname=$hostname,type=node value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_alex.txt + done + done + + curl -X 'POST' 'http://localhost:8080/metricstore/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" --data-binary @sample_alex.txt + + echo "Fritz Metrics for nodes" + for metric in cpu_irq cpu_load mem_cached net_bytes_in cpu_user cpu_idle nfs4_read mem_used nfs4_write nfs4_total ib_xmit ib_xmit_pkts net_bytes_out cpu_iowait ib_recv cpu_system ib_recv_pkts; do + for hostname in f0201 f0202 f0203 f0204 f0205 f0206 f0207 f0208 f0209 f0210 f0211 f0212 f0213 f0214 f0215 f0217 f0218 f0219 f0220 f0221 f0222 f0223 f0224 f0225 f0226 f0227 f0228 f0229 f0230 f0231 f0232 f0233 f0234 f0235 f0236 f0237 f0238 f0239 f0240 f0241 f0242 f0243 f0244 f0245 f0246 f0247 f0248 f0249 f0250 f0251 f0252 f0253 f0254 f0255 f0256 f0257 f0258 f0259 f0260 f0261 f0262 f0263 f0264 f0378; do + echo "$metric,cluster=fritz,hostname=$hostname,type=node value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_fritz.txt + done + done + + curl -X 'POST' 'http://localhost:8080/metricstore/api/write/?cluster=fritz' -H "Authorization: Bearer $JWT" --data-binary @sample_fritz.txt + + rm sample_fritz.txt + rm sample_alex.txt + + sleep 1m +done +# curl -X 'POST' 'http://localhost:8081/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" -d $'cpu_load,cluster=alex,hostname=a042,type=hwthread,type-id=0 value=35.0 1725827464642231296' \ No newline at end of file
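
For reviewers who want to poke at the new write path by hand: the curl loop in test_ccms_write_api.sh.bak can be reproduced with a small Go client. The sketch below is illustrative and not part of the patch; the endpoint, the ?cluster= query parameter, the line-protocol payload, and the bearer-token header are taken from the script above, while the file name and the JWT placeholder are hypothetical (a real token would come from `./cc-backend -jwt demo`, as in startDemo.sh).

// ccms_write_example.go -- illustrative only, not part of this patch.
package main

import (
	"fmt"
	"math/rand"
	"net/http"
	"strings"
	"time"
)

func main() {
	// Placeholder: generate a real token with `./cc-backend -jwt demo`
	// (see startDemo.sh above).
	jwt := "PASTE-JWT-HERE"

	// Build a small line-protocol batch, mirroring the lines that
	// test_ccms_write_api.sh.bak appends to sample_alex.txt.
	var batch strings.Builder
	ts := time.Now().Unix()
	for _, host := range []string{"a0603", "a0903"} {
		for id := 0; id < 4; id++ {
			fmt.Fprintf(&batch,
				"cpu_load,cluster=alex,hostname=%s,type=hwthread,type-id=%d value=%d.0 %d\n",
				host, id, 1+rand.Intn(100), ts)
		}
	}

	// Same endpoint and auth scheme as the curl calls in the script.
	req, err := http.NewRequest(http.MethodPost,
		"http://localhost:8080/metricstore/api/write/?cluster=alex",
		strings.NewReader(batch.String()))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer "+jwt)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("write status:", resp.Status)
}

If the demo server from startDemo.sh is listening on localhost:8080, a 2xx status from this POST should indicate that the batch was accepted by the memorystore write handler mounted in MountMetricStoreApiRoutes.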