From 1b9cb8955c19aaeffa892a7e57a2964cff73966e Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 18 May 2021 15:16:10 +0200 Subject: [PATCH] Hand over full config to Sink and Receiver --- metric-collector.go | 54 ++++++++++++++++++++++++++---------- sinks/influxSink.go | 44 +++++++++++++++++++---------- sinks/natsSink.go | 67 +++++++++++++++++++++++++++++---------------- sinks/sink.go | 23 ++++++++++++---- sinks/stdoutSink.go | 19 +++++-------- 5 files changed, 137 insertions(+), 70 deletions(-) diff --git a/metric-collector.go b/metric-collector.go index 47fddfd..381b17a 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "github.com/ClusterCockpit/cc-metric-collector/collectors" + "github.com/ClusterCockpit/cc-metric-collector/receivers" "github.com/ClusterCockpit/cc-metric-collector/sinks" "log" "os" @@ -34,19 +35,17 @@ var Sinks = map[string]sinks.SinkFuncs{ "nats": &sinks.NatsSink{}, } +var Receivers = map[string]receivers.ReceiverFuncs{ + "nats": &receivers.NatsReceiver{}, +} + // Structure of the configuration file type GlobalConfig struct { - Sink struct { - User string `json:"user"` - Password string `json:"password"` - Host string `json:"host"` - Port string `json:"port"` - Database string `json:"database"` - Type string `json:"type"` - } `json:"sink"` - Interval int `json:"interval"` - Duration int `json:"duration"` - Collectors []string `json:"collectors"` + Sink sinks.SinkConfig `json:"sink"` + Interval int `json:"interval"` + Duration int `json:"duration"` + Collectors []string `json:"collectors"` + Receiver receivers.ReceiverConfig `json:"receiver"` } // Load JSON configuration file @@ -91,7 +90,7 @@ func SetLogging(logfile string) error { // Register an interrupt handler for Ctrl+C and similar. At signal, // all collectors are closed -func shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs) { +func shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs, recv receivers.ReceiverFuncs) { sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) @@ -104,6 +103,9 @@ func shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs) { col.Close() } time.Sleep(1 * time.Second) + if recv != nil { + recv.Close() + } sink.Close() wg.Done() }(wg) @@ -112,6 +114,9 @@ func shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs) { func main() { var config GlobalConfig var wg sync.WaitGroup + var recv receivers.ReceiverFuncs = nil + var use_recv bool + use_recv = false wg.Add(1) host, err := os.Hostname() if err != nil { @@ -159,13 +164,29 @@ func main() { } // Setup sink sink := Sinks[config.Sink.Type] - err = sink.Init(config.Sink.Host, config.Sink.Port, config.Sink.User, config.Sink.Password, config.Sink.Database) + err = sink.Init(config.Sink) if err != nil { + log.Print(err) return } + // Setup receiver + if config.Receiver.Type != "none" { + if _, found := Receivers[config.Receiver.Type]; !found { + log.Print("Invalid receiver type '", config.Receiver.Type, "' in configuration") + return + } else { + recv = Receivers[config.Receiver.Type] + err = recv.Init(config.Receiver, sink) + if err == nil { + use_recv = true + } else { + log.Print(err) + } + } + } // Register interrupt handler - shutdown(&wg, &config, sink) + shutdown(&wg, &config, sink, recv) // Initialize all collectors tmp := make([]string, 0) @@ -203,6 +224,11 @@ func main() { cpuFields[s] = make(map[string]interface{}) } + // Start receiver + if use_recv { + recv.Start() + } + go func() { for { select { diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 628a403..c5d23d4 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -2,6 +2,7 @@ package sinks import ( "context" + "errors" "fmt" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" @@ -11,27 +12,42 @@ import ( type InfluxSink struct { Sink - client influxdb2.Client - writeApi influxdb2Api.WriteAPIBlocking - retPolicy string - organization string + client influxdb2.Client + writeApi influxdb2Api.WriteAPIBlocking + retPolicy string } -func (s *InfluxSink) Init(host string, port string, user string, password string, database string) error { - s.host = host - s.port = port - s.user = user - s.password = password - s.database = database - s.organization = "" - uri := fmt.Sprintf("http://%s:%s", host, port) - auth := fmt.Sprintf("%s:%s", user, password) - log.Print("Using URI ", uri, " for connection") +func (s *InfluxSink) connect() error { + var auth string + uri := fmt.Sprintf("http://%s:%s", s.host, s.port) + if len(s.user) == 0 { + auth = s.password + } else { + auth = fmt.Sprintf("%s:%s", s.user, s.password) + } + log.Print("Using URI ", uri, " Org ", s.organization, " Bucket ", s.database) s.client = influxdb2.NewClient(uri, auth) s.writeApi = s.client.WriteAPIBlocking(s.organization, s.database) return nil } +func (s *InfluxSink) Init(config SinkConfig) error { + if len(config.Host) == 0 || + len(config.Port) == 0 || + len(config.Database) == 0 || + len(config.Organization) == 0 || + len(config.Password) == 0 { + return errors.New("Not all configuration variables set required by InfluxSink") + } + s.host = config.Host + s.port = config.Port + s.database = config.Database + s.organization = config.Organization + s.user = config.User + s.password = config.Password + return s.connect() +} + func (s *InfluxSink) Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error { p := influxdb2.NewPoint(measurement, tags, fields, t) err := s.writeApi.WritePoint(context.Background(), p) diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 1e3bd99..6a938b8 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -2,6 +2,7 @@ package sinks import ( "bytes" + "errors" "fmt" protocol "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" @@ -16,21 +17,11 @@ type NatsSink struct { buffer *bytes.Buffer } -func (s *NatsSink) Init(host string, port string, user string, password string, database string) error { - s.host = host - s.port = port - s.user = user - s.password = password - s.database = database - // Setup Influx line protocol - s.buffer = &bytes.Buffer{} - s.buffer.Grow(1025) - s.encoder = protocol.NewEncoder(s.buffer) - s.encoder.SetPrecision(time.Second) - s.encoder.SetMaxLineBytes(1024) - // Setup infos for connection +func (s *NatsSink) connect() error { uinfo := nats.UserInfo(s.user, s.password) uri := fmt.Sprintf("nats://%s:%s", s.host, s.port) + log.Print("Using URI ", uri) + s.client = nil nc, err := nats.Connect(uri, uinfo) if err != nil { log.Fatal(err) @@ -40,21 +31,49 @@ func (s *NatsSink) Init(host string, port string, user string, password string, return nil } +func (s *NatsSink) Init(config SinkConfig) error { + if len(config.Host) == 0 || + len(config.Port) == 0 || + len(config.Database) == 0 { + return errors.New("Not all configuration variables set required by NatsSink") + } + s.host = config.Host + s.port = config.Port + s.database = config.Database + s.organization = config.Organization + s.user = config.User + s.password = config.Password + // Setup Influx line protocol + s.buffer = &bytes.Buffer{} + s.buffer.Grow(1025) + s.encoder = protocol.NewEncoder(s.buffer) + s.encoder.SetPrecision(time.Second) + s.encoder.SetMaxLineBytes(1024) + // Setup infos for connection + return s.connect() +} + func (s *NatsSink) Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error { - m, err := protocol.New(measurement, tags, fields, t) - if err != nil { - log.Print(err) - return err + if s.client != nil { + m, err := protocol.New(measurement, tags, fields, t) + if err != nil { + log.Print(err) + return err + } + _, err = s.encoder.Encode(m) + if err != nil { + log.Print(err) + return err + } + s.client.Publish(s.database, s.buffer.Bytes()) + } - _, err = s.encoder.Encode(m) - if err != nil { - log.Print(err) - return err - } - s.client.Publish(s.database, s.buffer.Bytes()) return nil } func (s *NatsSink) Close() { - s.client.Close() + log.Print("Closing Nats connection") + if s.client != nil { + s.client.Close() + } } diff --git a/sinks/sink.go b/sinks/sink.go index 1efdc81..5c84d8c 100644 --- a/sinks/sink.go +++ b/sinks/sink.go @@ -4,16 +4,27 @@ import ( "time" ) +type SinkConfig struct { + Host string `json:"host"` + Port string `json:"port"` + Database string `json:"database"` + User string `json:"user"` + Password string `json:"password"` + Organization string `json:"organization"` + Type string `json:"type"` +} + type Sink struct { - host string - port string - user string - password string - database string + host string + port string + user string + password string + database string + organization string } type SinkFuncs interface { - Init(host string, port string, user string, password string, database string) error + Init(config SinkConfig) error Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error Close() } diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index b255dca..20ec628 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -11,12 +11,7 @@ type StdoutSink struct { Sink } -func (s *StdoutSink) Init(host string, port string, user string, password string, database string) error { - s.host = host - s.port = port - s.user = user - s.password = password - s.database = database +func (s *StdoutSink) Init(config SinkConfig) error { return nil } @@ -27,13 +22,13 @@ func (s *StdoutSink) Write(measurement string, tags map[string]string, fields ma tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", k, v)) } for k, v := range fields { - switch v.(type) { - case float64: - if !math.IsNaN(v.(float64)) { - fieldstr = append(fieldstr, fmt.Sprintf("%s=%v", k, v.(float64))) - } + switch v.(type) { + case float64: + if !math.IsNaN(v.(float64)) { + fieldstr = append(fieldstr, fmt.Sprintf("%s=%v", k, v.(float64))) + } case string: - fieldstr = append(fieldstr, fmt.Sprintf("%s=%q", k, v.(string))) + fieldstr = append(fieldstr, fmt.Sprintf("%s=%q", k, v.(string))) } } if len(tagsstr) > 0 {