mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2024-11-14 05:57:25 +01:00
Refactor: Embed Init() into New() function
This commit is contained in:
parent
24e12ccc57
commit
73981527d3
@ -33,41 +33,6 @@ type GangliaSink struct {
|
|||||||
config GangliaSinkConfig
|
config GangliaSinkConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *GangliaSink) Init(name string, config json.RawMessage) error {
|
|
||||||
var err error = nil
|
|
||||||
s.name = fmt.Sprintf("GangliaSink(%s)", name)
|
|
||||||
s.config.AddTagsAsDesc = false
|
|
||||||
s.config.AddGangliaGroup = false
|
|
||||||
if len(config) > 0 {
|
|
||||||
err := json.Unmarshal(config, &s.config)
|
|
||||||
if err != nil {
|
|
||||||
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
s.gmetric_path = ""
|
|
||||||
s.gmetric_config = ""
|
|
||||||
if len(s.config.GmetricPath) > 0 {
|
|
||||||
p, err := exec.LookPath(s.config.GmetricPath)
|
|
||||||
if err == nil {
|
|
||||||
s.gmetric_path = p
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(s.gmetric_path) == 0 {
|
|
||||||
p, err := exec.LookPath(string(GMETRIC_EXEC))
|
|
||||||
if err == nil {
|
|
||||||
s.gmetric_path = p
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(s.gmetric_path) == 0 {
|
|
||||||
err = errors.New("cannot find executable 'gmetric'")
|
|
||||||
}
|
|
||||||
if len(s.config.GmetricConfig) > 0 {
|
|
||||||
s.gmetric_config = s.config.GmetricConfig
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *GangliaSink) Write(point lp.CCMetric) error {
|
func (s *GangliaSink) Write(point lp.CCMetric) error {
|
||||||
var err error = nil
|
var err error = nil
|
||||||
var tagsstr []string
|
var tagsstr []string
|
||||||
@ -170,6 +135,35 @@ func (s *GangliaSink) Close() {
|
|||||||
|
|
||||||
func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
|
func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
|
||||||
s := new(GangliaSink)
|
s := new(GangliaSink)
|
||||||
s.Init(name, config)
|
s.name = fmt.Sprintf("GangliaSink(%s)", name)
|
||||||
|
s.config.AddTagsAsDesc = false
|
||||||
|
s.config.AddGangliaGroup = false
|
||||||
|
if len(config) > 0 {
|
||||||
|
err := json.Unmarshal(config, &s.config)
|
||||||
|
if err != nil {
|
||||||
|
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.gmetric_path = ""
|
||||||
|
s.gmetric_config = ""
|
||||||
|
if len(s.config.GmetricPath) > 0 {
|
||||||
|
p, err := exec.LookPath(s.config.GmetricPath)
|
||||||
|
if err == nil {
|
||||||
|
s.gmetric_path = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(s.gmetric_path) == 0 {
|
||||||
|
p, err := exec.LookPath(string(GMETRIC_EXEC))
|
||||||
|
if err == nil {
|
||||||
|
s.gmetric_path = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(s.gmetric_path) == 0 {
|
||||||
|
return nil, errors.New("cannot find executable 'gmetric'")
|
||||||
|
}
|
||||||
|
if len(s.config.GmetricConfig) > 0 {
|
||||||
|
s.gmetric_config = s.config.GmetricConfig
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
@ -38,57 +38,6 @@ type HttpSink struct {
|
|||||||
flushDelay time.Duration
|
flushDelay time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *HttpSink) Init(name string, config json.RawMessage) error {
|
|
||||||
// Set default values
|
|
||||||
s.name = fmt.Sprintf("HttpSink(%s)", name)
|
|
||||||
s.config.MaxIdleConns = 10
|
|
||||||
s.config.IdleConnTimeout = "5s"
|
|
||||||
s.config.Timeout = "5s"
|
|
||||||
s.config.FlushDelay = "1s"
|
|
||||||
|
|
||||||
// Read config
|
|
||||||
if len(config) > 0 {
|
|
||||||
err := json.Unmarshal(config, &s.config)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(s.config.URL) == 0 {
|
|
||||||
return errors.New("`url` config option is required for HTTP sink")
|
|
||||||
}
|
|
||||||
if s.config.MaxIdleConns > 0 {
|
|
||||||
s.maxIdleConns = s.config.MaxIdleConns
|
|
||||||
}
|
|
||||||
if len(s.config.IdleConnTimeout) > 0 {
|
|
||||||
t, err := time.ParseDuration(s.config.IdleConnTimeout)
|
|
||||||
if err == nil {
|
|
||||||
s.idleConnTimeout = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(s.config.Timeout) > 0 {
|
|
||||||
t, err := time.ParseDuration(s.config.Timeout)
|
|
||||||
if err == nil {
|
|
||||||
s.timeout = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(s.config.FlushDelay) > 0 {
|
|
||||||
t, err := time.ParseDuration(s.config.FlushDelay)
|
|
||||||
if err == nil {
|
|
||||||
s.flushDelay = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tr := &http.Transport{
|
|
||||||
MaxIdleConns: s.maxIdleConns,
|
|
||||||
IdleConnTimeout: s.idleConnTimeout,
|
|
||||||
}
|
|
||||||
s.client = &http.Client{Transport: tr, Timeout: s.timeout}
|
|
||||||
s.buffer = &bytes.Buffer{}
|
|
||||||
s.encoder = influx.NewEncoder(s.buffer)
|
|
||||||
s.encoder.SetPrecision(time.Second)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *HttpSink) Write(m lp.CCMetric) error {
|
func (s *HttpSink) Write(m lp.CCMetric) error {
|
||||||
if s.buffer.Len() == 0 && s.flushDelay != 0 {
|
if s.buffer.Len() == 0 && s.flushDelay != 0 {
|
||||||
// This is the first write since the last flush, start the flushTimer!
|
// This is the first write since the last flush, start the flushTimer!
|
||||||
@ -172,6 +121,51 @@ func (s *HttpSink) Close() {
|
|||||||
|
|
||||||
func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
|
func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
|
||||||
s := new(HttpSink)
|
s := new(HttpSink)
|
||||||
s.Init(name, config)
|
// Set default values
|
||||||
|
s.name = fmt.Sprintf("HttpSink(%s)", name)
|
||||||
|
s.config.MaxIdleConns = 10
|
||||||
|
s.config.IdleConnTimeout = "5s"
|
||||||
|
s.config.Timeout = "5s"
|
||||||
|
s.config.FlushDelay = "1s"
|
||||||
|
|
||||||
|
// Read config
|
||||||
|
if len(config) > 0 {
|
||||||
|
err := json.Unmarshal(config, &s.config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(s.config.URL) == 0 {
|
||||||
|
return nil, errors.New("`url` config option is required for HTTP sink")
|
||||||
|
}
|
||||||
|
if s.config.MaxIdleConns > 0 {
|
||||||
|
s.maxIdleConns = s.config.MaxIdleConns
|
||||||
|
}
|
||||||
|
if len(s.config.IdleConnTimeout) > 0 {
|
||||||
|
t, err := time.ParseDuration(s.config.IdleConnTimeout)
|
||||||
|
if err == nil {
|
||||||
|
s.idleConnTimeout = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(s.config.Timeout) > 0 {
|
||||||
|
t, err := time.ParseDuration(s.config.Timeout)
|
||||||
|
if err == nil {
|
||||||
|
s.timeout = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(s.config.FlushDelay) > 0 {
|
||||||
|
t, err := time.ParseDuration(s.config.FlushDelay)
|
||||||
|
if err == nil {
|
||||||
|
s.flushDelay = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tr := &http.Transport{
|
||||||
|
MaxIdleConns: s.maxIdleConns,
|
||||||
|
IdleConnTimeout: s.idleConnTimeout,
|
||||||
|
}
|
||||||
|
s.client = &http.Client{Transport: tr, Timeout: s.timeout}
|
||||||
|
s.buffer = &bytes.Buffer{}
|
||||||
|
s.encoder = influx.NewEncoder(s.buffer)
|
||||||
|
s.encoder.SetPrecision(time.Second)
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
@ -30,11 +30,10 @@ type InfluxAsyncSinkConfig struct {
|
|||||||
|
|
||||||
type InfluxAsyncSink struct {
|
type InfluxAsyncSink struct {
|
||||||
sink
|
sink
|
||||||
client influxdb2.Client
|
client influxdb2.Client
|
||||||
writeApi influxdb2Api.WriteAPI
|
writeApi influxdb2Api.WriteAPI
|
||||||
retPolicy string
|
errors <-chan error
|
||||||
errors <-chan error
|
config InfluxAsyncSinkConfig
|
||||||
config InfluxAsyncSinkConfig
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *InfluxAsyncSink) connect() error {
|
func (s *InfluxAsyncSink) connect() error {
|
||||||
@ -68,39 +67,6 @@ func (s *InfluxAsyncSink) connect() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *InfluxAsyncSink) Init(name string, config json.RawMessage) error {
|
|
||||||
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
|
||||||
|
|
||||||
// Set default for maximum number of points sent to server in single request.
|
|
||||||
s.config.BatchSize = 100
|
|
||||||
|
|
||||||
if len(config) > 0 {
|
|
||||||
err := json.Unmarshal(config, &s.config)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(s.config.Host) == 0 ||
|
|
||||||
len(s.config.Port) == 0 ||
|
|
||||||
len(s.config.Database) == 0 ||
|
|
||||||
len(s.config.Organization) == 0 ||
|
|
||||||
len(s.config.Password) == 0 {
|
|
||||||
return errors.New("not all configuration variables set required by InfluxAsyncSink")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connect to InfluxDB server
|
|
||||||
err := s.connect()
|
|
||||||
|
|
||||||
// Start background: Read from error channel
|
|
||||||
s.errors = s.writeApi.Errors()
|
|
||||||
go func() {
|
|
||||||
for err := range s.errors {
|
|
||||||
cclog.ComponentError(s.name, err.Error())
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
|
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
|
||||||
s.writeApi.WritePoint(
|
s.writeApi.WritePoint(
|
||||||
m.ToPoint(s.config.MetaAsTags),
|
m.ToPoint(s.config.MetaAsTags),
|
||||||
@ -121,6 +87,37 @@ func (s *InfluxAsyncSink) Close() {
|
|||||||
|
|
||||||
func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
||||||
s := new(InfluxAsyncSink)
|
s := new(InfluxAsyncSink)
|
||||||
s.Init(name, config)
|
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
||||||
|
|
||||||
|
// Set default for maximum number of points sent to server in single request.
|
||||||
|
s.config.BatchSize = 100
|
||||||
|
|
||||||
|
if len(config) > 0 {
|
||||||
|
err := json.Unmarshal(config, &s.config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(s.config.Host) == 0 ||
|
||||||
|
len(s.config.Port) == 0 ||
|
||||||
|
len(s.config.Database) == 0 ||
|
||||||
|
len(s.config.Organization) == 0 ||
|
||||||
|
len(s.config.Password) == 0 {
|
||||||
|
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect to InfluxDB server
|
||||||
|
if err := s.connect(); err != nil {
|
||||||
|
return nil, fmt.Errorf("Unable to connect: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start background: Read from error channel
|
||||||
|
s.errors = s.writeApi.Errors()
|
||||||
|
go func() {
|
||||||
|
for err := range s.errors {
|
||||||
|
cclog.ComponentError(s.name, err.Error())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
@ -57,26 +57,6 @@ func (s *InfluxSink) connect() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *InfluxSink) Init(name string, config json.RawMessage) error {
|
|
||||||
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
|
||||||
if len(config) > 0 {
|
|
||||||
err := json.Unmarshal(config, &s.config)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(s.config.Host) == 0 ||
|
|
||||||
len(s.config.Port) == 0 ||
|
|
||||||
len(s.config.Database) == 0 ||
|
|
||||||
len(s.config.Organization) == 0 ||
|
|
||||||
len(s.config.Password) == 0 {
|
|
||||||
return errors.New("not all configuration variables set required by InfluxSink")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connect to InfluxDB server
|
|
||||||
return s.connect()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *InfluxSink) Write(m lp.CCMetric) error {
|
func (s *InfluxSink) Write(m lp.CCMetric) error {
|
||||||
err :=
|
err :=
|
||||||
s.writeApi.WritePoint(
|
s.writeApi.WritePoint(
|
||||||
@ -97,6 +77,24 @@ func (s *InfluxSink) Close() {
|
|||||||
|
|
||||||
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
||||||
s := new(InfluxSink)
|
s := new(InfluxSink)
|
||||||
s.Init(name, config)
|
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
||||||
|
if len(config) > 0 {
|
||||||
|
err := json.Unmarshal(config, &s.config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(s.config.Host) == 0 ||
|
||||||
|
len(s.config.Port) == 0 ||
|
||||||
|
len(s.config.Database) == 0 ||
|
||||||
|
len(s.config.Organization) == 0 ||
|
||||||
|
len(s.config.Password) == 0 {
|
||||||
|
return nil, errors.New("not all configuration variables set required by InfluxSink")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect to InfluxDB server
|
||||||
|
if err := s.connect(); err != nil {
|
||||||
|
return nil, fmt.Errorf("Unable to connect: %v", err)
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
@ -109,65 +109,6 @@ type LibgangliaSink struct {
|
|||||||
cstrCache map[string]*C.char
|
cstrCache map[string]*C.char
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *LibgangliaSink) Init(name string, config json.RawMessage) error {
|
|
||||||
var err error = nil
|
|
||||||
s.name = fmt.Sprintf("LibgangliaSink(%s)", name)
|
|
||||||
//s.config.AddTagsAsDesc = false
|
|
||||||
s.config.AddGangliaGroup = false
|
|
||||||
s.config.AddTypeToName = false
|
|
||||||
s.config.AddUnits = true
|
|
||||||
s.config.GmondConfig = string(GMOND_CONFIG_FILE)
|
|
||||||
s.config.GangliaLib = string(GANGLIA_LIB_NAME)
|
|
||||||
if len(config) > 0 {
|
|
||||||
err = json.Unmarshal(config, &s.config)
|
|
||||||
if err != nil {
|
|
||||||
cclog.ComponentError(s.name, "Error reading config:", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
|
|
||||||
if lib == nil {
|
|
||||||
return fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
|
|
||||||
}
|
|
||||||
err = lib.Open()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set up cache for the C strings
|
|
||||||
s.cstrCache = make(map[string]*C.char)
|
|
||||||
// s.cstrCache["globals"] = C.CString("globals")
|
|
||||||
|
|
||||||
// s.cstrCache["override_hostname"] = C.CString("override_hostname")
|
|
||||||
// s.cstrCache["override_ip"] = C.CString("override_ip")
|
|
||||||
|
|
||||||
// Add some constant strings
|
|
||||||
s.cstrCache["GROUP"] = C.CString("GROUP")
|
|
||||||
s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
|
|
||||||
s.cstrCache[""] = C.CString("")
|
|
||||||
|
|
||||||
// Add cluster name for lookup in Write()
|
|
||||||
if len(s.config.ClusterName) > 0 {
|
|
||||||
s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
|
|
||||||
}
|
|
||||||
// Add supported types for later lookup in Write()
|
|
||||||
s.cstrCache["double"] = C.CString("double")
|
|
||||||
s.cstrCache["int32"] = C.CString("int32")
|
|
||||||
s.cstrCache["string"] = C.CString("string")
|
|
||||||
|
|
||||||
// Create Ganglia pool
|
|
||||||
s.global_context = C.Ganglia_pool_create(nil)
|
|
||||||
// Load Ganglia configuration
|
|
||||||
s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
|
|
||||||
s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
|
|
||||||
//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
|
|
||||||
//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
|
|
||||||
//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
|
|
||||||
|
|
||||||
s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *LibgangliaSink) Write(point lp.CCMetric) error {
|
func (s *LibgangliaSink) Write(point lp.CCMetric) error {
|
||||||
var err error = nil
|
var err error = nil
|
||||||
var c_name *C.char
|
var c_name *C.char
|
||||||
@ -319,6 +260,60 @@ func (s *LibgangliaSink) Close() {
|
|||||||
|
|
||||||
func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
|
func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
|
||||||
s := new(LibgangliaSink)
|
s := new(LibgangliaSink)
|
||||||
s.Init(name, config)
|
var err error = nil
|
||||||
|
s.name = fmt.Sprintf("LibgangliaSink(%s)", name)
|
||||||
|
//s.config.AddTagsAsDesc = false
|
||||||
|
s.config.AddGangliaGroup = false
|
||||||
|
s.config.AddTypeToName = false
|
||||||
|
s.config.AddUnits = true
|
||||||
|
s.config.GmondConfig = string(GMOND_CONFIG_FILE)
|
||||||
|
s.config.GangliaLib = string(GANGLIA_LIB_NAME)
|
||||||
|
if len(config) > 0 {
|
||||||
|
err = json.Unmarshal(config, &s.config)
|
||||||
|
if err != nil {
|
||||||
|
cclog.ComponentError(s.name, "Error reading config:", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
|
||||||
|
if lib == nil {
|
||||||
|
return nil, fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
|
||||||
|
}
|
||||||
|
err = lib.Open()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up cache for the C strings
|
||||||
|
s.cstrCache = make(map[string]*C.char)
|
||||||
|
// s.cstrCache["globals"] = C.CString("globals")
|
||||||
|
|
||||||
|
// s.cstrCache["override_hostname"] = C.CString("override_hostname")
|
||||||
|
// s.cstrCache["override_ip"] = C.CString("override_ip")
|
||||||
|
|
||||||
|
// Add some constant strings
|
||||||
|
s.cstrCache["GROUP"] = C.CString("GROUP")
|
||||||
|
s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
|
||||||
|
s.cstrCache[""] = C.CString("")
|
||||||
|
|
||||||
|
// Add cluster name for lookup in Write()
|
||||||
|
if len(s.config.ClusterName) > 0 {
|
||||||
|
s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
|
||||||
|
}
|
||||||
|
// Add supported types for later lookup in Write()
|
||||||
|
s.cstrCache["double"] = C.CString("double")
|
||||||
|
s.cstrCache["int32"] = C.CString("int32")
|
||||||
|
s.cstrCache["string"] = C.CString("string")
|
||||||
|
|
||||||
|
// Create Ganglia pool
|
||||||
|
s.global_context = C.Ganglia_pool_create(nil)
|
||||||
|
// Load Ganglia configuration
|
||||||
|
s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
|
||||||
|
s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
|
||||||
|
//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
|
||||||
|
//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
|
||||||
|
//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
|
||||||
|
|
||||||
|
s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
package sinks
|
package sinks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -17,7 +15,6 @@ type sink struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Sink interface {
|
type Sink interface {
|
||||||
Init(name string, config json.RawMessage) error
|
|
||||||
Write(point lp.CCMetric) error
|
Write(point lp.CCMetric) error
|
||||||
Flush() error
|
Flush() error
|
||||||
Close()
|
Close()
|
||||||
|
@ -53,30 +53,6 @@ func (s *NatsSink) connect() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *NatsSink) Init(name string, config json.RawMessage) error {
|
|
||||||
s.name = fmt.Sprintf("NatsSink(%s)", name)
|
|
||||||
if len(config) > 0 {
|
|
||||||
err := json.Unmarshal(config, &s.config)
|
|
||||||
if err != nil {
|
|
||||||
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(s.config.Host) == 0 ||
|
|
||||||
len(s.config.Port) == 0 ||
|
|
||||||
len(s.config.Database) == 0 {
|
|
||||||
return errors.New("not all configuration variables set required by NatsSink")
|
|
||||||
}
|
|
||||||
// Setup Influx line protocol
|
|
||||||
s.buffer = &bytes.Buffer{}
|
|
||||||
s.buffer.Grow(1025)
|
|
||||||
s.encoder = influx.NewEncoder(s.buffer)
|
|
||||||
s.encoder.SetPrecision(time.Second)
|
|
||||||
s.encoder.SetMaxLineBytes(1024)
|
|
||||||
// Setup infos for connection
|
|
||||||
return s.connect()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *NatsSink) Write(m lp.CCMetric) error {
|
func (s *NatsSink) Write(m lp.CCMetric) error {
|
||||||
if s.client != nil {
|
if s.client != nil {
|
||||||
_, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags))
|
_, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags))
|
||||||
@ -108,6 +84,28 @@ func (s *NatsSink) Close() {
|
|||||||
|
|
||||||
func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
|
func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
|
||||||
s := new(NatsSink)
|
s := new(NatsSink)
|
||||||
s.Init(name, config)
|
s.name = fmt.Sprintf("NatsSink(%s)", name)
|
||||||
|
if len(config) > 0 {
|
||||||
|
err := json.Unmarshal(config, &s.config)
|
||||||
|
if err != nil {
|
||||||
|
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(s.config.Host) == 0 ||
|
||||||
|
len(s.config.Port) == 0 ||
|
||||||
|
len(s.config.Database) == 0 {
|
||||||
|
return nil, errors.New("not all configuration variables set required by NatsSink")
|
||||||
|
}
|
||||||
|
// Setup Influx line protocol
|
||||||
|
s.buffer = &bytes.Buffer{}
|
||||||
|
s.buffer.Grow(1025)
|
||||||
|
s.encoder = influx.NewEncoder(s.buffer)
|
||||||
|
s.encoder.SetPrecision(time.Second)
|
||||||
|
s.encoder.SetMaxLineBytes(1024)
|
||||||
|
// Setup infos for connection
|
||||||
|
if err := s.connect(); err != nil {
|
||||||
|
return nil, fmt.Errorf("Unable to connect: %v", err)
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
@ -19,34 +19,6 @@ type StdoutSink struct {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StdoutSink) Init(name string, config json.RawMessage) error {
|
|
||||||
s.name = fmt.Sprintf("StdoutSink(%s)", name)
|
|
||||||
if len(config) > 0 {
|
|
||||||
err := json.Unmarshal(config, &s.config)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.output = os.Stdout
|
|
||||||
if len(s.config.Output) > 0 {
|
|
||||||
switch strings.ToLower(s.config.Output) {
|
|
||||||
case "stdout":
|
|
||||||
s.output = os.Stdout
|
|
||||||
case "stderr":
|
|
||||||
s.output = os.Stderr
|
|
||||||
default:
|
|
||||||
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
s.output = f
|
|
||||||
}
|
|
||||||
}
|
|
||||||
s.meta_as_tags = s.config.MetaAsTags
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StdoutSink) Write(m lp.CCMetric) error {
|
func (s *StdoutSink) Write(m lp.CCMetric) error {
|
||||||
fmt.Fprint(
|
fmt.Fprint(
|
||||||
s.output,
|
s.output,
|
||||||
@ -68,6 +40,30 @@ func (s *StdoutSink) Close() {
|
|||||||
|
|
||||||
func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
|
func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
|
||||||
s := new(StdoutSink)
|
s := new(StdoutSink)
|
||||||
s.Init(name, config)
|
s.name = fmt.Sprintf("StdoutSink(%s)", name)
|
||||||
|
if len(config) > 0 {
|
||||||
|
err := json.Unmarshal(config, &s.config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.output = os.Stdout
|
||||||
|
if len(s.config.Output) > 0 {
|
||||||
|
switch strings.ToLower(s.config.Output) {
|
||||||
|
case "stdout":
|
||||||
|
s.output = os.Stdout
|
||||||
|
case "stderr":
|
||||||
|
s.output = os.Stderr
|
||||||
|
default:
|
||||||
|
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.output = f
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.meta_as_tags = s.config.MetaAsTags
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user