mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2024-11-14 02:37:25 +01:00
add jwt header auth for nats
This commit is contained in:
parent
d729fdfec1
commit
fafc729c86
@ -5,26 +5,30 @@
|
|||||||
package natsMessenger
|
package natsMessenger
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/ed25519"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
"github.com/ClusterCockpit/cc-backend/internal/importer"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||||
|
"github.com/golang-jwt/jwt/v5"
|
||||||
"github.com/nats-io/nats-server/v2/server"
|
"github.com/nats-io/nats-server/v2/server"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Authentication *auth.Authentication
|
|
||||||
type NatsMessenger struct {
|
type NatsMessenger struct {
|
||||||
Server *server.Server
|
Server *server.Server
|
||||||
Connection *nats.Conn
|
Connection *nats.Conn
|
||||||
Subscriptions []*nats.Subscription
|
Subscriptions []*nats.Subscription
|
||||||
JobRepository *repository.JobRepository
|
JobRepository *repository.JobRepository
|
||||||
|
jwtPubKey ed25519.PublicKey
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(config *schema.NatsConfig) (nm *NatsMessenger, err error) {
|
func New(config *schema.NatsConfig) (nm *NatsMessenger, err error) {
|
||||||
@ -84,6 +88,21 @@ func SetupNatsMessenger(config *schema.NatsConfig) (nm *NatsMessenger, err error
|
|||||||
Connection: nil,
|
Connection: nil,
|
||||||
Subscriptions: []*nats.Subscription{},
|
Subscriptions: []*nats.Subscription{},
|
||||||
JobRepository: repository.GetJobRepository(),
|
JobRepository: repository.GetJobRepository(),
|
||||||
|
jwtPubKey: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init JWT PubKey
|
||||||
|
pubKey := os.Getenv("JWT_PUBLIC_KEY")
|
||||||
|
if pubKey == "" {
|
||||||
|
log.Warn("environment variable 'JWT_PUBLIC_KEY' not set (token based authentication will not work for nats: abort setup)")
|
||||||
|
return nil, fmt.Errorf("environment variable 'JWT_PUBLIC_KEY' not set (token based authentication will not work for nats: abort nats setup)")
|
||||||
|
} else {
|
||||||
|
bytes, err := base64.StdEncoding.DecodeString(pubKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Could not decode JWT public key")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
nmr.jwtPubKey = ed25519.PublicKey(bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start Nats Server
|
// Start Nats Server
|
||||||
@ -130,6 +149,7 @@ func (nm *NatsMessenger) StopNatsMessenger() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("NATS unsubscribe failed: %s", err.Error())
|
log.Errorf("NATS unsubscribe failed: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
sub.Drain()
|
||||||
}
|
}
|
||||||
|
|
||||||
nm.Connection.Close()
|
nm.Connection.Close()
|
||||||
@ -174,13 +194,22 @@ func (nm *NatsMessenger) setupSubscriptions() (err error) {
|
|||||||
|
|
||||||
func (nm *NatsMessenger) startJobListener() (sub *nats.Subscription, err error) {
|
func (nm *NatsMessenger) startJobListener() (sub *nats.Subscription, err error) {
|
||||||
return nm.Connection.Subscribe("start-job", func(m *nats.Msg) {
|
return nm.Connection.Subscribe("start-job", func(m *nats.Msg) {
|
||||||
|
user, err := nm.verifyMessageJWT(m)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("not authd: %s", err.Error())
|
||||||
|
m.Respond([]byte("not authd: " + err.Error()))
|
||||||
|
} else if user != nil && user.HasRole(schema.RoleApi) {
|
||||||
req := schema.JobMeta{BaseJob: schema.JobDefaults}
|
req := schema.JobMeta{BaseJob: schema.JobDefaults}
|
||||||
if err := json.Unmarshal(m.Data, &req); err != nil {
|
if err := json.Unmarshal(m.Data, &req); err != nil {
|
||||||
log.Warnf("Error while unmarshaling raw json nats message content on channel start-job: %s", err.Error())
|
log.Warnf("Error while unmarshaling raw json nats message content on channel start-job: %s", err.Error())
|
||||||
m.Respond([]byte("Error while unmarshaling raw json nats message content on channel start-job: " + err.Error()))
|
m.Respond([]byte("Error while unmarshaling raw json nats message content on channel start-job: " + err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
m.Respond(nm.startJobHandler(req))
|
m.Respond(nm.startJobHandler(req))
|
||||||
|
} else {
|
||||||
|
log.Warnf("missing role for nats")
|
||||||
|
m.Respond([]byte("missing role for nats"))
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -339,6 +368,52 @@ func (nm *NatsMessenger) jobEventHandler(req DevNatsMessage) (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Auth
|
||||||
|
|
||||||
|
func (nm *NatsMessenger) verifyMessageJWT(msg *nats.Msg) (user *schema.User, err error) {
|
||||||
|
|
||||||
|
var rawtoken string
|
||||||
|
if rawtoken = msg.Header.Get("auth"); rawtoken == "" {
|
||||||
|
return nil, errors.New("missing token")
|
||||||
|
}
|
||||||
|
|
||||||
|
token, err := jwt.Parse(rawtoken, func(t *jwt.Token) (interface{}, error) {
|
||||||
|
if t.Method != jwt.SigningMethodEdDSA {
|
||||||
|
return nil, errors.New("only Ed25519/EdDSA supported")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nm.jwtPubKey, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Error while parsing JWT token")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !token.Valid {
|
||||||
|
log.Warn("jwt token claims are not valid")
|
||||||
|
return nil, errors.New("jwt token claims are not valid")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Token is valid, extract payload
|
||||||
|
claims := token.Claims.(jwt.MapClaims)
|
||||||
|
sub, _ := claims["sub"].(string)
|
||||||
|
|
||||||
|
// NATS: Always Validate user + roles from JWT against database
|
||||||
|
ur := repository.GetUserRepository()
|
||||||
|
user, err = ur.GetUser(sub)
|
||||||
|
// Deny any logins for unknown usernames
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Could not find user from JWT in internal database.")
|
||||||
|
return nil, errors.New("unknown user")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &schema.User{
|
||||||
|
Username: sub,
|
||||||
|
Roles: user.Roles, // Take user roles from database instead of trusting the JWT
|
||||||
|
AuthType: schema.AuthToken,
|
||||||
|
AuthSource: -1,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Helper
|
// Helper
|
||||||
|
|
||||||
func handleErr(err error) []byte {
|
func handleErr(err error) []byte {
|
||||||
|
Loading…
Reference in New Issue
Block a user