mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-11-03 18:25:07 +01:00 
			
		
		
		
	* Use common configuration format of 'receiver_name' -> 'receiver_config' * Adjust receiver configuration files
		
			
				
	
	
		
			94 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			94 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package receivers
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"time"
 | 
						|
 | 
						|
	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
 | 
						|
	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 | 
						|
	influx "github.com/influxdata/line-protocol"
 | 
						|
	nats "github.com/nats-io/nats.go"
 | 
						|
)
 | 
						|
 | 
						|
type NatsReceiverConfig struct {
 | 
						|
	Type    string `json:"type"`
 | 
						|
	Addr    string `json:"address"`
 | 
						|
	Port    string `json:"port"`
 | 
						|
	Subject string `json:"subject"`
 | 
						|
}
 | 
						|
 | 
						|
type NatsReceiver struct {
 | 
						|
	receiver
 | 
						|
	nc      *nats.Conn
 | 
						|
	handler *influx.MetricHandler
 | 
						|
	parser  *influx.Parser
 | 
						|
	meta    map[string]string
 | 
						|
	config  NatsReceiverConfig
 | 
						|
}
 | 
						|
 | 
						|
var DefaultTime = func() time.Time {
 | 
						|
	return time.Unix(42, 0)
 | 
						|
}
 | 
						|
 | 
						|
func (r *NatsReceiver) Init(name string, config json.RawMessage) error {
 | 
						|
	r.typename = "NatsReceiver"
 | 
						|
	r.name = name
 | 
						|
	r.config.Addr = nats.DefaultURL
 | 
						|
	r.config.Port = "4222"
 | 
						|
	if len(config) > 0 {
 | 
						|
		err := json.Unmarshal(config, &r.config)
 | 
						|
		if err != nil {
 | 
						|
			cclog.ComponentError(r.name, "Error reading config:", err.Error())
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if len(r.config.Addr) == 0 ||
 | 
						|
		len(r.config.Port) == 0 ||
 | 
						|
		len(r.config.Subject) == 0 {
 | 
						|
		return errors.New("not all configuration variables set required by NatsReceiver")
 | 
						|
	}
 | 
						|
	r.meta = map[string]string{"source": r.name}
 | 
						|
	uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port)
 | 
						|
	cclog.ComponentDebug(r.name, "INIT", uri, "Subject", r.config.Subject)
 | 
						|
	nc, err := nats.Connect(uri)
 | 
						|
	if err == nil {
 | 
						|
		r.nc = nc
 | 
						|
	} else {
 | 
						|
		r.nc = nil
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	r.handler = influx.NewMetricHandler()
 | 
						|
	r.parser = influx.NewParser(r.handler)
 | 
						|
	r.parser.SetTimeFunc(DefaultTime)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (r *NatsReceiver) Start() {
 | 
						|
	cclog.ComponentDebug(r.name, "START")
 | 
						|
	r.nc.Subscribe(r.config.Subject, r._NatsReceive)
 | 
						|
}
 | 
						|
 | 
						|
func (r *NatsReceiver) _NatsReceive(m *nats.Msg) {
 | 
						|
	metrics, err := r.parser.Parse(m.Data)
 | 
						|
	if err == nil {
 | 
						|
		for _, m := range metrics {
 | 
						|
			y := lp.FromInfluxMetric(m)
 | 
						|
			for k, v := range r.meta {
 | 
						|
				y.AddMeta(k, v)
 | 
						|
			}
 | 
						|
			if r.sink != nil {
 | 
						|
				r.sink <- y
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (r *NatsReceiver) Close() {
 | 
						|
	if r.nc != nil {
 | 
						|
		cclog.ComponentDebug(r.name, "CLOSE")
 | 
						|
		r.nc.Close()
 | 
						|
	}
 | 
						|
}
 |