mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-11-04 02:35:07 +01:00 
			
		
		
		
	Hand over full config to Sink and Receiver
This commit is contained in:
		@@ -5,6 +5,7 @@ import (
 | 
			
		||||
	"flag"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"github.com/ClusterCockpit/cc-metric-collector/collectors"
 | 
			
		||||
	"github.com/ClusterCockpit/cc-metric-collector/receivers"
 | 
			
		||||
	"github.com/ClusterCockpit/cc-metric-collector/sinks"
 | 
			
		||||
	"log"
 | 
			
		||||
	"os"
 | 
			
		||||
@@ -34,19 +35,17 @@ var Sinks = map[string]sinks.SinkFuncs{
 | 
			
		||||
	"nats":     &sinks.NatsSink{},
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var Receivers = map[string]receivers.ReceiverFuncs{
 | 
			
		||||
	"nats": &receivers.NatsReceiver{},
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Structure of the configuration file
 | 
			
		||||
type GlobalConfig struct {
 | 
			
		||||
	Sink struct {
 | 
			
		||||
		User     string `json:"user"`
 | 
			
		||||
		Password string `json:"password"`
 | 
			
		||||
		Host     string `json:"host"`
 | 
			
		||||
		Port     string `json:"port"`
 | 
			
		||||
		Database string `json:"database"`
 | 
			
		||||
		Type     string `json:"type"`
 | 
			
		||||
	} `json:"sink"`
 | 
			
		||||
	Interval   int      `json:"interval"`
 | 
			
		||||
	Duration   int      `json:"duration"`
 | 
			
		||||
	Collectors []string `json:"collectors"`
 | 
			
		||||
	Sink       sinks.SinkConfig         `json:"sink"`
 | 
			
		||||
	Interval   int                      `json:"interval"`
 | 
			
		||||
	Duration   int                      `json:"duration"`
 | 
			
		||||
	Collectors []string                 `json:"collectors"`
 | 
			
		||||
	Receiver   receivers.ReceiverConfig `json:"receiver"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Load JSON configuration file
 | 
			
		||||
@@ -91,7 +90,7 @@ func SetLogging(logfile string) error {
 | 
			
		||||
 | 
			
		||||
// Register an interrupt handler for Ctrl+C and similar. At signal,
 | 
			
		||||
// all collectors are closed
 | 
			
		||||
func shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs) {
 | 
			
		||||
func shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs, recv receivers.ReceiverFuncs) {
 | 
			
		||||
	sigs := make(chan os.Signal, 1)
 | 
			
		||||
	signal.Notify(sigs, os.Interrupt)
 | 
			
		||||
 | 
			
		||||
@@ -104,6 +103,9 @@ func shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs) {
 | 
			
		||||
			col.Close()
 | 
			
		||||
		}
 | 
			
		||||
		time.Sleep(1 * time.Second)
 | 
			
		||||
		if recv != nil {
 | 
			
		||||
			recv.Close()
 | 
			
		||||
		}
 | 
			
		||||
		sink.Close()
 | 
			
		||||
		wg.Done()
 | 
			
		||||
	}(wg)
 | 
			
		||||
@@ -112,6 +114,9 @@ func shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs) {
 | 
			
		||||
func main() {
 | 
			
		||||
	var config GlobalConfig
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	var recv receivers.ReceiverFuncs = nil
 | 
			
		||||
	var use_recv bool
 | 
			
		||||
	use_recv = false
 | 
			
		||||
	wg.Add(1)
 | 
			
		||||
	host, err := os.Hostname()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -159,13 +164,29 @@ func main() {
 | 
			
		||||
	}
 | 
			
		||||
	// Setup sink
 | 
			
		||||
	sink := Sinks[config.Sink.Type]
 | 
			
		||||
	err = sink.Init(config.Sink.Host, config.Sink.Port, config.Sink.User, config.Sink.Password, config.Sink.Database)
 | 
			
		||||
	err = sink.Init(config.Sink)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Print(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	// Setup receiver
 | 
			
		||||
	if config.Receiver.Type != "none" {
 | 
			
		||||
		if _, found := Receivers[config.Receiver.Type]; !found {
 | 
			
		||||
			log.Print("Invalid receiver type '", config.Receiver.Type, "' in configuration")
 | 
			
		||||
			return
 | 
			
		||||
		} else {
 | 
			
		||||
			recv = Receivers[config.Receiver.Type]
 | 
			
		||||
			err = recv.Init(config.Receiver, sink)
 | 
			
		||||
			if err == nil {
 | 
			
		||||
				use_recv = true
 | 
			
		||||
			} else {
 | 
			
		||||
				log.Print(err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Register interrupt handler
 | 
			
		||||
	shutdown(&wg, &config, sink)
 | 
			
		||||
	shutdown(&wg, &config, sink, recv)
 | 
			
		||||
 | 
			
		||||
	// Initialize all collectors
 | 
			
		||||
	tmp := make([]string, 0)
 | 
			
		||||
@@ -203,6 +224,11 @@ func main() {
 | 
			
		||||
		cpuFields[s] = make(map[string]interface{})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Start receiver
 | 
			
		||||
	if use_recv {
 | 
			
		||||
		recv.Start()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
 
 | 
			
		||||
@@ -2,6 +2,7 @@ package sinks
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
 | 
			
		||||
	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
 | 
			
		||||
@@ -11,27 +12,42 @@ import (
 | 
			
		||||
 | 
			
		||||
type InfluxSink struct {
 | 
			
		||||
	Sink
 | 
			
		||||
	client       influxdb2.Client
 | 
			
		||||
	writeApi     influxdb2Api.WriteAPIBlocking
 | 
			
		||||
	retPolicy    string
 | 
			
		||||
	organization string
 | 
			
		||||
	client    influxdb2.Client
 | 
			
		||||
	writeApi  influxdb2Api.WriteAPIBlocking
 | 
			
		||||
	retPolicy string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *InfluxSink) Init(host string, port string, user string, password string, database string) error {
 | 
			
		||||
	s.host = host
 | 
			
		||||
	s.port = port
 | 
			
		||||
	s.user = user
 | 
			
		||||
	s.password = password
 | 
			
		||||
	s.database = database
 | 
			
		||||
	s.organization = ""
 | 
			
		||||
	uri := fmt.Sprintf("http://%s:%s", host, port)
 | 
			
		||||
	auth := fmt.Sprintf("%s:%s", user, password)
 | 
			
		||||
	log.Print("Using URI ", uri, " for connection")
 | 
			
		||||
func (s *InfluxSink) connect() error {
 | 
			
		||||
	var auth string
 | 
			
		||||
	uri := fmt.Sprintf("http://%s:%s", s.host, s.port)
 | 
			
		||||
	if len(s.user) == 0 {
 | 
			
		||||
		auth = s.password
 | 
			
		||||
	} else {
 | 
			
		||||
		auth = fmt.Sprintf("%s:%s", s.user, s.password)
 | 
			
		||||
	}
 | 
			
		||||
	log.Print("Using URI ", uri, " Org ", s.organization, " Bucket ", s.database)
 | 
			
		||||
	s.client = influxdb2.NewClient(uri, auth)
 | 
			
		||||
	s.writeApi = s.client.WriteAPIBlocking(s.organization, s.database)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *InfluxSink) Init(config SinkConfig) error {
 | 
			
		||||
	if len(config.Host) == 0 ||
 | 
			
		||||
		len(config.Port) == 0 ||
 | 
			
		||||
		len(config.Database) == 0 ||
 | 
			
		||||
		len(config.Organization) == 0 ||
 | 
			
		||||
		len(config.Password) == 0 {
 | 
			
		||||
		return errors.New("Not all configuration variables set required by InfluxSink")
 | 
			
		||||
	}
 | 
			
		||||
	s.host = config.Host
 | 
			
		||||
	s.port = config.Port
 | 
			
		||||
	s.database = config.Database
 | 
			
		||||
	s.organization = config.Organization
 | 
			
		||||
	s.user = config.User
 | 
			
		||||
	s.password = config.Password
 | 
			
		||||
	return s.connect()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *InfluxSink) Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error {
 | 
			
		||||
	p := influxdb2.NewPoint(measurement, tags, fields, t)
 | 
			
		||||
	err := s.writeApi.WritePoint(context.Background(), p)
 | 
			
		||||
 
 | 
			
		||||
@@ -2,6 +2,7 @@ package sinks
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	protocol "github.com/influxdata/line-protocol"
 | 
			
		||||
	nats "github.com/nats-io/nats.go"
 | 
			
		||||
@@ -16,21 +17,11 @@ type NatsSink struct {
 | 
			
		||||
	buffer  *bytes.Buffer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *NatsSink) Init(host string, port string, user string, password string, database string) error {
 | 
			
		||||
	s.host = host
 | 
			
		||||
	s.port = port
 | 
			
		||||
	s.user = user
 | 
			
		||||
	s.password = password
 | 
			
		||||
	s.database = database
 | 
			
		||||
	// Setup Influx line protocol
 | 
			
		||||
	s.buffer = &bytes.Buffer{}
 | 
			
		||||
	s.buffer.Grow(1025)
 | 
			
		||||
	s.encoder = protocol.NewEncoder(s.buffer)
 | 
			
		||||
	s.encoder.SetPrecision(time.Second)
 | 
			
		||||
	s.encoder.SetMaxLineBytes(1024)
 | 
			
		||||
	// Setup infos for connection
 | 
			
		||||
func (s *NatsSink) connect() error {
 | 
			
		||||
	uinfo := nats.UserInfo(s.user, s.password)
 | 
			
		||||
	uri := fmt.Sprintf("nats://%s:%s", s.host, s.port)
 | 
			
		||||
	log.Print("Using URI ", uri)
 | 
			
		||||
	s.client = nil
 | 
			
		||||
	nc, err := nats.Connect(uri, uinfo)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatal(err)
 | 
			
		||||
@@ -40,21 +31,49 @@ func (s *NatsSink) Init(host string, port string, user string, password string,
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *NatsSink) Init(config SinkConfig) error {
 | 
			
		||||
	if len(config.Host) == 0 ||
 | 
			
		||||
		len(config.Port) == 0 ||
 | 
			
		||||
		len(config.Database) == 0 {
 | 
			
		||||
		return errors.New("Not all configuration variables set required by NatsSink")
 | 
			
		||||
	}
 | 
			
		||||
	s.host = config.Host
 | 
			
		||||
	s.port = config.Port
 | 
			
		||||
	s.database = config.Database
 | 
			
		||||
	s.organization = config.Organization
 | 
			
		||||
	s.user = config.User
 | 
			
		||||
	s.password = config.Password
 | 
			
		||||
	// Setup Influx line protocol
 | 
			
		||||
	s.buffer = &bytes.Buffer{}
 | 
			
		||||
	s.buffer.Grow(1025)
 | 
			
		||||
	s.encoder = protocol.NewEncoder(s.buffer)
 | 
			
		||||
	s.encoder.SetPrecision(time.Second)
 | 
			
		||||
	s.encoder.SetMaxLineBytes(1024)
 | 
			
		||||
	// Setup infos for connection
 | 
			
		||||
	return s.connect()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *NatsSink) Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error {
 | 
			
		||||
	m, err := protocol.New(measurement, tags, fields, t)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Print(err)
 | 
			
		||||
		return err
 | 
			
		||||
	if s.client != nil {
 | 
			
		||||
		m, err := protocol.New(measurement, tags, fields, t)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Print(err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		_, err = s.encoder.Encode(m)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Print(err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		s.client.Publish(s.database, s.buffer.Bytes())
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
	_, err = s.encoder.Encode(m)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Print(err)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	s.client.Publish(s.database, s.buffer.Bytes())
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *NatsSink) Close() {
 | 
			
		||||
	s.client.Close()
 | 
			
		||||
	log.Print("Closing Nats connection")
 | 
			
		||||
	if s.client != nil {
 | 
			
		||||
		s.client.Close()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -4,16 +4,27 @@ import (
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type SinkConfig struct {
 | 
			
		||||
	Host         string `json:"host"`
 | 
			
		||||
	Port         string `json:"port"`
 | 
			
		||||
	Database     string `json:"database"`
 | 
			
		||||
	User         string `json:"user"`
 | 
			
		||||
	Password     string `json:"password"`
 | 
			
		||||
	Organization string `json:"organization"`
 | 
			
		||||
	Type         string `json:"type"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Sink struct {
 | 
			
		||||
	host     string
 | 
			
		||||
	port     string
 | 
			
		||||
	user     string
 | 
			
		||||
	password string
 | 
			
		||||
	database string
 | 
			
		||||
	host         string
 | 
			
		||||
	port         string
 | 
			
		||||
	user         string
 | 
			
		||||
	password     string
 | 
			
		||||
	database     string
 | 
			
		||||
	organization string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type SinkFuncs interface {
 | 
			
		||||
	Init(host string, port string, user string, password string, database string) error
 | 
			
		||||
	Init(config SinkConfig) error
 | 
			
		||||
	Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error
 | 
			
		||||
	Close()
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -11,12 +11,7 @@ type StdoutSink struct {
 | 
			
		||||
	Sink
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *StdoutSink) Init(host string, port string, user string, password string, database string) error {
 | 
			
		||||
	s.host = host
 | 
			
		||||
	s.port = port
 | 
			
		||||
	s.user = user
 | 
			
		||||
	s.password = password
 | 
			
		||||
	s.database = database
 | 
			
		||||
func (s *StdoutSink) Init(config SinkConfig) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -27,13 +22,13 @@ func (s *StdoutSink) Write(measurement string, tags map[string]string, fields ma
 | 
			
		||||
		tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", k, v))
 | 
			
		||||
	}
 | 
			
		||||
	for k, v := range fields {
 | 
			
		||||
	    switch v.(type) {
 | 
			
		||||
	    case float64:
 | 
			
		||||
		    if !math.IsNaN(v.(float64)) {
 | 
			
		||||
			    fieldstr = append(fieldstr, fmt.Sprintf("%s=%v", k, v.(float64)))
 | 
			
		||||
		    }
 | 
			
		||||
		switch v.(type) {
 | 
			
		||||
		case float64:
 | 
			
		||||
			if !math.IsNaN(v.(float64)) {
 | 
			
		||||
				fieldstr = append(fieldstr, fmt.Sprintf("%s=%v", k, v.(float64)))
 | 
			
		||||
			}
 | 
			
		||||
		case string:
 | 
			
		||||
		    fieldstr = append(fieldstr, fmt.Sprintf("%s=%q", k, v.(string)))
 | 
			
		||||
			fieldstr = append(fieldstr, fmt.Sprintf("%s=%q", k, v.(string)))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if len(tagsstr) > 0 {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user