Merge pull request #398 from ClusterCockpit/Refactor-job-struct

Refactor job struct
This commit is contained in:
Jan Eitzinger 2025-06-02 12:13:43 +02:00 committed by GitHub
commit 5186b3f61e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 259 additions and 803 deletions

4
go.mod
View File

@ -20,7 +20,6 @@ require (
github.com/gorilla/handlers v1.5.2 github.com/gorilla/handlers v1.5.2
github.com/gorilla/mux v1.8.1 github.com/gorilla/mux v1.8.1
github.com/gorilla/sessions v1.4.0 github.com/gorilla/sessions v1.4.0
github.com/influxdata/influxdb-client-go/v2 v2.14.0
github.com/jmoiron/sqlx v1.4.0 github.com/jmoiron/sqlx v1.4.0
github.com/joho/godotenv v1.5.1 github.com/joho/godotenv v1.5.1
github.com/mattn/go-sqlite3 v1.14.24 github.com/mattn/go-sqlite3 v1.14.24
@ -42,7 +41,6 @@ require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect github.com/KyleBanks/depth v1.2.1 // indirect
github.com/agnivade/levenshtein v1.2.1 // indirect github.com/agnivade/levenshtein v1.2.1 // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect
@ -60,7 +58,6 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // indirect
github.com/jonboulle/clockwork v0.5.0 // indirect github.com/jonboulle/clockwork v0.5.0 // indirect
github.com/josharian/intern v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect
@ -72,7 +69,6 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/oapi-codegen/runtime v1.1.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect

12
go.sum
View File

@ -16,7 +16,6 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/PuerkitoBio/goquery v1.9.3 h1:mpJr/ikUA9/GNJB/DBZcGeFDXUtosHRyRrwh7KGdTG0= github.com/PuerkitoBio/goquery v1.9.3 h1:mpJr/ikUA9/GNJB/DBZcGeFDXUtosHRyRrwh7KGdTG0=
github.com/PuerkitoBio/goquery v1.9.3/go.mod h1:1ndLHPdTz+DyQPICCWYlYQMPl0oXZj0G6D4LCYA6u4U= github.com/PuerkitoBio/goquery v1.9.3/go.mod h1:1ndLHPdTz+DyQPICCWYlYQMPl0oXZj0G6D4LCYA6u4U=
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/agnivade/levenshtein v1.2.1 h1:EHBY3UOn1gwdy/VbFwgo4cxecRznFk7fKWN1KOX7eoM= github.com/agnivade/levenshtein v1.2.1 h1:EHBY3UOn1gwdy/VbFwgo4cxecRznFk7fKWN1KOX7eoM=
github.com/agnivade/levenshtein v1.2.1/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU= github.com/agnivade/levenshtein v1.2.1/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU=
github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI= github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI=
@ -25,13 +24,10 @@ github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNg
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/andybalholm/cascadia v1.3.2 h1:3Xi6Dw5lHF15JtdcmAHD3i1+T8plmv7BQ/nsViSLyss= github.com/andybalholm/cascadia v1.3.2 h1:3Xi6Dw5lHF15JtdcmAHD3i1+T8plmv7BQ/nsViSLyss=
github.com/andybalholm/cascadia v1.3.2/go.mod h1:7gtRlve5FxPPgIgX36uWBX58OdBsSS6lUvCFb+h7KvU= github.com/andybalholm/cascadia v1.3.2/go.mod h1:7gtRlve5FxPPgIgX36uWBX58OdBsSS6lUvCFb+h7KvU=
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q=
github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-oidc/v3 v3.12.0 h1:sJk+8G2qq94rDI6ehZ71Bol3oUHy63qNYmkiSjrc/Jo= github.com/coreos/go-oidc/v3 v3.12.0 h1:sJk+8G2qq94rDI6ehZ71Bol3oUHy63qNYmkiSjrc/Jo=
@ -123,10 +119,6 @@ github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/C
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4=
github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI=
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf h1:7JTmneyiNEwVBOHSjoMxiWAqB992atOeepeFYegn5RU=
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
@ -151,7 +143,6 @@ github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2E
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@ -186,8 +177,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
@ -219,7 +208,6 @@ github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=
github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4= github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4=
github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERAikUR6SDg= github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERAikUR6SDg=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=

View File

@ -278,7 +278,7 @@ func TestRestApi(t *testing.T) {
job.MonitoringStatus != 1 || job.MonitoringStatus != 1 ||
job.SMT != 1 || job.SMT != 1 ||
!reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "host123", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) || !reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "host123", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) ||
job.StartTime.Unix() != 123456789 { job.StartTime != 123456789 {
t.Fatalf("unexpected job properties: %#v", job) t.Fatalf("unexpected job properties: %#v", job)
} }

View File

@ -150,9 +150,9 @@ type DeleteJobApiRequest struct {
// GetJobsApiResponse model // GetJobsApiResponse model
type GetJobsApiResponse struct { type GetJobsApiResponse struct {
Jobs []*schema.JobMeta `json:"jobs"` // Array of jobs Jobs []*schema.Job `json:"jobs"` // Array of jobs
Items int `json:"items"` // Number of jobs returned Items int `json:"items"` // Number of jobs returned
Page int `json:"page"` // Page id returned Page int `json:"page"` // Page id returned
} }
// GetClustersApiResponse model // GetClustersApiResponse model
@ -361,7 +361,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
return return
} }
results := make([]*schema.JobMeta, 0, len(jobs)) results := make([]*schema.Job, 0, len(jobs))
for _, job := range jobs { for _, job := range jobs {
if withMetadata { if withMetadata {
if _, err = api.JobRepository.FetchMetadata(job); err != nil { if _, err = api.JobRepository.FetchMetadata(job); err != nil {
@ -370,27 +370,21 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
} }
} }
res := &schema.JobMeta{ job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
ID: &job.ID,
BaseJob: job.BaseJob,
StartTime: job.StartTime.Unix(),
}
res.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID)
if err != nil { if err != nil {
handleError(err, http.StatusInternalServerError, rw) handleError(err, http.StatusInternalServerError, rw)
return return
} }
if res.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful { if job.MonitoringStatus == schema.MonitoringStatusArchivingSuccessful {
res.Statistics, err = archive.GetStatistics(job) job.Statistics, err = archive.GetStatistics(job)
if err != nil { if err != nil {
handleError(err, http.StatusInternalServerError, rw) handleError(err, http.StatusInternalServerError, rw)
return return
} }
} }
results = append(results, res) results = append(results, job)
} }
log.Debugf("/api/jobs: %d jobs returned", len(results)) log.Debugf("/api/jobs: %d jobs returned", len(results))
@ -449,7 +443,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
return return
} }
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
if err != nil { if err != nil {
handleError(err, http.StatusInternalServerError, rw) handleError(err, http.StatusInternalServerError, rw)
return return
@ -542,7 +536,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
return return
} }
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
if err != nil { if err != nil {
handleError(err, http.StatusInternalServerError, rw) handleError(err, http.StatusInternalServerError, rw)
return return
@ -683,7 +677,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
return return
} }
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
if err != nil { if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError) http.Error(rw, err.Error(), http.StatusInternalServerError)
return return
@ -696,7 +690,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
} }
for _, tag := range req { for _, tag := range req {
tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), job.ID, tag.Type, tag.Name, tag.Scope) tagId, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), *job.ID, tag.Type, tag.Name, tag.Scope)
if err != nil { if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError) http.Error(rw, err.Error(), http.StatusInternalServerError)
return return
@ -745,7 +739,7 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
return return
} }
job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), &job.ID) job.Tags, err = api.JobRepository.GetTags(repository.GetUserFromContext(r.Context()), job.ID)
if err != nil { if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError) http.Error(rw, err.Error(), http.StatusInternalServerError)
return return
@ -764,7 +758,7 @@ func (api *RestApi) removeTagJob(rw http.ResponseWriter, r *http.Request) {
continue continue
} }
remainingTags, err := api.JobRepository.RemoveJobTagByRequest(repository.GetUserFromContext(r.Context()), job.ID, rtag.Type, rtag.Name, rtag.Scope) remainingTags, err := api.JobRepository.RemoveJobTagByRequest(repository.GetUserFromContext(r.Context()), *job.ID, rtag.Type, rtag.Name, rtag.Scope)
if err != nil { if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError) http.Error(rw, err.Error(), http.StatusInternalServerError)
return return
@ -840,7 +834,10 @@ func (api *RestApi) removeTags(rw http.ResponseWriter, r *http.Request) {
// @security ApiKeyAuth // @security ApiKeyAuth
// @router /api/jobs/start_job/ [post] // @router /api/jobs/start_job/ [post]
func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
req := schema.JobMeta{BaseJob: schema.JobDefaults} req := schema.Job{
Exclusive: 1,
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
}
if err := decode(r.Body, &req); err != nil { if err := decode(r.Body, &req); err != nil {
handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw) handleError(fmt.Errorf("parsing request body failed: %w", err), http.StatusBadRequest, rw)
return return
@ -849,7 +846,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
log.Printf("REST: %s\n", req.GoString()) log.Printf("REST: %s\n", req.GoString())
req.State = schema.JobStateRunning req.State = schema.JobStateRunning
if err := importer.SanityChecks(&req.BaseJob); err != nil { if err := importer.SanityChecks(&req); err != nil {
handleError(err, http.StatusBadRequest, rw) handleError(err, http.StatusBadRequest, rw)
return return
} }
@ -866,7 +863,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
return return
} else if err == nil { } else if err == nil {
for _, job := range jobs { for _, job := range jobs {
if (req.StartTime - job.StartTimeUnix) < 86400 { if (req.StartTime - job.StartTime) < 86400 {
handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw) handleError(fmt.Errorf("a job with that jobId, cluster and startTime already exists: dbid: %d, jobid: %d", job.ID, job.JobID), http.StatusUnprocessableEntity, rw)
return return
} }
@ -1023,7 +1020,7 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request)
return return
} }
err = api.JobRepository.DeleteJobById(job.ID) err = api.JobRepository.DeleteJobById(*job.ID)
if err != nil { if err != nil {
handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw) handleError(fmt.Errorf("deleting job failed: %w", err), http.StatusUnprocessableEntity, rw)
return return
@ -1087,8 +1084,8 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
return return
} }
if job == nil || job.StartTime.Unix() > req.StopTime { if job == nil || job.StartTime > req.StopTime {
handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime.Unix()), http.StatusBadRequest, rw) handleError(fmt.Errorf("jobId %d (id %d) on %s : stopTime %d must be larger/equal than startTime %d", job.JobID, job.ID, job.Cluster, req.StopTime, job.StartTime), http.StatusBadRequest, rw)
return return
} }
@ -1100,11 +1097,11 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
} }
// Mark job as stopped in the database (update state and duration) // Mark job as stopped in the database (update state and duration)
job.Duration = int32(req.StopTime - job.StartTime.Unix()) job.Duration = int32(req.StopTime - job.StartTime)
job.State = req.State job.State = req.State
api.JobRepository.Mutex.Lock() api.JobRepository.Mutex.Lock()
if err := api.JobRepository.Stop(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { if err := api.JobRepository.Stop(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
if err := api.JobRepository.StopCached(job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil { if err := api.JobRepository.StopCached(*job.ID, job.Duration, job.State, job.MonitoringStatus); err != nil {
api.JobRepository.Mutex.Unlock() api.JobRepository.Mutex.Unlock()
handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw) handleError(fmt.Errorf("jobId %d (id %d) on %s : marking job as '%s' (duration: %d) in DB failed: %w", job.JobID, job.ID, job.Cluster, job.State, job.Duration, err), http.StatusInternalServerError, rw)
return return
@ -1112,7 +1109,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
} }
api.JobRepository.Mutex.Unlock() api.JobRepository.Mutex.Unlock()
log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%s, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State) log.Printf("archiving job... (dbid: %d): cluster=%s, jobId=%d, user=%s, startTime=%d, duration=%d, state=%s", job.ID, job.Cluster, job.JobID, job.User, job.StartTime, job.Duration, job.State)
// Send a response (with status OK). This means that erros that happen from here on forward // Send a response (with status OK). This means that erros that happen from here on forward
// can *NOT* be communicated to the client. If reading from a MetricDataRepository or // can *NOT* be communicated to the client. If reading from a MetricDataRepository or

View File

@ -41,7 +41,7 @@ func archivingWorker() {
// will fail if job meta not in repository // will fail if job meta not in repository
if _, err := jobRepo.FetchMetadata(job); err != nil { if _, err := jobRepo.FetchMetadata(job); err != nil {
log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error()) log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
continue continue
} }
@ -50,7 +50,7 @@ func archivingWorker() {
jobMeta, err := ArchiveJob(job, context.Background()) jobMeta, err := ArchiveJob(job, context.Background())
if err != nil { if err != nil {
log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error()) log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed) jobRepo.UpdateMonitoringStatus(*job.ID, schema.MonitoringStatusArchivingFailed)
continue continue
} }

View File

@ -16,7 +16,7 @@ import (
) )
// Writes a running job to the job-archive // Writes a running job to the job-archive
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.Job, error) {
allMetrics := make([]string, 0) allMetrics := make([]string, 0)
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
for _, mc := range metricConfigs { for _, mc := range metricConfigs {
@ -40,11 +40,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
return nil, err return nil, err
} }
jobMeta := &schema.JobMeta{ job.Statistics = make(map[string]schema.JobStatistics)
BaseJob: job.BaseJob,
StartTime: job.StartTime.Unix(),
Statistics: make(map[string]schema.JobStatistics),
}
for metric, data := range jobData { for metric, data := range jobData {
avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32 avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32
@ -61,7 +57,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
} }
// Round AVG Result to 2 Digits // Round AVG Result to 2 Digits
jobMeta.Statistics[metric] = schema.JobStatistics{ job.Statistics[metric] = schema.JobStatistics{
Unit: schema.Unit{ Unit: schema.Unit{
Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix, Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix,
Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base, Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base,
@ -76,8 +72,8 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
// only return the JobMeta structure as the // only return the JobMeta structure as the
// statistics in there are needed. // statistics in there are needed.
if config.Keys.DisableArchive { if config.Keys.DisableArchive {
return jobMeta, nil return job, nil
} }
return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData) return job, archive.GetHandle().ImportJob(job, &jobData)
} }

View File

@ -398,6 +398,8 @@ type ClusterResolver interface {
Partitions(ctx context.Context, obj *schema.Cluster) ([]string, error) Partitions(ctx context.Context, obj *schema.Cluster) ([]string, error)
} }
type JobResolver interface { type JobResolver interface {
StartTime(ctx context.Context, obj *schema.Job) (*time.Time, error)
Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error)
ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error)
@ -5456,9 +5458,9 @@ func (ec *executionContext) _Job_id(ctx context.Context, field graphql.Collected
} }
return graphql.Null return graphql.Null
} }
res := resTmp.(int64) res := resTmp.(*int64)
fc.Result = res fc.Result = res
return ec.marshalNID2int64(ctx, field.Selections, res) return ec.marshalNID2int64(ctx, field.Selections, res)
} }
func (ec *executionContext) fieldContext_Job_id(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { func (ec *executionContext) fieldContext_Job_id(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
@ -5708,7 +5710,7 @@ func (ec *executionContext) _Job_startTime(ctx context.Context, field graphql.Co
}() }()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) { resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (any, error) {
ctx = rctx // use context from middleware stack in children ctx = rctx // use context from middleware stack in children
return obj.StartTime, nil return ec.resolvers.Job().StartTime(rctx, obj)
}) })
if err != nil { if err != nil {
ec.Error(ctx, err) ec.Error(ctx, err)
@ -5720,17 +5722,17 @@ func (ec *executionContext) _Job_startTime(ctx context.Context, field graphql.Co
} }
return graphql.Null return graphql.Null
} }
res := resTmp.(time.Time) res := resTmp.(*time.Time)
fc.Result = res fc.Result = res
return ec.marshalNTime2timeᚐTime(ctx, field.Selections, res) return ec.marshalNTime2timeᚐTime(ctx, field.Selections, res)
} }
func (ec *executionContext) fieldContext_Job_startTime(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { func (ec *executionContext) fieldContext_Job_startTime(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{ fc = &graphql.FieldContext{
Object: "Job", Object: "Job",
Field: field, Field: field,
IsMethod: false, IsMethod: true,
IsResolver: false, IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Time does not have child fields") return nil, errors.New("field of type Time does not have child fields")
}, },
@ -17424,10 +17426,41 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj
atomic.AddUint32(&out.Invalids, 1) atomic.AddUint32(&out.Invalids, 1)
} }
case "startTime": case "startTime":
out.Values[i] = ec._Job_startTime(ctx, field, obj) field := field
if out.Values[i] == graphql.Null {
atomic.AddUint32(&out.Invalids, 1) innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
}
}()
res = ec._Job_startTime(ctx, field, obj)
if res == graphql.Null {
atomic.AddUint32(&fs.Invalids, 1)
}
return res
} }
if field.Deferrable != nil {
dfs, ok := deferred[field.Deferrable.Label]
di := 0
if ok {
dfs.AddField(field)
di = len(dfs.Values) - 1
} else {
dfs = graphql.NewFieldSet([]graphql.CollectedField{field})
deferred[field.Deferrable.Label] = dfs
}
dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler {
return innerFunc(ctx, dfs)
})
// don't run the out.Concurrently() call below
out.Values[i] = graphql.Null
continue
}
out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
case "duration": case "duration":
out.Values[i] = ec._Job_duration(ctx, field, obj) out.Values[i] = ec._Job_duration(ctx, field, obj)
if out.Values[i] == graphql.Null { if out.Values[i] == graphql.Null {
@ -20580,6 +20613,27 @@ func (ec *executionContext) marshalNID2ᚕstringᚄ(ctx context.Context, sel ast
return ret return ret
} }
func (ec *executionContext) unmarshalNID2ᚖint64(ctx context.Context, v any) (*int64, error) {
res, err := graphql.UnmarshalInt64(v)
return &res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) marshalNID2ᚖint64(ctx context.Context, sel ast.SelectionSet, v *int64) graphql.Marshaler {
if v == nil {
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
}
return graphql.Null
}
res := graphql.MarshalInt64(*v)
if res == graphql.Null {
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
}
}
return res
}
func (ec *executionContext) unmarshalNInt2int(ctx context.Context, v any) (int, error) { func (ec *executionContext) unmarshalNInt2int(ctx context.Context, v any) (int, error) {
res, err := graphql.UnmarshalInt(v) res, err := graphql.UnmarshalInt(v)
return res, graphql.ErrorOnPath(ctx, err) return res, graphql.ErrorOnPath(ctx, err)
@ -21799,6 +21853,27 @@ func (ec *executionContext) marshalNTime2timeᚐTime(ctx context.Context, sel as
return res return res
} }
func (ec *executionContext) unmarshalNTime2ᚖtimeᚐTime(ctx context.Context, v any) (*time.Time, error) {
res, err := graphql.UnmarshalTime(v)
return &res, graphql.ErrorOnPath(ctx, err)
}
func (ec *executionContext) marshalNTime2ᚖtimeᚐTime(ctx context.Context, sel ast.SelectionSet, v *time.Time) graphql.Marshaler {
if v == nil {
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
}
return graphql.Null
}
res := graphql.MarshalTime(*v)
if res == graphql.Null {
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
}
}
return res
}
func (ec *executionContext) marshalNTimeWeights2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐTimeWeights(ctx context.Context, sel ast.SelectionSet, v *model.TimeWeights) graphql.Marshaler { func (ec *executionContext) marshalNTimeWeights2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐTimeWeights(ctx context.Context, sel ast.SelectionSet, v *model.TimeWeights) graphql.Marshaler {
if v == nil { if v == nil {
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {

View File

@ -29,9 +29,14 @@ func (r *clusterResolver) Partitions(ctx context.Context, obj *schema.Cluster) (
return r.Repo.Partitions(obj.Name) return r.Repo.Partitions(obj.Name)
} }
// StartTime is the resolver for the startTime field.
func (r *jobResolver) StartTime(ctx context.Context, obj *schema.Job) (*time.Time, error) {
panic(fmt.Errorf("not implemented: StartTime - startTime"))
}
// Tags is the resolver for the tags field. // Tags is the resolver for the tags field.
func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) { func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) {
return r.Repo.GetTags(repository.GetUserFromContext(ctx), &obj.ID) return r.Repo.GetTags(repository.GetUserFromContext(ctx), obj.ID)
} }
// ConcurrentJobs is the resolver for the concurrentJobs field. // ConcurrentJobs is the resolver for the concurrentJobs field.
@ -615,9 +620,9 @@ func (r *queryResolver) JobsMetricStats(ctx context.Context, filter []*model.Job
numThreadsInt := int(job.NumHWThreads) numThreadsInt := int(job.NumHWThreads)
numAccsInt := int(job.NumAcc) numAccsInt := int(job.NumAcc)
res = append(res, &model.JobStats{ res = append(res, &model.JobStats{
ID: int(job.ID), ID: int(*job.ID),
JobID: strconv.Itoa(int(job.JobID)), JobID: strconv.Itoa(int(job.JobID)),
StartTime: int(job.StartTime.Unix()), StartTime: int(job.StartTime),
Duration: int(job.Duration), Duration: int(job.Duration),
Cluster: job.Cluster, Cluster: job.Cluster,
SubCluster: job.SubCluster, SubCluster: job.SubCluster,
@ -776,11 +781,9 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
// SubCluster returns generated.SubClusterResolver implementation. // SubCluster returns generated.SubClusterResolver implementation.
func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} } func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
type ( type clusterResolver struct{ *Resolver }
clusterResolver struct{ *Resolver } type jobResolver struct{ *Resolver }
jobResolver struct{ *Resolver } type metricValueResolver struct{ *Resolver }
metricValueResolver struct{ *Resolver } type mutationResolver struct{ *Resolver }
mutationResolver struct{ *Resolver } type queryResolver struct{ *Resolver }
queryResolver struct{ *Resolver } type subClusterResolver struct{ *Resolver }
subClusterResolver struct{ *Resolver }
)

View File

@ -42,7 +42,10 @@ func HandleImportFlag(flag string) error {
} }
dec := json.NewDecoder(bytes.NewReader(raw)) dec := json.NewDecoder(bytes.NewReader(raw))
dec.DisallowUnknownFields() dec.DisallowUnknownFields()
job := schema.JobMeta{BaseJob: schema.JobDefaults} job := schema.Job{
Exclusive: 1,
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
}
if err = dec.Decode(&job); err != nil { if err = dec.Decode(&job); err != nil {
log.Warn("Error while decoding raw json metadata for import") log.Warn("Error while decoding raw json metadata for import")
return err return err
@ -141,7 +144,7 @@ func HandleImportFlag(flag string) error {
return err return err
} }
if err = SanityChecks(&job.BaseJob); err != nil { if err = SanityChecks(&job); err != nil {
log.Warn("BaseJob SanityChecks failed") log.Warn("BaseJob SanityChecks failed")
return err return err
} }

View File

@ -60,11 +60,6 @@ func InitDB() error {
} }
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
job := schema.Job{
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),
StartTimeUnix: jobMeta.StartTime,
}
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
if err != nil { if err != nil {
@ -72,7 +67,7 @@ func InitDB() error {
return err return err
} }
job.Footprint = make(map[string]float64) jobMeta.Footprint = make(map[string]float64)
for _, fp := range sc.Footprint { for _, fp := range sc.Footprint {
statType := "avg" statType := "avg"
@ -83,16 +78,16 @@ func InitDB() error {
name := fmt.Sprintf("%s_%s", fp, statType) name := fmt.Sprintf("%s_%s", fp, statType)
job.Footprint[name] = repository.LoadJobStat(jobMeta, fp, statType) jobMeta.Footprint[name] = repository.LoadJobStat(jobMeta, fp, statType)
} }
job.RawFootprint, err = json.Marshal(job.Footprint) jobMeta.RawFootprint, err = json.Marshal(jobMeta.Footprint)
if err != nil { if err != nil {
log.Warn("Error while marshaling job footprint") log.Warn("Error while marshaling job footprint")
return err return err
} }
job.EnergyFootprint = make(map[string]float64) jobMeta.EnergyFootprint = make(map[string]float64)
// Total Job Energy Outside Loop // Total Job Energy Outside Loop
totalEnergy := 0.0 totalEnergy := 0.0
@ -117,45 +112,45 @@ func InitDB() error {
log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID) log.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID)
} }
job.EnergyFootprint[fp] = metricEnergy jobMeta.EnergyFootprint[fp] = metricEnergy
totalEnergy += metricEnergy totalEnergy += metricEnergy
} }
job.Energy = (math.Round(totalEnergy*100.0) / 100.0) jobMeta.Energy = (math.Round(totalEnergy*100.0) / 100.0)
if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil { if jobMeta.RawEnergyFootprint, err = json.Marshal(jobMeta.EnergyFootprint); err != nil {
log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", jobMeta.ID) log.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", jobMeta.ID)
return err return err
} }
job.RawResources, err = json.Marshal(job.Resources) jobMeta.RawResources, err = json.Marshal(jobMeta.Resources)
if err != nil { if err != nil {
log.Errorf("repository initDB(): %v", err) log.Errorf("repository initDB(): %v", err)
errorOccured++ errorOccured++
continue continue
} }
job.RawMetaData, err = json.Marshal(job.MetaData) jobMeta.RawMetaData, err = json.Marshal(jobMeta.MetaData)
if err != nil { if err != nil {
log.Errorf("repository initDB(): %v", err) log.Errorf("repository initDB(): %v", err)
errorOccured++ errorOccured++
continue continue
} }
if err := SanityChecks(&job.BaseJob); err != nil { if err := SanityChecks(jobMeta); err != nil {
log.Errorf("repository initDB(): %v", err) log.Errorf("repository initDB(): %v", err)
errorOccured++ errorOccured++
continue continue
} }
id, err := r.TransactionAddNamed(t, id, err := r.TransactionAddNamed(t,
repository.NamedJobInsert, job) repository.NamedJobInsert, jobMeta)
if err != nil { if err != nil {
log.Errorf("repository initDB(): %v", err) log.Errorf("repository initDB(): %v", err)
errorOccured++ errorOccured++
continue continue
} }
for _, tag := range job.Tags { for _, tag := range jobMeta.Tags {
tagstr := tag.Name + ":" + tag.Type tagstr := tag.Name + ":" + tag.Type
tagId, ok := tags[tagstr] tagId, ok := tags[tagstr]
if !ok { if !ok {
@ -190,7 +185,7 @@ func InitDB() error {
} }
// This function also sets the subcluster if necessary! // This function also sets the subcluster if necessary!
func SanityChecks(job *schema.BaseJob) error { func SanityChecks(job *schema.Job) error {
if c := archive.GetCluster(job.Cluster); c == nil { if c := archive.GetCluster(job.Cluster); c == nil {
return fmt.Errorf("no such cluster: %v", job.Cluster) return fmt.Errorf("no such cluster: %v", job.Cluster)
} }

View File

@ -183,8 +183,8 @@ func (ccms *CCMetricStore) LoadData(
req := ApiQueryRequest{ req := ApiQueryRequest{
Cluster: job.Cluster, Cluster: job.Cluster,
From: job.StartTime.Unix(), From: job.StartTime,
To: job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(), To: job.StartTime + int64(job.Duration),
Queries: queries, Queries: queries,
WithStats: true, WithStats: true,
WithData: true, WithData: true,
@ -570,7 +570,6 @@ func (ccms *CCMetricStore) LoadStats(
metrics []string, metrics []string,
ctx context.Context, ctx context.Context,
) (map[string]map[string]schema.MetricStatistics, error) { ) (map[string]map[string]schema.MetricStatistics, error) {
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization? queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope shere for analysis view accelerator normalization?
if err != nil { if err != nil {
log.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error()) log.Errorf("Error while building queries for jobId %d, Metrics %v: %s", job.JobID, metrics, err.Error())
@ -579,8 +578,8 @@ func (ccms *CCMetricStore) LoadStats(
req := ApiQueryRequest{ req := ApiQueryRequest{
Cluster: job.Cluster, Cluster: job.Cluster,
From: job.StartTime.Unix(), From: job.StartTime,
To: job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(), To: job.StartTime + int64(job.Duration),
Queries: queries, Queries: queries,
WithStats: true, WithStats: true,
WithData: false, WithData: false,
@ -638,8 +637,8 @@ func (ccms *CCMetricStore) LoadScopedStats(
req := ApiQueryRequest{ req := ApiQueryRequest{
Cluster: job.Cluster, Cluster: job.Cluster,
From: job.StartTime.Unix(), From: job.StartTime,
To: job.StartTime.Add(time.Duration(job.Duration) * time.Second).Unix(), To: job.StartTime + int64(job.Duration),
Queries: queries, Queries: queries,
WithStats: true, WithStats: true,
WithData: false, WithData: false,
@ -816,7 +815,6 @@ func (ccms *CCMetricStore) LoadNodeListData(
page *model.PageRequest, page *model.PageRequest,
ctx context.Context, ctx context.Context,
) (map[string]schema.JobData, int, bool, error) { ) (map[string]schema.JobData, int, bool, error) {
// 0) Init additional vars // 0) Init additional vars
var totalNodes int = 0 var totalNodes int = 0
var hasNextPage bool = false var hasNextPage bool = false
@ -975,7 +973,6 @@ func (ccms *CCMetricStore) buildNodeQueries(
scopes []schema.MetricScope, scopes []schema.MetricScope,
resolution int, resolution int,
) ([]ApiQuery, []schema.MetricScope, error) { ) ([]ApiQuery, []schema.MetricScope, error) {
queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(nodes)) queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(nodes))
assignedScope := []schema.MetricScope{} assignedScope := []schema.MetricScope{}

View File

@ -1,575 +0,0 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package metricdata
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"math"
"sort"
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
)
type InfluxDBv2DataRepositoryConfig struct {
Url string `json:"url"`
Token string `json:"token"`
Bucket string `json:"bucket"`
Org string `json:"org"`
SkipTls bool `json:"skiptls"`
}
type InfluxDBv2DataRepository struct {
client influxdb2.Client
queryClient influxdb2Api.QueryAPI
bucket, measurement string
}
func (idb *InfluxDBv2DataRepository) Init(rawConfig json.RawMessage) error {
var config InfluxDBv2DataRepositoryConfig
if err := json.Unmarshal(rawConfig, &config); err != nil {
log.Warn("Error while unmarshaling raw json config")
return err
}
idb.client = influxdb2.NewClientWithOptions(config.Url, config.Token, influxdb2.DefaultOptions().SetTLSConfig(&tls.Config{InsecureSkipVerify: config.SkipTls}))
idb.queryClient = idb.client.QueryAPI(config.Org)
idb.bucket = config.Bucket
return nil
}
func (idb *InfluxDBv2DataRepository) formatTime(t time.Time) string {
return t.Format(time.RFC3339) // Like “2006-01-02T15:04:05Z07:00”
}
func (idb *InfluxDBv2DataRepository) epochToTime(epoch int64) time.Time {
return time.Unix(epoch, 0)
}
func (idb *InfluxDBv2DataRepository) LoadData(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int) (schema.JobData, error) {
log.Infof("InfluxDB 2 Backend: Resolution Scaling not Implemented, will return default timestep. Requested Resolution %d", resolution)
measurementsConds := make([]string, 0, len(metrics))
for _, m := range metrics {
measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m))
}
measurementsCond := strings.Join(measurementsConds, " or ")
hostsConds := make([]string, 0, len(job.Resources))
for _, h := range job.Resources {
if h.HWThreads != nil || h.Accelerators != nil {
// TODO
return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support HWThreads or Accelerators")
}
hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname))
}
hostsCond := strings.Join(hostsConds, " or ")
jobData := make(schema.JobData) // Empty Schema: map[<string>FIELD]map[<MetricScope>SCOPE]<*JobMetric>METRIC
// Requested Scopes
for _, scope := range scopes {
query := ""
switch scope {
case "node":
// Get Finest Granularity, Groupy By Measurement and Hostname (== Metric / Node), Calculate Mean for 60s windows <-- Resolution could be added here?
// log.Info("Scope 'node' requested. ")
query = fmt.Sprintf(`
from(bucket: "%s")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => (%s) and (%s) )
|> drop(columns: ["_start", "_stop"])
|> group(columns: ["hostname", "_measurement"])
|> aggregateWindow(every: 60s, fn: mean)
|> drop(columns: ["_time"])`,
idb.bucket,
idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix+int64(job.Duration)+int64(1))),
measurementsCond, hostsCond)
case "socket":
log.Info("Scope 'socket' requested, but not yet supported: Will return 'node' scope only. ")
continue
case "core":
log.Info(" Scope 'core' requested, but not yet supported: Will return 'node' scope only. ")
continue
// Get Finest Granularity only, Set NULL to 0.0
// query = fmt.Sprintf(`
// from(bucket: "%s")
// |> range(start: %s, stop: %s)
// |> filter(fn: (r) => %s )
// |> filter(fn: (r) => %s )
// |> drop(columns: ["_start", "_stop", "cluster"])
// |> map(fn: (r) => (if exists r._value then {r with _value: r._value} else {r with _value: 0.0}))`,
// idb.bucket,
// idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix + int64(job.Duration) + int64(1) )),
// measurementsCond, hostsCond)
case "hwthread":
log.Info(" Scope 'hwthread' requested, but not yet supported: Will return 'node' scope only. ")
continue
case "accelerator":
log.Info(" Scope 'accelerator' requested, but not yet supported: Will return 'node' scope only. ")
continue
default:
log.Infof("Unknown scope '%s' requested: Will return 'node' scope.", scope)
continue
// return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support other scopes than 'node'")
}
rows, err := idb.queryClient.Query(ctx, query)
if err != nil {
log.Error("Error while performing query")
return nil, err
}
// Init Metrics: Only Node level now -> TODO: Matching /check on scope level ...
for _, metric := range metrics {
jobMetric, ok := jobData[metric]
if !ok {
mc := archive.GetMetricConfig(job.Cluster, metric)
jobMetric = map[schema.MetricScope]*schema.JobMetric{
scope: { // uses scope var from above!
Unit: mc.Unit,
Timestep: mc.Timestep,
Series: make([]schema.Series, 0, len(job.Resources)),
StatisticsSeries: nil, // Should be: &schema.StatsSeries{},
},
}
}
jobData[metric] = jobMetric
}
// Process Result: Time-Data
field, host, hostSeries := "", "", schema.Series{}
// typeId := 0
switch scope {
case "node":
for rows.Next() {
row := rows.Record()
if host == "" || host != row.ValueByKey("hostname").(string) || rows.TableChanged() {
if host != "" {
// Append Series before reset
jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)
}
field, host = row.Measurement(), row.ValueByKey("hostname").(string)
hostSeries = schema.Series{
Hostname: host,
Statistics: schema.MetricStatistics{}, //TODO Add Statistics
Data: make([]schema.Float, 0),
}
}
val, ok := row.Value().(float64)
if ok {
hostSeries.Data = append(hostSeries.Data, schema.Float(val))
} else {
hostSeries.Data = append(hostSeries.Data, schema.Float(0))
}
}
case "socket":
continue
case "accelerator":
continue
case "hwthread":
// See below @ core
continue
case "core":
continue
// Include Series.Id in hostSeries
// for rows.Next() {
// row := rows.Record()
// if ( host == "" || host != row.ValueByKey("hostname").(string) || typeId != row.ValueByKey("type-id").(int) || rows.TableChanged() ) {
// if ( host != "" ) {
// // Append Series before reset
// jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)
// }
// field, host, typeId = row.Measurement(), row.ValueByKey("hostname").(string), row.ValueByKey("type-id").(int)
// hostSeries = schema.Series{
// Hostname: host,
// Id: &typeId,
// Statistics: nil,
// Data: make([]schema.Float, 0),
// }
// }
// val := row.Value().(float64)
// hostSeries.Data = append(hostSeries.Data, schema.Float(val))
// }
default:
log.Infof("Unknown scope '%s' requested: Will return 'node' scope.", scope)
continue
// return nil, errors.New("the InfluxDB metric data repository does not yet support other scopes than 'node, core'")
}
// Append last Series
jobData[field][scope].Series = append(jobData[field][scope].Series, hostSeries)
}
// Get Stats
stats, err := idb.LoadStats(job, metrics, ctx)
if err != nil {
log.Warn("Error while loading statistics")
return nil, err
}
for _, scope := range scopes {
if scope == "node" { // No 'socket/core' support yet
for metric, nodes := range stats {
for node, stats := range nodes {
for index, _ := range jobData[metric][scope].Series {
if jobData[metric][scope].Series[index].Hostname == node {
jobData[metric][scope].Series[index].Statistics = schema.MetricStatistics{Avg: stats.Avg, Min: stats.Min, Max: stats.Max}
}
}
}
}
}
}
return jobData, nil
}
func (idb *InfluxDBv2DataRepository) LoadStats(
job *schema.Job,
metrics []string,
ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
stats := map[string]map[string]schema.MetricStatistics{}
hostsConds := make([]string, 0, len(job.Resources))
for _, h := range job.Resources {
if h.HWThreads != nil || h.Accelerators != nil {
// TODO
return nil, errors.New("METRICDATA/INFLUXV2 > the InfluxDB metric data repository does not yet support HWThreads or Accelerators")
}
hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, h.Hostname))
}
hostsCond := strings.Join(hostsConds, " or ")
// lenMet := len(metrics)
for _, metric := range metrics {
// log.Debugf("<< You are here: %s (Index %d of %d metrics)", metric, index, lenMet)
query := fmt.Sprintf(`
data = from(bucket: "%s")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => r._measurement == "%s" and r._field == "value" and (%s))
union(tables: [data |> mean(column: "_value") |> set(key: "_field", value: "avg"),
data |> min(column: "_value") |> set(key: "_field", value: "min"),
data |> max(column: "_value") |> set(key: "_field", value: "max")])
|> pivot(rowKey: ["hostname"], columnKey: ["_field"], valueColumn: "_value")
|> group()`,
idb.bucket,
idb.formatTime(job.StartTime), idb.formatTime(idb.epochToTime(job.StartTimeUnix+int64(job.Duration)+int64(1))),
metric, hostsCond)
rows, err := idb.queryClient.Query(ctx, query)
if err != nil {
log.Error("Error while performing query")
return nil, err
}
nodes := map[string]schema.MetricStatistics{}
for rows.Next() {
row := rows.Record()
host := row.ValueByKey("hostname").(string)
avg, avgok := row.ValueByKey("avg").(float64)
if !avgok {
// log.Debugf(">> Assertion error for metric %s, statistic AVG. Expected 'float64', got %v", metric, avg)
avg = 0.0
}
min, minok := row.ValueByKey("min").(float64)
if !minok {
// log.Debugf(">> Assertion error for metric %s, statistic MIN. Expected 'float64', got %v", metric, min)
min = 0.0
}
max, maxok := row.ValueByKey("max").(float64)
if !maxok {
// log.Debugf(">> Assertion error for metric %s, statistic MAX. Expected 'float64', got %v", metric, max)
max = 0.0
}
nodes[host] = schema.MetricStatistics{
Avg: avg,
Min: min,
Max: max,
}
}
stats[metric] = nodes
}
return stats, nil
}
// Used in Job-View StatsTable
// UNTESTED
func (idb *InfluxDBv2DataRepository) LoadScopedStats(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context) (schema.ScopedJobStats, error) {
// Assumption: idb.loadData() only returns series node-scope - use node scope for statsTable
scopedJobStats := make(schema.ScopedJobStats)
data, err := idb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
if err != nil {
log.Warn("Error while loading job for scopedJobStats")
return nil, err
}
for metric, metricData := range data {
for _, scope := range scopes {
if scope != schema.MetricScopeNode {
logOnce.Do(func() {
log.Infof("Note: Scope '%s' requested, but not yet supported: Will return 'node' scope only.", scope)
})
continue
}
if _, ok := scopedJobStats[metric]; !ok {
scopedJobStats[metric] = make(map[schema.MetricScope][]*schema.ScopedStats)
}
if _, ok := scopedJobStats[metric][scope]; !ok {
scopedJobStats[metric][scope] = make([]*schema.ScopedStats, 0)
}
for _, series := range metricData[scope].Series {
scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
Hostname: series.Hostname,
Data: &series.Statistics,
})
}
}
}
return scopedJobStats, nil
}
// Used in Systems-View @ Node-Overview
// UNTESTED
func (idb *InfluxDBv2DataRepository) LoadNodeData(
cluster string,
metrics, nodes []string,
scopes []schema.MetricScope,
from, to time.Time,
ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) {
// Note: scopes[] Array will be ignored, only return node scope
// CONVERT ARGS TO INFLUX
measurementsConds := make([]string, 0)
for _, m := range metrics {
measurementsConds = append(measurementsConds, fmt.Sprintf(`r["_measurement"] == "%s"`, m))
}
measurementsCond := strings.Join(measurementsConds, " or ")
hostsConds := make([]string, 0)
if nodes == nil {
var allNodes []string
subClusterNodeLists := archive.NodeLists[cluster]
for _, nodeList := range subClusterNodeLists {
allNodes = append(nodes, nodeList.PrintList()...)
}
for _, node := range allNodes {
nodes = append(nodes, node)
hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, node))
}
} else {
for _, node := range nodes {
hostsConds = append(hostsConds, fmt.Sprintf(`r["hostname"] == "%s"`, node))
}
}
hostsCond := strings.Join(hostsConds, " or ")
// BUILD AND PERFORM QUERY
query := fmt.Sprintf(`
from(bucket: "%s")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => (%s) and (%s) )
|> drop(columns: ["_start", "_stop"])
|> group(columns: ["hostname", "_measurement"])
|> aggregateWindow(every: 60s, fn: mean)
|> drop(columns: ["_time"])`,
idb.bucket,
idb.formatTime(from), idb.formatTime(to),
measurementsCond, hostsCond)
rows, err := idb.queryClient.Query(ctx, query)
if err != nil {
log.Error("Error while performing query")
return nil, err
}
// HANDLE QUERY RETURN
// Collect Float Arrays for Node@Metric -> No Scope Handling!
influxData := make(map[string]map[string][]schema.Float)
for rows.Next() {
row := rows.Record()
host, field := row.ValueByKey("hostname").(string), row.Measurement()
influxHostData, ok := influxData[host]
if !ok {
influxHostData = make(map[string][]schema.Float)
influxData[host] = influxHostData
}
influxFieldData, ok := influxData[host][field]
if !ok {
influxFieldData = make([]schema.Float, 0)
influxData[host][field] = influxFieldData
}
val, ok := row.Value().(float64)
if ok {
influxData[host][field] = append(influxData[host][field], schema.Float(val))
} else {
influxData[host][field] = append(influxData[host][field], schema.Float(0))
}
}
// BUILD FUNCTION RETURN
data := make(map[string]map[string][]*schema.JobMetric)
for node, metricData := range influxData {
nodeData, ok := data[node]
if !ok {
nodeData = make(map[string][]*schema.JobMetric)
data[node] = nodeData
}
for metric, floatArray := range metricData {
avg, min, max := 0.0, 0.0, 0.0
for _, val := range floatArray {
avg += float64(val)
min = math.Min(min, float64(val))
max = math.Max(max, float64(val))
}
stats := schema.MetricStatistics{
Avg: (math.Round((avg/float64(len(floatArray)))*100) / 100),
Min: (math.Round(min*100) / 100),
Max: (math.Round(max*100) / 100),
}
mc := archive.GetMetricConfig(cluster, metric)
nodeData[metric] = append(nodeData[metric], &schema.JobMetric{
Unit: mc.Unit,
Timestep: mc.Timestep,
Series: []schema.Series{
{
Hostname: node,
Statistics: stats,
Data: floatArray,
},
},
})
}
}
return data, nil
}
// Used in Systems-View @ Node-List
// UNTESTED
func (idb *InfluxDBv2DataRepository) LoadNodeListData(
cluster, subCluster, nodeFilter string,
metrics []string,
scopes []schema.MetricScope,
resolution int,
from, to time.Time,
page *model.PageRequest,
ctx context.Context,
) (map[string]schema.JobData, int, bool, error) {
// Assumption: idb.loadData() only returns series node-scope - use node scope for NodeList
// 0) Init additional vars
var totalNodes int = 0
var hasNextPage bool = false
// 1) Get list of all nodes
var nodes []string
if subCluster != "" {
scNodes := archive.NodeLists[cluster][subCluster]
nodes = scNodes.PrintList()
} else {
subClusterNodeLists := archive.NodeLists[cluster]
for _, nodeList := range subClusterNodeLists {
nodes = append(nodes, nodeList.PrintList()...)
}
}
// 2) Filter nodes
if nodeFilter != "" {
filteredNodes := []string{}
for _, node := range nodes {
if strings.Contains(node, nodeFilter) {
filteredNodes = append(filteredNodes, node)
}
}
nodes = filteredNodes
}
// 2.1) Count total nodes && Sort nodes -> Sorting invalidated after return ...
totalNodes = len(nodes)
sort.Strings(nodes)
// 3) Apply paging
if len(nodes) > page.ItemsPerPage {
start := (page.Page - 1) * page.ItemsPerPage
end := start + page.ItemsPerPage
if end > len(nodes) {
end = len(nodes)
hasNextPage = false
} else {
hasNextPage = true
}
nodes = nodes[start:end]
}
// 4) Fetch And Convert Data, use idb.LoadNodeData() for query
rawNodeData, err := idb.LoadNodeData(cluster, metrics, nodes, scopes, from, to, ctx)
if err != nil {
log.Error(fmt.Sprintf("Error while loading influx nodeData for nodeListData %#v\n", err))
return nil, totalNodes, hasNextPage, err
}
data := make(map[string]schema.JobData)
for node, nodeData := range rawNodeData {
// Init Nested Map Data Structures If Not Found
hostData, ok := data[node]
if !ok {
hostData = make(schema.JobData)
data[node] = hostData
}
for metric, nodeMetricData := range nodeData {
metricData, ok := hostData[metric]
if !ok {
metricData = make(map[schema.MetricScope]*schema.JobMetric)
data[node][metric] = metricData
}
data[node][metric][schema.MetricScopeNode] = nodeMetricData[0] // Only Node Scope Returned from loadNodeData
}
}
return data, totalNodes, hasNextPage, nil
}

View File

@ -54,8 +54,6 @@ func Init() error {
switch kind.Kind { switch kind.Kind {
case "cc-metric-store": case "cc-metric-store":
mdr = &CCMetricStore{} mdr = &CCMetricStore{}
case "influxdb":
mdr = &InfluxDBv2DataRepository{}
case "prometheus": case "prometheus":
mdr = &PrometheusDataRepository{} mdr = &PrometheusDataRepository{}
case "test": case "test":

View File

@ -279,8 +279,8 @@ func (pdb *PrometheusDataRepository) LoadData(
for i, resource := range job.Resources { for i, resource := range job.Resources {
nodes[i] = resource.Hostname nodes[i] = resource.Hostname
} }
from := job.StartTime from := time.Unix(job.StartTime, 0)
to := job.StartTime.Add(time.Duration(job.Duration) * time.Second) to := time.Unix(job.StartTime+int64(job.Duration), 0)
for _, scope := range scopes { for _, scope := range scopes {
if scope != schema.MetricScopeNode { if scope != schema.MetricScopeNode {
@ -453,8 +453,8 @@ func (pdb *PrometheusDataRepository) LoadScopedStats(
job *schema.Job, job *schema.Job,
metrics []string, metrics []string,
scopes []schema.MetricScope, scopes []schema.MetricScope,
ctx context.Context) (schema.ScopedJobStats, error) { ctx context.Context,
) (schema.ScopedJobStats, error) {
// Assumption: pdb.loadData() only returns series node-scope - use node scope for statsTable // Assumption: pdb.loadData() only returns series node-scope - use node scope for statsTable
scopedJobStats := make(schema.ScopedJobStats) scopedJobStats := make(schema.ScopedJobStats)
data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/) data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
@ -502,7 +502,6 @@ func (pdb *PrometheusDataRepository) LoadNodeListData(
page *model.PageRequest, page *model.PageRequest,
ctx context.Context, ctx context.Context,
) (map[string]schema.JobData, int, bool, error) { ) (map[string]schema.JobData, int, bool, error) {
// Assumption: pdb.loadData() only returns series node-scope - use node scope for NodeList // Assumption: pdb.loadData() only returns series node-scope - use node scope for NodeList
// 0) Init additional vars // 0) Init additional vars

View File

@ -73,7 +73,7 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
if err := row.Scan( if err := row.Scan(
&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster,
&job.StartTimeUnix, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads, &job.StartTime, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads,
&job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
&job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil { &job.Duration, &job.Walltime, &job.RawResources, &job.RawFootprint, &job.Energy); err != nil {
log.Warnf("Error while scanning rows (Job): %v", err) log.Warnf("Error while scanning rows (Job): %v", err)
@ -92,10 +92,9 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
} }
job.RawFootprint = nil job.RawFootprint = nil
job.StartTime = time.Unix(job.StartTimeUnix, 0)
// Always ensure accurate duration for running jobs // Always ensure accurate duration for running jobs
if job.State == schema.JobStateRunning { if job.State == schema.JobStateRunning {
job.Duration = int32(time.Since(job.StartTime).Seconds()) job.Duration = int32(time.Now().Unix() - job.StartTime)
} }
return job, nil return job, nil
@ -582,7 +581,7 @@ func (r *JobRepository) MarkArchived(
func (r *JobRepository) UpdateEnergy( func (r *JobRepository) UpdateEnergy(
stmt sq.UpdateBuilder, stmt sq.UpdateBuilder,
jobMeta *schema.JobMeta, jobMeta *schema.Job,
) (sq.UpdateBuilder, error) { ) (sq.UpdateBuilder, error) {
/* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */ /* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
@ -632,7 +631,7 @@ func (r *JobRepository) UpdateEnergy(
func (r *JobRepository) UpdateFootprint( func (r *JobRepository) UpdateFootprint(
stmt sq.UpdateBuilder, stmt sq.UpdateBuilder,
jobMeta *schema.JobMeta, jobMeta *schema.Job,
) (sq.UpdateBuilder, error) { ) (sq.UpdateBuilder, error) {
/* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */ /* Note: Only Called for Running Jobs during Intermediate Update or on Archiving */
sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)

View File

@ -29,7 +29,7 @@ const NamedJobInsert string = `INSERT INTO job (
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :footprint, :energy, :energy_footprint, :resources, :meta_data
);` );`
func (r *JobRepository) InsertJob(job *schema.JobMeta) (int64, error) { func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
r.Mutex.Lock() r.Mutex.Lock()
res, err := r.DB.NamedExec(NamedJobCacheInsert, job) res, err := r.DB.NamedExec(NamedJobCacheInsert, job)
r.Mutex.Unlock() r.Mutex.Unlock()
@ -87,7 +87,7 @@ func (r *JobRepository) SyncJobs() ([]*schema.Job, error) {
// Start inserts a new job in the table, returning the unique job ID. // Start inserts a new job in the table, returning the unique job ID.
// Statistics are not transfered! // Statistics are not transfered!
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) { func (r *JobRepository) Start(job *schema.Job) (id int64, err error) {
job.RawFootprint, err = json.Marshal(job.Footprint) job.RawFootprint, err = json.Marshal(job.Footprint)
if err != nil { if err != nil {
return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err) return -1, fmt.Errorf("REPOSITORY/JOB > encoding footprint field failed: %w", err)

View File

@ -227,7 +227,7 @@ func (r *JobRepository) FindConcurrentJobs(
var startTime int64 var startTime int64
var stopTime int64 var stopTime int64
startTime = job.StartTimeUnix startTime = job.StartTime
hostname := job.Resources[0].Hostname hostname := job.Resources[0].Hostname
if job.State == schema.JobStateRunning { if job.State == schema.JobStateRunning {

View File

@ -24,7 +24,7 @@ func TestFind(t *testing.T) {
// fmt.Printf("%+v", job) // fmt.Printf("%+v", job)
if job.ID != 5 { if *job.ID != 5 {
t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 1366", job.JobID) t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 1366", job.JobID)
} }
} }

View File

@ -291,7 +291,7 @@ func (r *JobRepository) JobsStats(
return stats, nil return stats, nil
} }
func LoadJobStat(job *schema.JobMeta, metric string, statType string) float64 { func LoadJobStat(job *schema.Job, metric string, statType string) float64 {
if stats, ok := job.Statistics[metric]; ok { if stats, ok := job.Statistics[metric]; ok {
switch statType { switch statType {
case "avg": case "avg":
@ -759,7 +759,6 @@ func (r *JobRepository) runningJobsMetricStatisticsHistogram(
filters []*model.JobFilter, filters []*model.JobFilter,
bins *int, bins *int,
) []*model.MetricHistoPoints { ) []*model.MetricHistoPoints {
// Get Jobs // Get Jobs
jobs, err := r.QueryJobs(ctx, filters, &model.PageRequest{Page: 1, ItemsPerPage: 500 + 1}, nil) jobs, err := r.QueryJobs(ctx, filters, &model.PageRequest{Page: 1, ItemsPerPage: 500 + 1}, nil)
if err != nil { if err != nil {

Binary file not shown.

View File

@ -301,7 +301,7 @@ func (t *JobClassTagger) Match(job *schema.Job) {
} }
if match.(bool) { if match.(bool) {
log.Info("Rule matches!") log.Info("Rule matches!")
id := job.ID id := *job.ID
if !r.HasTag(id, t.tagType, tag) { if !r.HasTag(id, t.tagType, tag) {
r.AddTagOrCreateDirect(id, t.tagType, tag) r.AddTagOrCreateDirect(id, t.tagType, tag)
} }

View File

@ -105,7 +105,7 @@ func (t *AppTagger) Match(job *schema.Job) {
jobscript, ok := metadata["jobScript"] jobscript, ok := metadata["jobScript"]
if ok { if ok {
id := job.ID id := *job.ID
out: out:
for _, a := range t.apps { for _, a := range t.apps {

View File

@ -73,11 +73,7 @@ func RegisterFootprintWorker() {
continue continue
} }
jobMeta := &schema.JobMeta{ job.Statistics = make(map[string]schema.JobStatistics)
BaseJob: job.BaseJob,
StartTime: job.StartTime.Unix(),
Statistics: make(map[string]schema.JobStatistics),
}
for _, metric := range allMetrics { for _, metric := range allMetrics {
avg, min, max := 0.0, 0.0, 0.0 avg, min, max := 0.0, 0.0, 0.0
@ -95,7 +91,7 @@ func RegisterFootprintWorker() {
} }
// Add values rounded to 2 digits: repo.LoadStats may return unrounded // Add values rounded to 2 digits: repo.LoadStats may return unrounded
jobMeta.Statistics[metric] = schema.JobStatistics{ job.Statistics[metric] = schema.JobStatistics{
Unit: schema.Unit{ Unit: schema.Unit{
Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix, Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix,
Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base, Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base,
@ -108,7 +104,7 @@ func RegisterFootprintWorker() {
// Build Statement per Job, Add to Pending Array // Build Statement per Job, Add to Pending Array
stmt := sq.Update("job") stmt := sq.Update("job")
stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta) stmt, err = jobRepo.UpdateFootprint(stmt, job)
if err != nil { if err != nil {
log.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error()) log.Errorf("update job (dbid: %d) statement build failed at footprint step: %s", job.ID, err.Error())
ce++ ce++

View File

@ -23,7 +23,7 @@ type ArchiveBackend interface {
Exists(job *schema.Job) bool Exists(job *schema.Job) bool
LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) LoadJobMeta(job *schema.Job) (*schema.Job, error)
LoadJobData(job *schema.Job) (schema.JobData, error) LoadJobData(job *schema.Job) (schema.JobData, error)
@ -31,9 +31,9 @@ type ArchiveBackend interface {
LoadClusterCfg(name string) (*schema.Cluster, error) LoadClusterCfg(name string) (*schema.Cluster, error)
StoreJobMeta(jobMeta *schema.JobMeta) error StoreJobMeta(jobMeta *schema.Job) error
ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobData) error ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error
GetClusters() []string GetClusters() []string
@ -51,7 +51,7 @@ type ArchiveBackend interface {
} }
type JobContainer struct { type JobContainer struct {
Meta *schema.JobMeta Meta *schema.Job
Data *schema.JobData Data *schema.JobData
} }
@ -162,7 +162,6 @@ func LoadScopedStatsFromArchive(
metrics []string, metrics []string,
scopes []schema.MetricScope, scopes []schema.MetricScope,
) (schema.ScopedJobStats, error) { ) (schema.ScopedJobStats, error) {
data, err := ar.LoadJobStats(job) data, err := ar.LoadJobStats(job)
if err != nil { if err != nil {
log.Errorf("Error while loading job stats from archiveBackend: %s", err.Error()) log.Errorf("Error while loading job stats from archiveBackend: %s", err.Error())

View File

@ -9,7 +9,6 @@ import (
"fmt" "fmt"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
"github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/archive"
@ -32,12 +31,12 @@ func setup(t *testing.T) archive.ArchiveBackend {
jobs[0] = &schema.Job{} jobs[0] = &schema.Job{}
jobs[0].JobID = 1403244 jobs[0].JobID = 1403244
jobs[0].Cluster = "emmy" jobs[0].Cluster = "emmy"
jobs[0].StartTime = time.Unix(1608923076, 0) jobs[0].StartTime = 1608923076
jobs[1] = &schema.Job{} jobs[1] = &schema.Job{}
jobs[0].JobID = 1404397 jobs[0].JobID = 1404397
jobs[0].Cluster = "emmy" jobs[0].Cluster = "emmy"
jobs[0].StartTime = time.Unix(1609300556, 0) jobs[0].StartTime = 1609300556
return archive.GetHandle() return archive.GetHandle()
} }

View File

@ -223,7 +223,7 @@ func GetMetricConfig(cluster, metric string) *schema.MetricConfig {
// AssignSubCluster sets the `job.subcluster` property of the job based // AssignSubCluster sets the `job.subcluster` property of the job based
// on its cluster and resources. // on its cluster and resources.
func AssignSubCluster(job *schema.BaseJob) error { func AssignSubCluster(job *schema.Job) error {
cluster := GetCluster(job.Cluster) cluster := GetCluster(job.Cluster)
if cluster == nil { if cluster == nil {
return fmt.Errorf("ARCHIVE/CLUSTERCONFIG > unkown cluster: %v", job.Cluster) return fmt.Errorf("ARCHIVE/CLUSTERCONFIG > unkown cluster: %v", job.Cluster)

View File

@ -53,7 +53,7 @@ func getDirectory(
rootPath, rootPath,
job.Cluster, job.Cluster,
lvl1, lvl2, lvl1, lvl2,
strconv.FormatInt(job.StartTime.Unix(), 10)) strconv.FormatInt(job.StartTime, 10))
} }
func getPath( func getPath(
@ -65,15 +65,15 @@ func getPath(
getDirectory(job, rootPath), file) getDirectory(job, rootPath), file)
} }
func loadJobMeta(filename string) (*schema.JobMeta, error) { func loadJobMeta(filename string) (*schema.Job, error) {
b, err := os.ReadFile(filename) b, err := os.ReadFile(filename)
if err != nil { if err != nil {
log.Errorf("loadJobMeta() > open file error: %v", err) log.Errorf("loadJobMeta() > open file error: %v", err)
return &schema.JobMeta{}, err return nil, err
} }
if config.Keys.Validate { if config.Keys.Validate {
if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil { if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil {
return &schema.JobMeta{}, fmt.Errorf("validate job meta: %v", err) return nil, fmt.Errorf("validate job meta: %v", err)
} }
} }
@ -429,7 +429,7 @@ func (fsa *FsArchive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, erro
return loadJobStats(filename, isCompressed) return loadJobStats(filename, isCompressed)
} }
func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.Job, error) {
filename := getPath(job, fsa.path, "meta.json") filename := getPath(job, fsa.path, "meta.json")
return loadJobMeta(filename) return loadJobMeta(filename)
} }
@ -518,18 +518,13 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
return ch return ch
} }
func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error { func (fsa *FsArchive) StoreJobMeta(job *schema.Job) error {
job := schema.Job{ f, err := os.Create(getPath(job, fsa.path, "meta.json"))
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),
StartTimeUnix: jobMeta.StartTime,
}
f, err := os.Create(getPath(&job, fsa.path, "meta.json"))
if err != nil { if err != nil {
log.Error("Error while creating filepath for meta.json") log.Error("Error while creating filepath for meta.json")
return err return err
} }
if err := EncodeJobMeta(f, jobMeta); err != nil { if err := EncodeJobMeta(f, job); err != nil {
log.Error("Error while encoding job metadata to meta.json file") log.Error("Error while encoding job metadata to meta.json file")
return err return err
} }
@ -546,15 +541,10 @@ func (fsa *FsArchive) GetClusters() []string {
} }
func (fsa *FsArchive) ImportJob( func (fsa *FsArchive) ImportJob(
jobMeta *schema.JobMeta, jobMeta *schema.Job,
jobData *schema.JobData, jobData *schema.JobData,
) error { ) error {
job := schema.Job{ dir := getPath(jobMeta, fsa.path, "")
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),
StartTimeUnix: jobMeta.StartTime,
}
dir := getPath(&job, fsa.path, "")
if err := os.MkdirAll(dir, 0777); err != nil { if err := os.MkdirAll(dir, 0777); err != nil {
log.Error("Error while creating job archive path") log.Error("Error while creating job archive path")
return err return err

View File

@ -9,7 +9,6 @@ import (
"fmt" "fmt"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
"github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
@ -86,8 +85,11 @@ func TestLoadJobMeta(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
jobIn := schema.Job{BaseJob: schema.JobDefaults} jobIn := schema.Job{
jobIn.StartTime = time.Unix(1608923076, 0) Exclusive: 1,
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
}
jobIn.StartTime = 1608923076
jobIn.JobID = 1403244 jobIn.JobID = 1403244
jobIn.Cluster = "emmy" jobIn.Cluster = "emmy"
@ -114,8 +116,11 @@ func TestLoadJobData(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
jobIn := schema.Job{BaseJob: schema.JobDefaults} jobIn := schema.Job{
jobIn.StartTime = time.Unix(1608923076, 0) Exclusive: 1,
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
}
jobIn.StartTime = 1608923076
jobIn.JobID = 1403244 jobIn.JobID = 1403244
jobIn.Cluster = "emmy" jobIn.Cluster = "emmy"
@ -142,8 +147,11 @@ func BenchmarkLoadJobData(b *testing.B) {
var fsa FsArchive var fsa FsArchive
fsa.Init(json.RawMessage(archiveCfg)) fsa.Init(json.RawMessage(archiveCfg))
jobIn := schema.Job{BaseJob: schema.JobDefaults} jobIn := schema.Job{
jobIn.StartTime = time.Unix(1608923076, 0) Exclusive: 1,
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
}
jobIn.StartTime = 1608923076
jobIn.JobID = 1403244 jobIn.JobID = 1403244
jobIn.Cluster = "emmy" jobIn.Cluster = "emmy"
@ -165,8 +173,11 @@ func BenchmarkLoadJobDataCompressed(b *testing.B) {
var fsa FsArchive var fsa FsArchive
fsa.Init(json.RawMessage(archiveCfg)) fsa.Init(json.RawMessage(archiveCfg))
jobIn := schema.Job{BaseJob: schema.JobDefaults} jobIn := schema.Job{
jobIn.StartTime = time.Unix(1608923076, 0) Exclusive: 1,
MonitoringStatus: schema.MonitoringStatusRunningOrArchiving,
}
jobIn.StartTime = 1608923076
jobIn.JobID = 1403244 jobIn.JobID = 1403244
jobIn.Cluster = "emmy" jobIn.Cluster = "emmy"

View File

@ -69,8 +69,8 @@ func DecodeJobStats(r io.Reader, k string) (schema.ScopedJobStats, error) {
return nil, err return nil, err
} }
func DecodeJobMeta(r io.Reader) (*schema.JobMeta, error) { func DecodeJobMeta(r io.Reader) (*schema.Job, error) {
var d schema.JobMeta var d schema.Job
if err := json.NewDecoder(r).Decode(&d); err != nil { if err := json.NewDecoder(r).Decode(&d); err != nil {
log.Warn("Error while decoding raw job meta json") log.Warn("Error while decoding raw job meta json")
return &d, err return &d, err
@ -103,7 +103,7 @@ func EncodeJobData(w io.Writer, d *schema.JobData) error {
return nil return nil
} }
func EncodeJobMeta(w io.Writer, d *schema.JobMeta) error { func EncodeJobMeta(w io.Writer, d *schema.Job) error {
// Sanitize parameters // Sanitize parameters
if err := json.NewEncoder(w).Encode(d); err != nil { if err := json.NewEncoder(w).Encode(d); err != nil {
log.Warn("Error while encoding new job meta json") log.Warn("Error while encoding new job meta json")

View File

@ -8,43 +8,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"time"
) )
// BaseJob is the common part of the job metadata structs
//
// Common subset of Job and JobMeta. Use one of those, not this type directly.
type BaseJob struct {
Cluster string `json:"cluster" db:"cluster" example:"fritz"`
SubCluster string `json:"subCluster" db:"subcluster" example:"main"`
Partition string `json:"partition,omitempty" db:"cluster_partition" example:"main"`
Project string `json:"project" db:"project" example:"abcd200"`
User string `json:"user" db:"hpc_user" example:"abcd100h"`
State JobState `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"`
Tags []*Tag `json:"tags,omitempty"`
RawEnergyFootprint []byte `json:"-" db:"energy_footprint"`
RawFootprint []byte `json:"-" db:"footprint"`
RawMetaData []byte `json:"-" db:"meta_data"`
RawResources []byte `json:"-" db:"resources"`
Resources []*Resource `json:"resources"`
EnergyFootprint map[string]float64 `json:"energyFootprint"`
Footprint map[string]float64 `json:"footprint"`
MetaData map[string]string `json:"metaData"`
ConcurrentJobs JobLinkResultList `json:"concurrentJobs"`
Energy float64 `json:"energy" db:"energy"`
ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"`
Walltime int64 `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"`
JobID int64 `json:"jobId" db:"job_id" example:"123000"`
Duration int32 `json:"duration" db:"duration" example:"43200" minimum:"1"`
SMT int32 `json:"smt,omitempty" db:"smt" example:"4"`
MonitoringStatus int32 `json:"monitoringStatus,omitempty" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"`
Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"`
NumAcc int32 `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"`
NumHWThreads int32 `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"`
NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"`
}
// Job struct type // Job struct type
// //
// This type is used as the GraphQL interface and using sqlx as a table row. // This type is used as the GraphQL interface and using sqlx as a table row.
@ -52,10 +17,36 @@ type BaseJob struct {
// Job model // Job model
// @Description Information of a HPC job. // @Description Information of a HPC job.
type Job struct { type Job struct {
StartTime time.Time `json:"startTime"` Cluster string `json:"cluster" db:"cluster" example:"fritz"`
BaseJob SubCluster string `json:"subCluster" db:"subcluster" example:"main"`
ID int64 `json:"id" db:"id"` Partition string `json:"partition,omitempty" db:"cluster_partition" example:"main"`
StartTimeUnix int64 `json:"-" db:"start_time" example:"1649723812"` Project string `json:"project" db:"project" example:"abcd200"`
User string `json:"user" db:"hpc_user" example:"abcd100h"`
State JobState `json:"jobState" db:"job_state" example:"completed" enums:"completed,failed,cancelled,stopped,timeout,out_of_memory"`
Tags []*Tag `json:"tags,omitempty"`
RawEnergyFootprint []byte `json:"-" db:"energy_footprint"`
RawFootprint []byte `json:"-" db:"footprint"`
RawMetaData []byte `json:"-" db:"meta_data"`
RawResources []byte `json:"-" db:"resources"`
Resources []*Resource `json:"resources"`
EnergyFootprint map[string]float64 `json:"energyFootprint"`
Footprint map[string]float64 `json:"footprint"`
MetaData map[string]string `json:"metaData"`
ConcurrentJobs JobLinkResultList `json:"concurrentJobs"`
Energy float64 `json:"energy" db:"energy"`
ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"`
Walltime int64 `json:"walltime,omitempty" db:"walltime" example:"86400" minimum:"1"`
JobID int64 `json:"jobId" db:"job_id" example:"123000"`
Duration int32 `json:"duration" db:"duration" example:"43200" minimum:"1"`
SMT int32 `json:"smt,omitempty" db:"smt" example:"4"`
MonitoringStatus int32 `json:"monitoringStatus,omitempty" db:"monitoring_status" example:"1" minimum:"0" maximum:"3"`
Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"`
NumAcc int32 `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"`
NumHWThreads int32 `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"`
NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"`
Statistics map[string]JobStatistics `json:"statistics"`
ID *int64 `json:"id,omitempty" db:"id"`
StartTime int64 `json:"startTime" db:"start_time" example:"1649723812"`
} }
// JobMeta struct type // JobMeta struct type
@ -70,12 +61,12 @@ type Job struct {
// //
// JobMeta model // JobMeta model
// @Description Meta data information of a HPC job. // @Description Meta data information of a HPC job.
type JobMeta struct { // type JobMeta struct {
ID *int64 `json:"id,omitempty"` // ID *int64 `json:"id,omitempty"`
Statistics map[string]JobStatistics `json:"statistics"` // BaseJob
BaseJob // Statistics map[string]JobStatistics `json:"statistics"`
StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"` // StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"`
} // }
type JobLink struct { type JobLink struct {
ID int64 `json:"id"` ID int64 `json:"id"`
@ -94,10 +85,10 @@ const (
MonitoringStatusArchivingSuccessful int32 = 3 MonitoringStatusArchivingSuccessful int32 = 3
) )
var JobDefaults BaseJob = BaseJob{ // var JobDefaults Job = Job{
Exclusive: 1, // Exclusive: 1,
MonitoringStatus: MonitoringStatusRunningOrArchiving, // MonitoringStatus: MonitoringStatusRunningOrArchiving,
} // }
type Unit struct { type Unit struct {
Base string `json:"base"` Base string `json:"base"`
@ -144,9 +135,9 @@ const (
JobStateOutOfMemory JobState = "out_of_memory" JobStateOutOfMemory JobState = "out_of_memory"
) )
func (j JobMeta) GoString() string { func (j Job) GoString() string {
return fmt.Sprintf("JobMeta{ID:%d, StartTime:%d, JobID:%v, BaseJob:%v}", return fmt.Sprintf("Job{ID:%d, StartTime:%d, JobID:%v, BaseJob:%v}",
j.ID, j.StartTime, j.JobID, j.BaseJob) j.ID, j.StartTime, j.JobID, j)
} }
func (e *JobState) UnmarshalGQL(v any) error { func (e *JobState) UnmarshalGQL(v any) error {