diff --git a/.github/ci-receivers.json b/.github/ci-receivers.json index fe51488..0967ef4 100644 --- a/.github/ci-receivers.json +++ b/.github/ci-receivers.json @@ -1 +1 @@ -[] +{} diff --git a/receivers.json b/receivers.json index e368fc3..a27f07d 100644 --- a/receivers.json +++ b/receivers.json @@ -1,8 +1,8 @@ -[ - { +{ + "natsrecv" : { "type": "nats", "address": "nats://my-url", "port" : "4222", "database": "testcluster" } -] +} diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go index 50724b1..c712186 100644 --- a/receivers/metricReceiver.go +++ b/receivers/metricReceiver.go @@ -2,9 +2,15 @@ package receivers import ( // "time" + "encoding/json" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) +type defaultReceiverConfig struct { + Type string `json:"type"` +} + type ReceiverConfig struct { Addr string `json:"address"` Port string `json:"port"` @@ -14,16 +20,13 @@ type ReceiverConfig struct { } type receiver struct { - name string - addr string - port string - database string - organization string - sink chan lp.CCMetric + typename string + name string + sink chan lp.CCMetric } type Receiver interface { - Init(config ReceiverConfig) error + Init(name string, config json.RawMessage) error Start() Close() Name() string diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index 853edf1..dc96971 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -1,19 +1,22 @@ package receivers import ( + "encoding/json" "errors" "fmt" + "time" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influx "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" - cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" - "time" ) type NatsReceiverConfig struct { - Addr string `json:"address"` - Port string `json:"port"` - Database string `json:"database"` + Type string `json:"type"` + Addr string `json:"address"` + Port string `json:"port"` + Subject string `json:"subject"` } type NatsReceiver struct { @@ -22,35 +25,35 @@ type NatsReceiver struct { handler *influx.MetricHandler parser *influx.Parser meta map[string]string - config ReceiverConfig + config NatsReceiverConfig } var DefaultTime = func() time.Time { return time.Unix(42, 0) } -func (r *NatsReceiver) Init(config ReceiverConfig) error { - r.name = "NatsReceiver" - r.config = config +func (r *NatsReceiver) Init(name string, config json.RawMessage) error { + r.typename = "NatsReceiver" + r.name = name + r.config.Addr = nats.DefaultURL + r.config.Port = "4222" + if len(config) > 0 { + err := json.Unmarshal(config, &r.config) + if err != nil { + cclog.ComponentError(r.name, "Error reading config:", err.Error()) + return err + } + } if len(r.config.Addr) == 0 || len(r.config.Port) == 0 || - len(r.config.Database) == 0 { - return errors.New("Not all configuration variables set required by NatsReceiver") + len(r.config.Subject) == 0 { + return errors.New("not all configuration variables set required by NatsReceiver") } r.meta = map[string]string{"source": r.name} - r.addr = r.config.Addr - if len(r.addr) == 0 { - r.addr = nats.DefaultURL - } - r.port = r.config.Port - if len(r.port) == 0 { - r.port = "4222" - } - uri := fmt.Sprintf("%s:%s", r.addr, r.port) - cclog.ComponentDebug("NatsReceiver", "INIT", uri) + uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port) + cclog.ComponentDebug(r.name, "INIT", uri, "Subject", r.config.Subject) nc, err := nats.Connect(uri) if err == nil { - r.database = r.config.Database r.nc = nc } else { r.nc = nil @@ -63,8 +66,8 @@ func (r *NatsReceiver) Init(config ReceiverConfig) error { } func (r *NatsReceiver) Start() { - cclog.ComponentDebug("NatsReceiver", "START") - r.nc.Subscribe(r.database, r._NatsReceive) + cclog.ComponentDebug(r.name, "START") + r.nc.Subscribe(r.config.Subject, r._NatsReceive) } func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { @@ -84,7 +87,7 @@ func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { func (r *NatsReceiver) Close() { if r.nc != nil { - cclog.ComponentDebug("NatsReceiver", "CLOSE") + cclog.ComponentDebug(r.name, "CLOSE") r.nc.Close() } } diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go index c570aa4..7141170 100644 --- a/receivers/receiveManager.go +++ b/receivers/receiveManager.go @@ -18,12 +18,12 @@ type receiveManager struct { output chan lp.CCMetric done chan bool wg *sync.WaitGroup - config []ReceiverConfig + config []json.RawMessage } type ReceiveManager interface { Init(wg *sync.WaitGroup, receiverConfigFile string) error - AddInput(rawConfig json.RawMessage) error + AddInput(name string, rawConfig json.RawMessage) error AddOutput(output chan lp.CCMetric) Start() Close() @@ -34,7 +34,7 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) er rm.output = nil rm.done = make(chan bool) rm.wg = wg - rm.config = make([]ReceiverConfig, 0) + rm.config = make([]json.RawMessage, 0) configFile, err := os.Open(receiverConfigFile) if err != nil { cclog.ComponentError("ReceiveManager", err.Error()) @@ -42,14 +42,14 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) er } defer configFile.Close() jsonParser := json.NewDecoder(configFile) - var rawConfigs []json.RawMessage + var rawConfigs map[string]json.RawMessage err = jsonParser.Decode(&rawConfigs) if err != nil { cclog.ComponentError("ReceiveManager", err.Error()) return err } - for _, raw := range rawConfigs { - rm.AddInput(raw) + for name, raw := range rawConfigs { + rm.AddInput(name, raw) } return nil } @@ -64,8 +64,8 @@ func (rm *receiveManager) Start() { cclog.ComponentDebug("ReceiveManager", "STARTED") } -func (rm *receiveManager) AddInput(rawConfig json.RawMessage) error { - var config ReceiverConfig +func (rm *receiveManager) AddInput(name string, rawConfig json.RawMessage) error { + var config defaultReceiverConfig err := json.Unmarshal(rawConfig, &config) if err != nil { cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "JSON config error:", err.Error()) @@ -76,13 +76,13 @@ func (rm *receiveManager) AddInput(rawConfig json.RawMessage) error { return err } r := AvailableReceivers[config.Type] - err = r.Init(config) + err = r.Init(name, rawConfig) if err != nil { cclog.ComponentError("ReceiveManager", "SKIP", r.Name(), "initialization failed:", err.Error()) return err } rm.inputs = append(rm.inputs, r) - rm.config = append(rm.config, config) + rm.config = append(rm.config, rawConfig) cclog.ComponentDebug("ReceiveManager", "ADD RECEIVER", r.Name()) return nil }