diff --git a/go.mod b/go.mod index 12fc2fd..664a2f4 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.16.5 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -65,6 +66,9 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect + github.com/nats-io/nats.go v1.27.1 // indirect + github.com/nats-io/nkeys v0.4.4 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.3.0 // indirect diff --git a/go.sum b/go.sum index 691fef1..899a037 100644 --- a/go.sum +++ b/go.sum @@ -465,6 +465,7 @@ github.com/go-ldap/ldap/v3 v3.4.4/go.mod h1:fe1MsuN5eJJ1FeLT/LEBVdWfNWKh459R7aXg github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= @@ -595,6 +596,7 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= @@ -819,6 +821,8 @@ github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdY github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= +github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -938,6 +942,12 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/nakagami/firebirdsql v0.0.0-20190310045651-3c02a58cfed8/go.mod h1:86wM1zFnC6/uDBfZGNwB65O+pR2OFi5q/YQaEUid1qA= +github.com/nats-io/nats.go v1.27.1 h1:OuYnal9aKVSnOzLQIzf7554OXMCG7KbaTkCSBHRcSoo= +github.com/nats-io/nats.go v1.27.1/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= +github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/neo4j/neo4j-go-driver v1.8.1-0.20200803113522-b626aa943eba/go.mod h1:ncO5VaFWh0Nrt+4KT4mOZboaczBZcLuHrG+/sUeP8gI= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -1175,6 +1185,7 @@ github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/cli/v2 v2.8.1/go.mod h1:Z41J9TPoffeoqP0Iza0YbAhGvymRdZAd2uPmZ5JxRdY= github.com/urfave/cli/v2 v2.24.4 h1:0gyJJEBYtCV87zI/x2nZCPyDxD51K6xM8SkwjHFCNEU= github.com/urfave/cli/v2 v2.24.4/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/vektah/gqlparser/v2 v2.5.1 h1:ZGu+bquAY23jsxDRcYpWjttRZrUz07LbiY77gUOHcr4= github.com/vektah/gqlparser/v2 v2.5.1/go.mod h1:mPgqFBu/woKTVYWyNk8cO3kh4S/f4aRFZrvOnp3hmCs= github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= diff --git a/internal/scheduler/slurmNats.go b/internal/scheduler/slurmNats.go index 1fabab2..c8511a6 100644 --- a/internal/scheduler/slurmNats.go +++ b/internal/scheduler/slurmNats.go @@ -4,7 +4,14 @@ // license that can be found in the LICENSE file. package scheduler -import "encoding/json" +import ( + "encoding/json" + "log" + "strings" + "sync" + + "github.com/nats-io/nats.go" +) type SlurmNatsConfig struct { URL string `json:"url"` @@ -15,6 +22,53 @@ type SlurmNatsScheduler struct { } func (sd *SlurmNatsScheduler) Init(rawConfig json.RawMessage) error { + servers := []string{"nats://127.0.0.1:4222", "nats://127.0.0.1:1223"} + + nc, err := nats.Connect(strings.Join(servers, ",")) + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + getStatusTxt := func(nc *nats.Conn) string { + switch nc.Status() { + case nats.CONNECTED: + return "Connected" + case nats.CLOSED: + return "Closed" + default: + return "Other" + } + } + log.Printf("The connection status is %v\n", getStatusTxt(nc)) + + ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + log.Fatal(err) + } + defer ec.Close() + + // Define the object + type encodedMessage struct { + ServerName string + ResponseCode int + } + + wg := sync.WaitGroup{} + wg.Add(1) + + // Subscribe + if _, err := ec.Subscribe("stopJob", func(s *encodedMessage) { + log.Printf("Server Name: %s - Response Code: %v", s.ServerName, s.ResponseCode) + if s.ResponseCode == 500 { + wg.Done() + } + }); err != nil { + log.Fatal(err) + } + + // Wait for a message to come in + wg.Wait() return nil } diff --git a/tools/nats-manager/main.go b/tools/nats-manager/main.go new file mode 100644 index 0000000..d27bbab --- /dev/null +++ b/tools/nats-manager/main.go @@ -0,0 +1,82 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package main + +import ( + "flag" + "fmt" + "log" + "os" + + "github.com/ClusterCockpit/cc-backend/internal/scheduler" + "github.com/nats-io/nats.go" +) + +func usage() { + log.Printf("Usage: nats-pub [-s server] [-creds file] \n") + flag.PrintDefaults() +} + +func showUsageAndExit(exitcode int) { + usage() + os.Exit(exitcode) +} + +func setupPublisher() { + var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)") + var userCreds = flag.String("creds", "", "User Credentials File") + var showHelp = flag.Bool("h", false, "Show help message") + + log.SetFlags(0) + flag.Usage = usage + flag.Parse() + + if *showHelp { + showUsageAndExit(0) + } + + args := flag.Args() + if len(args) != 2 { + showUsageAndExit(1) + } + + fmt.Printf("Hello Nats\n") + + // Connect Options. + opts := []nats.Option{nats.Name("NATS Sample Publisher")} + + // Use UserCredentials + if *userCreds != "" { + opts = append(opts, nats.UserCredentials(*userCreds)) + } + + // Connect to NATS + nc, err := nats.Connect(*urls, opts...) + if err != nil { + log.Fatal(err) + } + defer nc.Close() + subj, msg := args[0], []byte(args[1]) + + nc.Publish(subj, msg) + nc.Flush() + + if err := nc.LastError(); err != nil { + log.Fatal(err) + } else { + log.Printf("Published [%s] : '%s'\n", subj, msg) + } + + os.Exit(0) +} + +func main() { + cfgData := []byte(`{"target": "localhost"}`) + + var sch scheduler.SlurmNatsScheduler + // sch.URL = "nats://127.0.0.1:1223" + sch.Init(cfgData) + os.Exit(0) +}