mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-07-30 08:26:08 +02:00
Use receiver, sinks, ccLogger and ccConfig from cc-lib
This commit is contained in:
@@ -11,8 +11,8 @@ import (
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-lib/ccMessage"
|
||||
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
|
||||
mp "github.com/ClusterCockpit/cc-lib/messageProcessor"
|
||||
agg "github.com/ClusterCockpit/cc-metric-collector/internal/metricAggregator"
|
||||
mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
|
||||
)
|
||||
|
||||
@@ -63,7 +63,7 @@ type metricRouter struct {
|
||||
|
||||
// MetricRouter access functions
|
||||
type MetricRouter interface {
|
||||
Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error
|
||||
Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfig json.RawMessage) error
|
||||
AddCollectorInput(input chan lp.CCMessage)
|
||||
AddReceiverInput(input chan lp.CCMessage)
|
||||
AddOutput(output chan lp.CCMessage)
|
||||
@@ -77,7 +77,7 @@ type MetricRouter interface {
|
||||
// * wait group synchronization (from variable wg)
|
||||
// * ticker (from variable ticker)
|
||||
// * configuration (read from config file in variable routerConfigFile)
|
||||
func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error {
|
||||
func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfig json.RawMessage) error {
|
||||
r.outputs = make([]chan lp.CCMessage, 0)
|
||||
r.done = make(chan bool)
|
||||
r.cache_input = make(chan lp.CCMessage)
|
||||
@@ -95,15 +95,7 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
|
||||
// Drop domain part of host name
|
||||
r.hostname = strings.SplitN(hostname, `.`, 2)[0]
|
||||
|
||||
// Read metric router config file
|
||||
configFile, err := os.Open(routerConfigFile)
|
||||
if err != nil {
|
||||
cclog.ComponentError("MetricRouter", err.Error())
|
||||
return err
|
||||
}
|
||||
defer configFile.Close()
|
||||
jsonParser := json.NewDecoder(configFile)
|
||||
err = jsonParser.Decode(&r.config)
|
||||
err = json.Unmarshal(routerConfig, &r.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError("MetricRouter", err.Error())
|
||||
return err
|
||||
@@ -449,9 +441,9 @@ func (r *metricRouter) Close() {
|
||||
}
|
||||
|
||||
// New creates a new initialized metric router
|
||||
func New(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) (MetricRouter, error) {
|
||||
func New(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfig json.RawMessage) (MetricRouter, error) {
|
||||
r := new(metricRouter)
|
||||
err := r.Init(ticker, wg, routerConfigFile)
|
||||
err := r.Init(ticker, wg, routerConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
Reference in New Issue
Block a user