mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2024-12-26 15:29:04 +01:00
Use ccLogger anywhere
This commit is contained in:
parent
b4fde31626
commit
2925ad9f40
@ -70,17 +70,17 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
|
|||||||
}
|
}
|
||||||
for k, cfg := range cm.config {
|
for k, cfg := range cm.config {
|
||||||
if _, found := AvailableCollectors[k]; !found {
|
if _, found := AvailableCollectors[k]; !found {
|
||||||
cclog.ComponentPrint("CollectorManager", "SKIP unknown collector ", k)
|
cclog.ComponentError("CollectorManager", "SKIP unknown collector", k)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
c := AvailableCollectors[k]
|
c := AvailableCollectors[k]
|
||||||
|
|
||||||
err = c.Init(cfg)
|
err = c.Init(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentPrint("CollectorManager", "Collector ", k, "initialization failed: ", err.Error())
|
cclog.ComponentError("CollectorManager", "Collector", k, "initialization failed:", err.Error())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cclog.ComponentDebug("CollectorManager", "Collector ", k, "initialized")
|
cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", c.Name())
|
||||||
cm.collectors = append(cm.collectors, c)
|
cm.collectors = append(cm.collectors, c)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -99,7 +99,7 @@ func (cm *collectorManager) Start() {
|
|||||||
c.Close()
|
c.Close()
|
||||||
}
|
}
|
||||||
cm.wg.Done()
|
cm.wg.Done()
|
||||||
cclog.ComponentPrint("CollectorManager", "DONE")
|
cclog.ComponentDebug("CollectorManager", "DONE")
|
||||||
break CollectorManagerLoop
|
break CollectorManagerLoop
|
||||||
case t := <-tick:
|
case t := <-tick:
|
||||||
for _, c := range cm.collectors {
|
for _, c := range cm.collectors {
|
||||||
@ -110,17 +110,17 @@ func (cm *collectorManager) Start() {
|
|||||||
c.Close()
|
c.Close()
|
||||||
}
|
}
|
||||||
cm.wg.Done()
|
cm.wg.Done()
|
||||||
cclog.ComponentPrint("CollectorManager", "DONE")
|
cclog.ComponentDebug("CollectorManager", "DONE")
|
||||||
break CollectorManagerInputLoop
|
break CollectorManagerInputLoop
|
||||||
default:
|
default:
|
||||||
cclog.ComponentPrint("CollectorManager", c.Name(), " ", t)
|
cclog.ComponentDebug("CollectorManager", c.Name(), t)
|
||||||
c.Read(cm.duration, cm.output)
|
c.Read(cm.duration, cm.output)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
cclog.ComponentPrint("CollectorManager", "STARTED")
|
cclog.ComponentDebug("CollectorManager", "STARTED")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cm *collectorManager) AddOutput(output chan lp.CCMetric) {
|
func (cm *collectorManager) AddOutput(output chan lp.CCMetric) {
|
||||||
@ -129,7 +129,7 @@ func (cm *collectorManager) AddOutput(output chan lp.CCMetric) {
|
|||||||
|
|
||||||
func (cm *collectorManager) Close() {
|
func (cm *collectorManager) Close() {
|
||||||
cm.done <- true
|
cm.done <- true
|
||||||
cclog.ComponentPrint("CollectorManager", "CLOSE")
|
cclog.ComponentDebug("CollectorManager", "CLOSE")
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) {
|
func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) {
|
||||||
|
@ -4,7 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -102,7 +102,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
y, err := lp.New(strings.ToLower(mname), tags, m.meta, map[string]interface{}{"value": int(float64(x) / 1000)}, time.Now())
|
y, err := lp.New(strings.ToLower(mname), tags, m.meta, map[string]interface{}{"value": int(float64(x) / 1000)}, time.Now())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
log.Print("[", m.name, "] ", y)
|
cclog.ComponentDebug(m.name, y)
|
||||||
output <- y
|
output <- y
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -37,52 +37,56 @@ func initLogger() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func CCPrint(logger *log.Logger, e ... interface {}) {
|
|
||||||
if logger != nil {
|
|
||||||
logger.Print(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Print(e ... interface{}) {
|
func Print(e ... interface{}) {
|
||||||
CCPrint(defaultLog, e)
|
initLogger()
|
||||||
|
defaultLog.Print(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ComponentPrint(component string, e ... interface{}) {
|
func ComponentPrint(component string, e ... interface{}) {
|
||||||
CCPrint(defaultLog, fmt.Sprintf("[%s]", component), e)
|
initLogger()
|
||||||
|
defaultLog.Print(fmt.Sprintf("[%s] ", component), e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Info(e ... interface{}) {
|
func Info(e ... interface{}) {
|
||||||
CCPrint(infoLog, e)
|
initLogger()
|
||||||
|
infoLog.Print(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ComponentInfo(component string, e ... interface{}) {
|
func ComponentInfo(component string, e ... interface{}) {
|
||||||
CCPrint(infoLog, fmt.Sprintf("[%s]", component), e)
|
initLogger()
|
||||||
|
infoLog.Print(fmt.Sprintf("[%s] ", component), e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Debug(e ... interface{}) {
|
func Debug(e ... interface{}) {
|
||||||
if globalDebug {
|
initLogger()
|
||||||
CCPrint(debugLog, e)
|
if globalDebug == true {
|
||||||
|
debugLog.Print(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ComponentDebug(component string, e ... interface{}) {
|
func ComponentDebug(component string, e ... interface{}) {
|
||||||
if globalDebug {
|
initLogger()
|
||||||
CCPrint(debugLog, fmt.Sprintf("[%s]", component), e)
|
if globalDebug == true && debugLog != nil {
|
||||||
|
//CCComponentPrint(debugLog, component, e)
|
||||||
|
debugLog.Print(fmt.Sprintf("[%s] ", component), e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Error(e ... interface{}) {
|
func Error(e ... interface{}) {
|
||||||
|
initLogger()
|
||||||
_, fn, line, _ := runtime.Caller(1)
|
_, fn, line, _ := runtime.Caller(1)
|
||||||
CCPrint(errorLog, fn, line, e)
|
errorLog.Print(fmt.Sprintf("[%s:%d] ", fn, line), e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ComponentError(component string, e ... interface{}) {
|
func ComponentError(component string, e ... interface{}) {
|
||||||
|
initLogger()
|
||||||
_, fn, line, _ := runtime.Caller(1)
|
_, fn, line, _ := runtime.Caller(1)
|
||||||
CCPrint(errorLog, fmt.Sprintf("[%s]", component), fn, line, e)
|
errorLog.Print(fmt.Sprintf("[%s|%s:%d] ", component, fn, line), e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetDebug() {
|
func SetDebug() {
|
||||||
globalDebug = true
|
globalDebug = true
|
||||||
|
initLogger()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@ package metricRouter
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"log"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -50,14 +50,14 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
|
|||||||
r.ticker = ticker
|
r.ticker = ticker
|
||||||
configFile, err := os.Open(routerConfigFile)
|
configFile, err := os.Open(routerConfigFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err.Error())
|
cclog.ComponentError("MetricRouter", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer configFile.Close()
|
defer configFile.Close()
|
||||||
jsonParser := json.NewDecoder(configFile)
|
jsonParser := json.NewDecoder(configFile)
|
||||||
err = jsonParser.Decode(&r.config)
|
err = jsonParser.Decode(&r.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err.Error())
|
cclog.ComponentError("MetricRouter", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -79,7 +79,7 @@ func (r *metricRouter) StartTimer() {
|
|||||||
func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) {
|
func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) {
|
||||||
expression, err := govaluate.NewEvaluableExpression(Cond)
|
expression, err := govaluate.NewEvaluableExpression(Cond)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(Cond, " = ", err.Error())
|
cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error())
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
params := make(map[string]interface{})
|
params := make(map[string]interface{})
|
||||||
@ -97,7 +97,7 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro
|
|||||||
|
|
||||||
result, err := expression.Evaluate(params)
|
result, err := expression.Evaluate(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(Cond, " = ", err.Error())
|
cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error())
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return bool(result.(bool)), err
|
return bool(result.(bool)), err
|
||||||
@ -113,7 +113,7 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) {
|
|||||||
var err error
|
var err error
|
||||||
conditionMatches, err = r.EvalCondition(m.Condition, point)
|
conditionMatches, err = r.EvalCondition(m.Condition, point)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err.Error())
|
cclog.ComponentError("MetricRouter", err.Error())
|
||||||
conditionMatches = false
|
conditionMatches = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -133,7 +133,7 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) {
|
|||||||
var err error
|
var err error
|
||||||
conditionMatches, err = r.EvalCondition(m.Condition, point)
|
conditionMatches, err = r.EvalCondition(m.Condition, point)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err.Error())
|
cclog.ComponentError("MetricRouter", err.Error())
|
||||||
conditionMatches = false
|
conditionMatches = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -154,7 +154,7 @@ func (r *metricRouter) Start() {
|
|||||||
RouterLoop:
|
RouterLoop:
|
||||||
select {
|
select {
|
||||||
case <-r.done:
|
case <-r.done:
|
||||||
log.Print("[MetricRouter] DONE\n")
|
cclog.ComponentDebug("MetricRouter", "DONE")
|
||||||
r.wg.Done()
|
r.wg.Done()
|
||||||
break RouterLoop
|
break RouterLoop
|
||||||
default:
|
default:
|
||||||
@ -162,11 +162,11 @@ func (r *metricRouter) Start() {
|
|||||||
RouterInputLoop:
|
RouterInputLoop:
|
||||||
select {
|
select {
|
||||||
case <-r.done:
|
case <-r.done:
|
||||||
log.Print("[MetricRouter] DONE\n")
|
cclog.ComponentDebug("MetricRouter", "DONE")
|
||||||
r.wg.Done()
|
r.wg.Done()
|
||||||
break RouterInputLoop
|
break RouterInputLoop
|
||||||
case p := <-c:
|
case p := <-c:
|
||||||
log.Print("[MetricRouter] FORWARD ", p)
|
cclog.ComponentDebug("MetricRouter", "FORWARD", p)
|
||||||
r.DoAddTags(p)
|
r.DoAddTags(p)
|
||||||
r.DoDelTags(p)
|
r.DoDelTags(p)
|
||||||
if r.config.IntervalStamp {
|
if r.config.IntervalStamp {
|
||||||
@ -180,9 +180,8 @@ func (r *metricRouter) Start() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Print("[MetricRouter] EXIT\n")
|
|
||||||
}()
|
}()
|
||||||
log.Print("[MetricRouter] STARTED\n")
|
cclog.ComponentDebug("MetricRouter", "STARTED")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *metricRouter) AddInput(input chan lp.CCMetric) {
|
func (r *metricRouter) AddInput(input chan lp.CCMetric) {
|
||||||
@ -195,7 +194,7 @@ func (r *metricRouter) AddOutput(output chan lp.CCMetric) {
|
|||||||
|
|
||||||
func (r *metricRouter) Close() {
|
func (r *metricRouter) Close() {
|
||||||
r.done <- true
|
r.done <- true
|
||||||
log.Print("[MetricRouter] CLOSE\n")
|
cclog.ComponentDebug("MetricRouter", "CLOSE")
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) (MetricRouter, error) {
|
func New(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) (MetricRouter, error) {
|
||||||
|
@ -100,6 +100,7 @@ func ReadCli() map[string]string {
|
|||||||
logfile := flag.String("log", "stderr", "Path for logfile")
|
logfile := flag.String("log", "stderr", "Path for logfile")
|
||||||
pidfile := flag.String("pidfile", "/var/run/cc-metric-collector.pid", "Path for PID file")
|
pidfile := flag.String("pidfile", "/var/run/cc-metric-collector.pid", "Path for PID file")
|
||||||
once := flag.Bool("once", false, "Run all collectors only once")
|
once := flag.Bool("once", false, "Run all collectors only once")
|
||||||
|
debug := flag.Bool("debug", false, "Activate debug output")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
m = make(map[string]string)
|
m = make(map[string]string)
|
||||||
m["configfile"] = *cfg
|
m["configfile"] = *cfg
|
||||||
@ -110,6 +111,12 @@ func ReadCli() map[string]string {
|
|||||||
} else {
|
} else {
|
||||||
m["once"] = "false"
|
m["once"] = "false"
|
||||||
}
|
}
|
||||||
|
if *debug {
|
||||||
|
m["debug"] = "true"
|
||||||
|
cclog.SetDebug()
|
||||||
|
} else {
|
||||||
|
m["debug"] = "false"
|
||||||
|
}
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -219,6 +226,10 @@ func mainFunc() int {
|
|||||||
// Drop domain part of host name
|
// Drop domain part of host name
|
||||||
rcfg.Hostname = strings.SplitN(rcfg.Hostname, `.`, 2)[0]
|
rcfg.Hostname = strings.SplitN(rcfg.Hostname, `.`, 2)[0]
|
||||||
// err = CreatePidfile(rcfg.CliArgs["pidfile"])
|
// err = CreatePidfile(rcfg.CliArgs["pidfile"])
|
||||||
|
|
||||||
|
if rcfg.CliArgs["logfile"] != "stderr" {
|
||||||
|
cclog.SetOutput(rcfg.CliArgs["logfile"])
|
||||||
|
}
|
||||||
// err = SetLogging(rcfg.CliArgs["logfile"])
|
// err = SetLogging(rcfg.CliArgs["logfile"])
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
// log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname)
|
// log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname)
|
||||||
|
@ -6,7 +6,7 @@ import (
|
|||||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||||
influx "github.com/influxdata/line-protocol"
|
influx "github.com/influxdata/line-protocol"
|
||||||
nats "github.com/nats-io/nats.go"
|
nats "github.com/nats-io/nats.go"
|
||||||
"log"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -46,8 +46,8 @@ func (r *NatsReceiver) Init(config ReceiverConfig) error {
|
|||||||
if len(r.port) == 0 {
|
if len(r.port) == 0 {
|
||||||
r.port = "4222"
|
r.port = "4222"
|
||||||
}
|
}
|
||||||
log.Print("[NatsReceiver] INIT")
|
|
||||||
uri := fmt.Sprintf("%s:%s", r.addr, r.port)
|
uri := fmt.Sprintf("%s:%s", r.addr, r.port)
|
||||||
|
cclog.ComponentDebug("NatsReceiver", "INIT", uri)
|
||||||
nc, err := nats.Connect(uri)
|
nc, err := nats.Connect(uri)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
r.database = r.config.Database
|
r.database = r.config.Database
|
||||||
@ -63,7 +63,7 @@ func (r *NatsReceiver) Init(config ReceiverConfig) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *NatsReceiver) Start() {
|
func (r *NatsReceiver) Start() {
|
||||||
log.Print("[NatsReceiver] START")
|
cclog.ComponentDebug("NatsReceiver", "START")
|
||||||
r.nc.Subscribe(r.database, r._NatsReceive)
|
r.nc.Subscribe(r.database, r._NatsReceive)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,7 +75,6 @@ func (r *NatsReceiver) _NatsReceive(m *nats.Msg) {
|
|||||||
for k, v := range r.meta {
|
for k, v := range r.meta {
|
||||||
y.AddMeta(k, v)
|
y.AddMeta(k, v)
|
||||||
}
|
}
|
||||||
//y, err := lp.New(m.Name(), Tags2Map(m), r.meta, Fields2Map(m), m.Time())
|
|
||||||
if r.sink != nil {
|
if r.sink != nil {
|
||||||
r.sink <- y
|
r.sink <- y
|
||||||
}
|
}
|
||||||
@ -85,7 +84,7 @@ func (r *NatsReceiver) _NatsReceive(m *nats.Msg) {
|
|||||||
|
|
||||||
func (r *NatsReceiver) Close() {
|
func (r *NatsReceiver) Close() {
|
||||||
if r.nc != nil {
|
if r.nc != nil {
|
||||||
log.Print("[NatsReceiver] CLOSE")
|
cclog.ComponentDebug("NatsReceiver", "CLOSE")
|
||||||
r.nc.Close()
|
r.nc.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,7 @@ package receivers
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||||
"log"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
@ -36,7 +36,7 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) er
|
|||||||
rm.config = make([]ReceiverConfig, 0)
|
rm.config = make([]ReceiverConfig, 0)
|
||||||
configFile, err := os.Open(receiverConfigFile)
|
configFile, err := os.Open(receiverConfigFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err.Error())
|
cclog.ComponentError("ReceiveManager", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer configFile.Close()
|
defer configFile.Close()
|
||||||
@ -44,23 +44,11 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) er
|
|||||||
var rawConfigs []json.RawMessage
|
var rawConfigs []json.RawMessage
|
||||||
err = jsonParser.Decode(&rawConfigs)
|
err = jsonParser.Decode(&rawConfigs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err.Error())
|
cclog.ComponentError("ReceiveManager", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, raw := range rawConfigs {
|
for _, raw := range rawConfigs {
|
||||||
log.Print("[ReceiveManager] ", string(raw))
|
|
||||||
rm.AddInput(raw)
|
rm.AddInput(raw)
|
||||||
// if _, found := AvailableReceivers[k.Type]; !found {
|
|
||||||
// log.Print("[ReceiveManager] SKIP Config specifies unknown receiver 'type': ", k.Type)
|
|
||||||
// continue
|
|
||||||
// }
|
|
||||||
// r := AvailableReceivers[k.Type]
|
|
||||||
// err = r.Init(k)
|
|
||||||
// if err != nil {
|
|
||||||
// log.Print("[ReceiveManager] SKIP Receiver ", k.Type, " cannot be initialized: ", err.Error())
|
|
||||||
// continue
|
|
||||||
// }
|
|
||||||
// rm.inputs = append(rm.inputs, r)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -69,60 +57,32 @@ func (rm *receiveManager) Start() {
|
|||||||
rm.wg.Add(1)
|
rm.wg.Add(1)
|
||||||
|
|
||||||
for _, r := range rm.inputs {
|
for _, r := range rm.inputs {
|
||||||
log.Print("[ReceiveManager] START ", r.Name())
|
cclog.ComponentDebug("ReceiveManager", "START", r.Name())
|
||||||
r.Start()
|
r.Start()
|
||||||
}
|
}
|
||||||
log.Print("[ReceiveManager] STARTED\n")
|
cclog.ComponentDebug("ReceiveManager", "STARTED")
|
||||||
// go func() {
|
|
||||||
// for {
|
|
||||||
//ReceiveManagerLoop:
|
|
||||||
// select {
|
|
||||||
// case <- rm.done:
|
|
||||||
// log.Print("ReceiveManager done\n")
|
|
||||||
// rm.wg.Done()
|
|
||||||
// break ReceiveManagerLoop
|
|
||||||
// default:
|
|
||||||
// for _, c := range rm.inputs {
|
|
||||||
//ReceiveManagerInputLoop:
|
|
||||||
// select {
|
|
||||||
// case <- rm.done:
|
|
||||||
// log.Print("ReceiveManager done\n")
|
|
||||||
// rm.wg.Done()
|
|
||||||
// break ReceiveManagerInputLoop
|
|
||||||
// case p := <- c:
|
|
||||||
// log.Print("ReceiveManager: ", p)
|
|
||||||
// rm.output <- p
|
|
||||||
// default:
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }()
|
|
||||||
// for _, r := range rm.inputs {
|
|
||||||
// r.Close()
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rm *receiveManager) AddInput(rawConfig json.RawMessage) error {
|
func (rm *receiveManager) AddInput(rawConfig json.RawMessage) error {
|
||||||
var config ReceiverConfig
|
var config ReceiverConfig
|
||||||
err := json.Unmarshal(rawConfig, &config)
|
err := json.Unmarshal(rawConfig, &config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("[ReceiveManager] SKIP ", config.Type, " JSON config error: ", err.Error())
|
cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "JSON config error:", err.Error())
|
||||||
log.Print(err.Error())
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, found := AvailableReceivers[config.Type]; !found {
|
if _, found := AvailableReceivers[config.Type]; !found {
|
||||||
log.Print("[ReceiveManager] SKIP ", config.Type, " unknown receiver: ", err.Error())
|
cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "unknown receiver:", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
r := AvailableReceivers[config.Type]
|
r := AvailableReceivers[config.Type]
|
||||||
err = r.Init(config)
|
err = r.Init(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("[ReceiveManager] SKIP ", r.Name(), " initialization failed: ", err.Error())
|
cclog.ComponentError("ReceiveManager", "SKIP", r.Name(), "initialization failed:", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
rm.inputs = append(rm.inputs, r)
|
rm.inputs = append(rm.inputs, r)
|
||||||
rm.config = append(rm.config, config)
|
rm.config = append(rm.config, config)
|
||||||
|
cclog.ComponentDebug("ReceiveManager", "ADD RECEIVER", r.Name())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -135,12 +95,11 @@ func (rm *receiveManager) AddOutput(output chan lp.CCMetric) {
|
|||||||
|
|
||||||
func (rm *receiveManager) Close() {
|
func (rm *receiveManager) Close() {
|
||||||
for _, r := range rm.inputs {
|
for _, r := range rm.inputs {
|
||||||
log.Print("[ReceiveManager] CLOSE ", r.Name())
|
cclog.ComponentDebug("ReceiveManager", "CLOSE", r.Name())
|
||||||
r.Close()
|
r.Close()
|
||||||
}
|
}
|
||||||
rm.wg.Done()
|
rm.wg.Done()
|
||||||
log.Print("[ReceiveManager] CLOSE\n")
|
cclog.ComponentDebug("ReceiveManager", "CLOSE")
|
||||||
log.Print("[ReceiveManager] EXIT\n")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(wg *sync.WaitGroup, receiverConfigFile string) (ReceiveManager, error) {
|
func New(wg *sync.WaitGroup, receiverConfigFile string) (ReceiveManager, error) {
|
||||||
|
@ -2,11 +2,12 @@ package sinks
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"log"
|
// "log"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||||
|
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
|
||||||
)
|
)
|
||||||
|
|
||||||
var AvailableSinks = map[string]Sink{
|
var AvailableSinks = map[string]Sink{
|
||||||
@ -41,7 +42,7 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
|
|||||||
if len(sinkConfigFile) > 0 {
|
if len(sinkConfigFile) > 0 {
|
||||||
configFile, err := os.Open(sinkConfigFile)
|
configFile, err := os.Open(sinkConfigFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("[SinkManager] ", err.Error())
|
cclog.ComponentError("SinkManager", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer configFile.Close()
|
defer configFile.Close()
|
||||||
@ -49,7 +50,7 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
|
|||||||
var rawConfigs []json.RawMessage
|
var rawConfigs []json.RawMessage
|
||||||
err = jsonParser.Decode(&rawConfigs)
|
err = jsonParser.Decode(&rawConfigs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("[SinkManager] ", err.Error())
|
cclog.ComponentError("SinkManager", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, raw := range rawConfigs {
|
for _, raw := range rawConfigs {
|
||||||
@ -73,16 +74,16 @@ func (sm *sinkManager) Start() {
|
|||||||
for _, s := range sm.outputs {
|
for _, s := range sm.outputs {
|
||||||
s.Close()
|
s.Close()
|
||||||
}
|
}
|
||||||
log.Print("[SinkManager] DONE\n")
|
cclog.ComponentDebug("SinkManager", "DONE")
|
||||||
sm.wg.Done()
|
sm.wg.Done()
|
||||||
break SinkManagerLoop
|
break SinkManagerLoop
|
||||||
case p := <-sm.input:
|
case p := <-sm.input:
|
||||||
log.Print("[SinkManager] WRITE ", p)
|
cclog.ComponentDebug("SinkManager", "WRITE", p)
|
||||||
for _, s := range sm.outputs {
|
for _, s := range sm.outputs {
|
||||||
s.Write(p)
|
s.Write(p)
|
||||||
}
|
}
|
||||||
if batchcount == 0 {
|
if batchcount == 0 {
|
||||||
log.Print("[SinkManager] FLUSH")
|
cclog.ComponentDebug("SinkManager", "FLUSH")
|
||||||
for _, s := range sm.outputs {
|
for _, s := range sm.outputs {
|
||||||
s.Flush()
|
s.Flush()
|
||||||
}
|
}
|
||||||
@ -92,9 +93,8 @@ func (sm *sinkManager) Start() {
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Print("[SinkManager] EXIT\n")
|
|
||||||
}()
|
}()
|
||||||
log.Print("[SinkManager] STARTED\n")
|
cclog.ComponentDebug("SinkManager", "STARTED")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *sinkManager) AddInput(input chan lp.CCMetric) {
|
func (sm *sinkManager) AddInput(input chan lp.CCMetric) {
|
||||||
@ -107,28 +107,29 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error {
|
|||||||
if len(rawConfig) > 3 {
|
if len(rawConfig) > 3 {
|
||||||
err = json.Unmarshal(rawConfig, &config)
|
err = json.Unmarshal(rawConfig, &config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("[SinkManager] SKIP ", config.Type, " JSON config error: ", err.Error())
|
cclog.ComponentError("SinkManager", "SKIP", config.Type, "JSON config error:", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if _, found := AvailableSinks[config.Type]; !found {
|
if _, found := AvailableSinks[config.Type]; !found {
|
||||||
log.Print("[SinkManager] SKIP ", config.Type, " unknown sink: ", err.Error())
|
cclog.ComponentError("SinkManager", "SKIP", config.Type, "unknown sink:", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s := AvailableSinks[config.Type]
|
s := AvailableSinks[config.Type]
|
||||||
err = s.Init(config)
|
err = s.Init(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("[SinkManager] SKIP ", s.Name(), " initialization failed: ", err.Error())
|
cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
sm.outputs = append(sm.outputs, s)
|
sm.outputs = append(sm.outputs, s)
|
||||||
sm.config = append(sm.config, config)
|
sm.config = append(sm.config, config)
|
||||||
|
cclog.ComponentDebug("SinkManager", "ADD SINK", s.Name())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *sinkManager) Close() {
|
func (sm *sinkManager) Close() {
|
||||||
sm.done <- true
|
sm.done <- true
|
||||||
log.Print("[SinkManager] CLOSE")
|
cclog.ComponentDebug("SinkManager", "CLOSE")
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {
|
func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user