mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2024-11-10 08:57:25 +01:00
initial nats data concept commit
This commit is contained in:
parent
c80d3a6958
commit
5c7733dc4b
@ -17,6 +17,7 @@ import (
|
|||||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/natsMessenger"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/taskManager"
|
"github.com/ClusterCockpit/cc-backend/internal/taskManager"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
@ -213,9 +214,18 @@ func main() {
|
|||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Start NATS Messenger if Config exists
|
||||||
|
wg.Add(1)
|
||||||
|
nm, err := natsMessenger.New(config.Keys.Nats)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Error on NATS startup!")
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
|
||||||
|
// Start HTTP server
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
wg.Done()
|
||||||
serverStart()
|
serverStart()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -227,6 +237,10 @@ func main() {
|
|||||||
<-sigs
|
<-sigs
|
||||||
runtimeEnv.SystemdNotifiy(false, "Shutting down ...")
|
runtimeEnv.SystemdNotifiy(false, "Shutting down ...")
|
||||||
|
|
||||||
|
if nm != nil {
|
||||||
|
nm.StopNatsMessenger()
|
||||||
|
}
|
||||||
|
|
||||||
serverShutdown()
|
serverShutdown()
|
||||||
|
|
||||||
taskManager.Shutdown()
|
taskManager.Shutdown()
|
||||||
|
@ -316,9 +316,9 @@ func serverStart() {
|
|||||||
MinVersion: tls.VersionTLS12,
|
MinVersion: tls.VersionTLS12,
|
||||||
PreferServerCipherSuites: true,
|
PreferServerCipherSuites: true,
|
||||||
})
|
})
|
||||||
fmt.Printf("HTTPS server listening at %s...", config.Keys.Addr)
|
fmt.Printf("HTTPS server listening at %s...\n", config.Keys.Addr)
|
||||||
} else {
|
} else {
|
||||||
fmt.Printf("HTTP server listening at %s...", config.Keys.Addr)
|
fmt.Printf("HTTP server listening at %s...\n", config.Keys.Addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
if err = server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
||||||
|
9
go.mod
9
go.mod
@ -19,6 +19,7 @@ require (
|
|||||||
github.com/influxdata/influxdb-client-go/v2 v2.13.0
|
github.com/influxdata/influxdb-client-go/v2 v2.13.0
|
||||||
github.com/jmoiron/sqlx v1.4.0
|
github.com/jmoiron/sqlx v1.4.0
|
||||||
github.com/mattn/go-sqlite3 v1.14.22
|
github.com/mattn/go-sqlite3 v1.14.22
|
||||||
|
github.com/nats-io/nats.go v1.36.0
|
||||||
github.com/prometheus/client_golang v1.19.1
|
github.com/prometheus/client_golang v1.19.1
|
||||||
github.com/prometheus/common v0.55.0
|
github.com/prometheus/common v0.55.0
|
||||||
github.com/qustavo/sqlhooks/v2 v2.1.0
|
github.com/qustavo/sqlhooks/v2 v2.1.0
|
||||||
@ -58,14 +59,20 @@ require (
|
|||||||
github.com/josharian/intern v1.0.0 // indirect
|
github.com/josharian/intern v1.0.0 // indirect
|
||||||
github.com/jpillora/backoff v1.0.0 // indirect
|
github.com/jpillora/backoff v1.0.0 // indirect
|
||||||
github.com/json-iterator/go v1.1.12 // indirect
|
github.com/json-iterator/go v1.1.12 // indirect
|
||||||
|
github.com/klauspost/compress v1.17.9 // indirect
|
||||||
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
|
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
|
||||||
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
|
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
|
||||||
github.com/mailru/easyjson v0.7.7 // indirect
|
github.com/mailru/easyjson v0.7.7 // indirect
|
||||||
|
github.com/minio/highwayhash v1.0.3 // indirect
|
||||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
|
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
|
||||||
|
github.com/nats-io/jwt/v2 v2.5.8 // indirect
|
||||||
|
github.com/nats-io/nats-server/v2 v2.10.18 // indirect
|
||||||
|
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||||
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
github.com/oapi-codegen/runtime v1.1.1 // indirect
|
github.com/oapi-codegen/runtime v1.1.1 // indirect
|
||||||
github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b // indirect
|
github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b // indirect
|
||||||
github.com/prometheus/client_model v0.6.1 // indirect
|
github.com/prometheus/client_model v0.6.1 // indirect
|
||||||
@ -77,11 +84,13 @@ require (
|
|||||||
github.com/urfave/cli/v2 v2.27.2 // indirect
|
github.com/urfave/cli/v2 v2.27.2 // indirect
|
||||||
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
|
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
|
||||||
go.uber.org/atomic v1.11.0 // indirect
|
go.uber.org/atomic v1.11.0 // indirect
|
||||||
|
go.uber.org/automaxprocs v1.5.3 // indirect
|
||||||
golang.org/x/mod v0.19.0 // indirect
|
golang.org/x/mod v0.19.0 // indirect
|
||||||
golang.org/x/net v0.27.0 // indirect
|
golang.org/x/net v0.27.0 // indirect
|
||||||
golang.org/x/sync v0.7.0 // indirect
|
golang.org/x/sync v0.7.0 // indirect
|
||||||
golang.org/x/sys v0.22.0 // indirect
|
golang.org/x/sys v0.22.0 // indirect
|
||||||
golang.org/x/text v0.16.0 // indirect
|
golang.org/x/text v0.16.0 // indirect
|
||||||
|
golang.org/x/time v0.5.0 // indirect
|
||||||
golang.org/x/tools v0.23.0 // indirect
|
golang.org/x/tools v0.23.0 // indirect
|
||||||
google.golang.org/protobuf v1.34.2 // indirect
|
google.golang.org/protobuf v1.34.2 // indirect
|
||||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||||
|
19
go.sum
19
go.sum
@ -140,6 +140,8 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX
|
|||||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||||
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
|
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
|
||||||
|
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
|
||||||
|
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
||||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||||
@ -156,6 +158,8 @@ github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
|
|||||||
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||||
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
|
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
|
||||||
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||||
|
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
|
||||||
|
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
|
||||||
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
|
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
|
||||||
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||||
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
|
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
|
||||||
@ -171,6 +175,16 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
|
|||||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
|
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
|
||||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||||
|
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
|
||||||
|
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
|
||||||
|
github.com/nats-io/nats-server/v2 v2.10.18 h1:tRdZmBuWKVAFYtayqlBB2BuCHNGAQPvoQIXOKwU3WSM=
|
||||||
|
github.com/nats-io/nats-server/v2 v2.10.18/go.mod h1:97Qyg7YydD8blKlR8yBsUlPlWyZKjA7Bp5cl3MUE9K8=
|
||||||
|
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
|
||||||
|
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||||
|
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||||
|
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
||||||
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
|
github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
|
||||||
github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
|
github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
|
||||||
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
||||||
@ -231,6 +245,8 @@ github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913/go.mod h1:4aEEwZQut
|
|||||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||||
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
||||||
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||||
|
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
|
||||||
|
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
|
||||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
@ -273,6 +289,7 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
|||||||
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
|
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
|
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
|
||||||
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
@ -289,6 +306,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
|||||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||||
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
|
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
|
||||||
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
|
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
|
||||||
|
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
|
||||||
|
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||||
|
@ -19,6 +19,7 @@ var Keys schema.ProgramConfig = schema.ProgramConfig{
|
|||||||
EmbedStaticFiles: true,
|
EmbedStaticFiles: true,
|
||||||
DBDriver: "sqlite3",
|
DBDriver: "sqlite3",
|
||||||
DB: "./var/job.db",
|
DB: "./var/job.db",
|
||||||
|
Nats: nil,
|
||||||
Archive: json.RawMessage(`{\"kind\":\"file\",\"path\":\"./var/job-archive\"}`),
|
Archive: json.RawMessage(`{\"kind\":\"file\",\"path\":\"./var/job-archive\"}`),
|
||||||
DisableArchive: false,
|
DisableArchive: false,
|
||||||
Validate: false,
|
Validate: false,
|
||||||
|
177
internal/natsMessenger/natsMessenger.go
Normal file
177
internal/natsMessenger/natsMessenger.go
Normal file
@ -0,0 +1,177 @@
|
|||||||
|
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||||
|
// All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
package natsMessenger
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
|
"github.com/nats-io/nats-server/v2/server"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
// JobRepository *repository.JobRepository
|
||||||
|
// Authentication *auth.Authentication
|
||||||
|
type NatsMessenger struct {
|
||||||
|
Server *server.Server
|
||||||
|
Connection *nats.Conn
|
||||||
|
Subscriptions []*nats.Subscription
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(config *schema.NatsConfig) (nm *NatsMessenger, err error) {
|
||||||
|
return SetupNatsMessenger(config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartJobNatsMessage model
|
||||||
|
type DevNatsMessage struct {
|
||||||
|
Content string `json:"content"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartJobNatsMessage model
|
||||||
|
type StartJobNatsMessage struct {
|
||||||
|
schema.BaseJob
|
||||||
|
ID *int64 `json:"id,omitempty"`
|
||||||
|
Statistics map[string]schema.JobStatistics `json:"statistics"`
|
||||||
|
StartTime int64 `json:"startTime" db:"start_time" example:"1649723812" minimum:"1"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopJobNatsMessage model
|
||||||
|
type StopJobNatsMessage struct {
|
||||||
|
JobId *int64 `json:"jobId" example:"123000"`
|
||||||
|
Cluster *string `json:"cluster" example:"fritz"`
|
||||||
|
StartTime *int64 `json:"startTime" example:"1649723812"`
|
||||||
|
State schema.JobState `json:"jobState" validate:"required" example:"completed"`
|
||||||
|
StopTime int64 `json:"stopTime" validate:"required" example:"1649763839"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteJobNatsMessage model
|
||||||
|
type DeleteJobNatsMessage struct {
|
||||||
|
JobId *int64 `json:"jobId" validate:"required" example:"123000"` // Cluster Job ID of job
|
||||||
|
Cluster *string `json:"cluster" example:"fritz"` // Cluster of job
|
||||||
|
StartTime *int64 `json:"startTime" example:"1649723812"` // Start Time of job as epoch
|
||||||
|
}
|
||||||
|
|
||||||
|
// jobEventNatsMessage model
|
||||||
|
type ReceiveEventNatsMessage struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check auth and setup listeners to channels
|
||||||
|
|
||||||
|
// ns *server.Server, nc *nats.Conn, subs []*nats.Subscription, err error
|
||||||
|
func SetupNatsMessenger(config *schema.NatsConfig) (nm *NatsMessenger, err error) {
|
||||||
|
// Check if Config present
|
||||||
|
if config == nil {
|
||||||
|
log.Info("No NATS config found: Skip NATS init.")
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init Raw
|
||||||
|
nmr := NatsMessenger{
|
||||||
|
Server: nil,
|
||||||
|
Connection: nil,
|
||||||
|
Subscriptions: []*nats.Subscription{},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start Nats Server
|
||||||
|
// Note: You can configure things like Host, Port, Authorization, and much more using server.Options.
|
||||||
|
opts := &server.Options{Port: config.Port}
|
||||||
|
nmr.Server, err = server.NewServer(opts)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Error("nats server error on creation")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
go nmr.Server.Start()
|
||||||
|
|
||||||
|
if !nmr.Server.ReadyForConnections(4 * time.Second) {
|
||||||
|
log.Error("nats server not ready for connection")
|
||||||
|
return nil, fmt.Errorf("nats server not ready for connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect
|
||||||
|
var copts []nats.Option
|
||||||
|
nmr.Connection, err = nats.Connect(nmr.Server.ClientURL(), copts...)
|
||||||
|
if nmr.Connection == nil {
|
||||||
|
nmr.Server.Shutdown()
|
||||||
|
log.Error("nats connection could not be established: nats shut down")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe
|
||||||
|
sub, err := startJobListener(nmr.Connection)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("startJobListener subscription error")
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
log.Infof("NATS subscription to 'start-job' on port '%d' established\n", config.Port)
|
||||||
|
nmr.Subscriptions = append(nmr.Subscriptions, sub)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &nmr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nm *NatsMessenger) StopNatsMessenger() {
|
||||||
|
for _, sub := range nm.Subscriptions {
|
||||||
|
err := sub.Unsubscribe()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("NATS unsubscribe failed: %s", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nm.Connection.Close()
|
||||||
|
nm.Server.Shutdown()
|
||||||
|
log.Info("NATS connections closed and server shut down")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Listeners: Subscribe to specified channels for actions
|
||||||
|
|
||||||
|
func startJobListener(conn *nats.Conn) (sub *nats.Subscription, err error) {
|
||||||
|
|
||||||
|
sub, err = conn.Subscribe("start-job", func(m *nats.Msg) {
|
||||||
|
var job DevNatsMessage
|
||||||
|
if err := json.Unmarshal(m.Data, &job); err != nil {
|
||||||
|
log.Error("Error while unmarshaling raw json nats message content")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := startJobHandler(job); err != nil {
|
||||||
|
log.Errorf("error: %s", err.Error())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
return sub, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nm *NatsMessenger) stopJobListener(conn *nats.Conn) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nm *NatsMessenger) deleteJobListener(conn *nats.Conn) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nm *NatsMessenger) jobEventListener(conn *nats.Conn) {
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handlers: Take content of message and perform action, e.g. adding job in db
|
||||||
|
|
||||||
|
func startJobHandler(job DevNatsMessage) (err error) {
|
||||||
|
log.Debugf("CALLED HANDLER FOR startJob: %s", job.Content)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nm *NatsMessenger) stopJobHandler() {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nm *NatsMessenger) deleteJobHandler() {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nm *NatsMessenger) jobEventHandler() {
|
||||||
|
}
|
@ -76,6 +76,11 @@ type Retention struct {
|
|||||||
IncludeDB bool `json:"includeDB"`
|
IncludeDB bool `json:"includeDB"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type NatsConfig struct {
|
||||||
|
// Port of the nats server
|
||||||
|
Port int `json:"port"`
|
||||||
|
}
|
||||||
|
|
||||||
// Format of the configuration (file). See below for the defaults.
|
// Format of the configuration (file). See below for the defaults.
|
||||||
type ProgramConfig struct {
|
type ProgramConfig struct {
|
||||||
// Address where the http (or https) server will listen on (for example: 'localhost:80').
|
// Address where the http (or https) server will listen on (for example: 'localhost:80').
|
||||||
@ -103,6 +108,9 @@ type ProgramConfig struct {
|
|||||||
// For sqlite3 a filename, for mysql a DSN in this format: https://github.com/go-sql-driver/mysql#dsn-data-source-name (Without query parameters!).
|
// For sqlite3 a filename, for mysql a DSN in this format: https://github.com/go-sql-driver/mysql#dsn-data-source-name (Without query parameters!).
|
||||||
DB string `json:"db"`
|
DB string `json:"db"`
|
||||||
|
|
||||||
|
// Nats Config: If !nil, start NATS-Server on startup
|
||||||
|
Nats *NatsConfig `json:"nats-config"`
|
||||||
|
|
||||||
// Config for job archive
|
// Config for job archive
|
||||||
Archive json.RawMessage `json:"archive"`
|
Archive json.RawMessage `json:"archive"`
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user