diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 73b2891..88cfdf8 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -70,17 +70,17 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat } for k, cfg := range cm.config { if _, found := AvailableCollectors[k]; !found { - cclog.ComponentPrint("CollectorManager", "SKIP unknown collector ", k) + cclog.ComponentError("CollectorManager", "SKIP unknown collector", k) continue } c := AvailableCollectors[k] err = c.Init(cfg) if err != nil { - cclog.ComponentPrint("CollectorManager", "Collector ", k, "initialization failed: ", err.Error()) + cclog.ComponentError("CollectorManager", "Collector", k, "initialization failed:", err.Error()) continue } - cclog.ComponentDebug("CollectorManager", "Collector ", k, "initialized") + cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", c.Name()) cm.collectors = append(cm.collectors, c) } return nil @@ -99,7 +99,7 @@ func (cm *collectorManager) Start() { c.Close() } cm.wg.Done() - cclog.ComponentPrint("CollectorManager", "DONE") + cclog.ComponentDebug("CollectorManager", "DONE") break CollectorManagerLoop case t := <-tick: for _, c := range cm.collectors { @@ -110,17 +110,17 @@ func (cm *collectorManager) Start() { c.Close() } cm.wg.Done() - cclog.ComponentPrint("CollectorManager", "DONE") + cclog.ComponentDebug("CollectorManager", "DONE") break CollectorManagerInputLoop default: - cclog.ComponentPrint("CollectorManager", c.Name(), " ", t) + cclog.ComponentDebug("CollectorManager", c.Name(), t) c.Read(cm.duration, cm.output) } } } } }() - cclog.ComponentPrint("CollectorManager", "STARTED") + cclog.ComponentDebug("CollectorManager", "STARTED") } func (cm *collectorManager) AddOutput(output chan lp.CCMetric) { @@ -129,7 +129,7 @@ func (cm *collectorManager) AddOutput(output chan lp.CCMetric) { func (cm *collectorManager) Close() { cm.done <- true - cclog.ComponentPrint("CollectorManager", "CLOSE") + cclog.ComponentDebug("CollectorManager", "CLOSE") } func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) { diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go index b73d582..caa726e 100644 --- a/collectors/tempMetric.go +++ b/collectors/tempMetric.go @@ -4,7 +4,7 @@ import ( "encoding/json" "fmt" "io/ioutil" - "log" + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" "os" "path/filepath" "strconv" @@ -102,7 +102,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) { if err == nil { y, err := lp.New(strings.ToLower(mname), tags, m.meta, map[string]interface{}{"value": int(float64(x) / 1000)}, time.Now()) if err == nil { - log.Print("[", m.name, "] ", y) + cclog.ComponentDebug(m.name, y) output <- y } } diff --git a/internal/ccLogger/cclogger.go b/internal/ccLogger/cclogger.go index ad5b986..ee92376 100644 --- a/internal/ccLogger/cclogger.go +++ b/internal/ccLogger/cclogger.go @@ -21,68 +21,72 @@ var ( func initLogger() { if debugLog == nil { - debugLog = log.New(stderr, "DEBUG", log.LstdFlags) + debugLog = log.New(stderr, "DEBUG ", log.LstdFlags) } if infoLog == nil { - infoLog = log.New(stdout, "INFO", log.LstdFlags) + infoLog = log.New(stdout, "INFO ", log.LstdFlags) } if errorLog == nil { - errorLog = log.New(stderr, "ERROR", log.LstdFlags) + errorLog = log.New(stderr, "ERROR ", log.LstdFlags) } if warnLog == nil { - warnLog = log.New(stderr, "WARN", log.LstdFlags) + warnLog = log.New(stderr, "WARN ", log.LstdFlags) } if defaultLog == nil { defaultLog = log.New(stdout, "", log.LstdFlags) } } -func CCPrint(logger *log.Logger, e ... interface {}) { - if logger != nil { - logger.Print(e) - } -} - func Print(e ... interface{}) { - CCPrint(defaultLog, e) + initLogger() + defaultLog.Print(e) } func ComponentPrint(component string, e ... interface{}) { - CCPrint(defaultLog, fmt.Sprintf("[%s]", component), e) + initLogger() + defaultLog.Print(fmt.Sprintf("[%s] ", component), e) } func Info(e ... interface{}) { - CCPrint(infoLog, e) + initLogger() + infoLog.Print(e) } func ComponentInfo(component string, e ... interface{}) { - CCPrint(infoLog, fmt.Sprintf("[%s]", component), e) + initLogger() + infoLog.Print(fmt.Sprintf("[%s] ", component), e) } func Debug(e ... interface{}) { - if globalDebug { - CCPrint(debugLog, e) + initLogger() + if globalDebug == true { + debugLog.Print(e) } } func ComponentDebug(component string, e ... interface{}) { - if globalDebug { - CCPrint(debugLog, fmt.Sprintf("[%s]", component), e) + initLogger() + if globalDebug == true && debugLog != nil { + //CCComponentPrint(debugLog, component, e) + debugLog.Print(fmt.Sprintf("[%s] ", component), e) } } func Error(e ... interface{}) { + initLogger() _, fn, line, _ := runtime.Caller(1) - CCPrint(errorLog, fn, line, e) + errorLog.Print(fmt.Sprintf("[%s:%d] ", fn, line), e) } func ComponentError(component string, e ... interface{}) { + initLogger() _, fn, line, _ := runtime.Caller(1) - CCPrint(errorLog, fmt.Sprintf("[%s]", component), fn, line, e) + errorLog.Print(fmt.Sprintf("[%s|%s:%d] ", component, fn, line), e) } func SetDebug() { globalDebug = true + initLogger() } diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 25b0dc2..5fd55ba 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -2,7 +2,7 @@ package metricRouter import ( "encoding/json" - "log" + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" "os" "sync" "time" @@ -50,14 +50,14 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout r.ticker = ticker configFile, err := os.Open(routerConfigFile) if err != nil { - log.Print(err.Error()) + cclog.ComponentError("MetricRouter", err.Error()) return err } defer configFile.Close() jsonParser := json.NewDecoder(configFile) err = jsonParser.Decode(&r.config) if err != nil { - log.Print(err.Error()) + cclog.ComponentError("MetricRouter", err.Error()) return err } return nil @@ -79,7 +79,7 @@ func (r *metricRouter) StartTimer() { func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) { expression, err := govaluate.NewEvaluableExpression(Cond) if err != nil { - log.Print(Cond, " = ", err.Error()) + cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) return false, err } params := make(map[string]interface{}) @@ -97,7 +97,7 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro result, err := expression.Evaluate(params) if err != nil { - log.Print(Cond, " = ", err.Error()) + cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error()) return false, err } return bool(result.(bool)), err @@ -113,7 +113,7 @@ func (r *metricRouter) DoAddTags(point lp.CCMetric) { var err error conditionMatches, err = r.EvalCondition(m.Condition, point) if err != nil { - log.Print(err.Error()) + cclog.ComponentError("MetricRouter", err.Error()) conditionMatches = false } } @@ -133,7 +133,7 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) { var err error conditionMatches, err = r.EvalCondition(m.Condition, point) if err != nil { - log.Print(err.Error()) + cclog.ComponentError("MetricRouter", err.Error()) conditionMatches = false } } @@ -154,7 +154,7 @@ func (r *metricRouter) Start() { RouterLoop: select { case <-r.done: - log.Print("[MetricRouter] DONE\n") + cclog.ComponentDebug("MetricRouter", "DONE") r.wg.Done() break RouterLoop default: @@ -162,11 +162,11 @@ func (r *metricRouter) Start() { RouterInputLoop: select { case <-r.done: - log.Print("[MetricRouter] DONE\n") + cclog.ComponentDebug("MetricRouter", "DONE") r.wg.Done() break RouterInputLoop case p := <-c: - log.Print("[MetricRouter] FORWARD ", p) + cclog.ComponentDebug("MetricRouter", "FORWARD", p) r.DoAddTags(p) r.DoDelTags(p) if r.config.IntervalStamp { @@ -180,9 +180,8 @@ func (r *metricRouter) Start() { } } } - log.Print("[MetricRouter] EXIT\n") }() - log.Print("[MetricRouter] STARTED\n") + cclog.ComponentDebug("MetricRouter", "STARTED") } func (r *metricRouter) AddInput(input chan lp.CCMetric) { @@ -195,7 +194,7 @@ func (r *metricRouter) AddOutput(output chan lp.CCMetric) { func (r *metricRouter) Close() { r.done <- true - log.Print("[MetricRouter] CLOSE\n") + cclog.ComponentDebug("MetricRouter", "CLOSE") } func New(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) (MetricRouter, error) { diff --git a/metric-collector.go b/metric-collector.go index 494bcbf..b3ad9d0 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -100,6 +100,7 @@ func ReadCli() map[string]string { logfile := flag.String("log", "stderr", "Path for logfile") pidfile := flag.String("pidfile", "/var/run/cc-metric-collector.pid", "Path for PID file") once := flag.Bool("once", false, "Run all collectors only once") + debug := flag.Bool("debug", false, "Activate debug output") flag.Parse() m = make(map[string]string) m["configfile"] = *cfg @@ -110,6 +111,12 @@ func ReadCli() map[string]string { } else { m["once"] = "false" } + if *debug { + m["debug"] = "true" + cclog.SetDebug() + } else { + m["debug"] = "false" + } return m } @@ -219,6 +226,10 @@ func mainFunc() int { // Drop domain part of host name rcfg.Hostname = strings.SplitN(rcfg.Hostname, `.`, 2)[0] // err = CreatePidfile(rcfg.CliArgs["pidfile"]) + + if rcfg.CliArgs["logfile"] != "stderr" { + cclog.SetOutput(rcfg.CliArgs["logfile"]) + } // err = SetLogging(rcfg.CliArgs["logfile"]) // if err != nil { // log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname) diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index 5cbe90d..853edf1 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -6,7 +6,7 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influx "github.com/influxdata/line-protocol" nats "github.com/nats-io/nats.go" - "log" + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" "time" ) @@ -46,8 +46,8 @@ func (r *NatsReceiver) Init(config ReceiverConfig) error { if len(r.port) == 0 { r.port = "4222" } - log.Print("[NatsReceiver] INIT") uri := fmt.Sprintf("%s:%s", r.addr, r.port) + cclog.ComponentDebug("NatsReceiver", "INIT", uri) nc, err := nats.Connect(uri) if err == nil { r.database = r.config.Database @@ -63,7 +63,7 @@ func (r *NatsReceiver) Init(config ReceiverConfig) error { } func (r *NatsReceiver) Start() { - log.Print("[NatsReceiver] START") + cclog.ComponentDebug("NatsReceiver", "START") r.nc.Subscribe(r.database, r._NatsReceive) } @@ -75,7 +75,6 @@ func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { for k, v := range r.meta { y.AddMeta(k, v) } - //y, err := lp.New(m.Name(), Tags2Map(m), r.meta, Fields2Map(m), m.Time()) if r.sink != nil { r.sink <- y } @@ -85,7 +84,7 @@ func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { func (r *NatsReceiver) Close() { if r.nc != nil { - log.Print("[NatsReceiver] CLOSE") + cclog.ComponentDebug("NatsReceiver", "CLOSE") r.nc.Close() } } diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go index 62f70b3..e6a2eee 100644 --- a/receivers/receiveManager.go +++ b/receivers/receiveManager.go @@ -3,7 +3,7 @@ package receivers import ( "encoding/json" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - "log" + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" "os" "sync" ) @@ -36,7 +36,7 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) er rm.config = make([]ReceiverConfig, 0) configFile, err := os.Open(receiverConfigFile) if err != nil { - log.Print(err.Error()) + cclog.ComponentError("ReceiveManager", err.Error()) return err } defer configFile.Close() @@ -44,23 +44,11 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) er var rawConfigs []json.RawMessage err = jsonParser.Decode(&rawConfigs) if err != nil { - log.Print(err.Error()) + cclog.ComponentError("ReceiveManager", err.Error()) return err } for _, raw := range rawConfigs { - log.Print("[ReceiveManager] ", string(raw)) rm.AddInput(raw) - // if _, found := AvailableReceivers[k.Type]; !found { - // log.Print("[ReceiveManager] SKIP Config specifies unknown receiver 'type': ", k.Type) - // continue - // } - // r := AvailableReceivers[k.Type] - // err = r.Init(k) - // if err != nil { - // log.Print("[ReceiveManager] SKIP Receiver ", k.Type, " cannot be initialized: ", err.Error()) - // continue - // } - // rm.inputs = append(rm.inputs, r) } return nil } @@ -69,60 +57,32 @@ func (rm *receiveManager) Start() { rm.wg.Add(1) for _, r := range rm.inputs { - log.Print("[ReceiveManager] START ", r.Name()) + cclog.ComponentDebug("ReceiveManager", "START", r.Name()) r.Start() } - log.Print("[ReceiveManager] STARTED\n") - // go func() { - // for { - //ReceiveManagerLoop: - // select { - // case <- rm.done: - // log.Print("ReceiveManager done\n") - // rm.wg.Done() - // break ReceiveManagerLoop - // default: - // for _, c := range rm.inputs { - //ReceiveManagerInputLoop: - // select { - // case <- rm.done: - // log.Print("ReceiveManager done\n") - // rm.wg.Done() - // break ReceiveManagerInputLoop - // case p := <- c: - // log.Print("ReceiveManager: ", p) - // rm.output <- p - // default: - // } - // } - // } - // } - // }() - // for _, r := range rm.inputs { - // r.Close() - // } + cclog.ComponentDebug("ReceiveManager", "STARTED") } func (rm *receiveManager) AddInput(rawConfig json.RawMessage) error { var config ReceiverConfig err := json.Unmarshal(rawConfig, &config) if err != nil { - log.Print("[ReceiveManager] SKIP ", config.Type, " JSON config error: ", err.Error()) - log.Print(err.Error()) + cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "JSON config error:", err.Error()) return err } if _, found := AvailableReceivers[config.Type]; !found { - log.Print("[ReceiveManager] SKIP ", config.Type, " unknown receiver: ", err.Error()) + cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "unknown receiver:", err.Error()) return err } r := AvailableReceivers[config.Type] err = r.Init(config) if err != nil { - log.Print("[ReceiveManager] SKIP ", r.Name(), " initialization failed: ", err.Error()) + cclog.ComponentError("ReceiveManager", "SKIP", r.Name(), "initialization failed:", err.Error()) return err } rm.inputs = append(rm.inputs, r) rm.config = append(rm.config, config) + cclog.ComponentDebug("ReceiveManager", "ADD RECEIVER", r.Name()) return nil } @@ -135,12 +95,11 @@ func (rm *receiveManager) AddOutput(output chan lp.CCMetric) { func (rm *receiveManager) Close() { for _, r := range rm.inputs { - log.Print("[ReceiveManager] CLOSE ", r.Name()) + cclog.ComponentDebug("ReceiveManager", "CLOSE", r.Name()) r.Close() } rm.wg.Done() - log.Print("[ReceiveManager] CLOSE\n") - log.Print("[ReceiveManager] EXIT\n") + cclog.ComponentDebug("ReceiveManager", "CLOSE") } func New(wg *sync.WaitGroup, receiverConfigFile string) (ReceiveManager, error) { diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index beb0f32..b2d60dc 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -2,11 +2,12 @@ package sinks import ( "encoding/json" - "log" +// "log" "os" "sync" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" ) var AvailableSinks = map[string]Sink{ @@ -41,7 +42,7 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { if len(sinkConfigFile) > 0 { configFile, err := os.Open(sinkConfigFile) if err != nil { - log.Print("[SinkManager] ", err.Error()) + cclog.ComponentError("SinkManager", err.Error()) return err } defer configFile.Close() @@ -49,7 +50,7 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { var rawConfigs []json.RawMessage err = jsonParser.Decode(&rawConfigs) if err != nil { - log.Print("[SinkManager] ", err.Error()) + cclog.ComponentError("SinkManager", err.Error()) return err } for _, raw := range rawConfigs { @@ -73,16 +74,16 @@ func (sm *sinkManager) Start() { for _, s := range sm.outputs { s.Close() } - log.Print("[SinkManager] DONE\n") + cclog.ComponentDebug("SinkManager", "DONE") sm.wg.Done() break SinkManagerLoop case p := <-sm.input: - log.Print("[SinkManager] WRITE ", p) + cclog.ComponentDebug("SinkManager", "WRITE", p) for _, s := range sm.outputs { s.Write(p) } if batchcount == 0 { - log.Print("[SinkManager] FLUSH") + cclog.ComponentDebug("SinkManager", "FLUSH") for _, s := range sm.outputs { s.Flush() } @@ -92,9 +93,8 @@ func (sm *sinkManager) Start() { default: } } - log.Print("[SinkManager] EXIT\n") }() - log.Print("[SinkManager] STARTED\n") + cclog.ComponentDebug("SinkManager", "STARTED") } func (sm *sinkManager) AddInput(input chan lp.CCMetric) { @@ -107,28 +107,29 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { if len(rawConfig) > 3 { err = json.Unmarshal(rawConfig, &config) if err != nil { - log.Print("[SinkManager] SKIP ", config.Type, " JSON config error: ", err.Error()) + cclog.ComponentError("SinkManager", "SKIP", config.Type, "JSON config error:", err.Error()) return err } } if _, found := AvailableSinks[config.Type]; !found { - log.Print("[SinkManager] SKIP ", config.Type, " unknown sink: ", err.Error()) + cclog.ComponentError("SinkManager", "SKIP", config.Type, "unknown sink:", err.Error()) return err } s := AvailableSinks[config.Type] err = s.Init(config) if err != nil { - log.Print("[SinkManager] SKIP ", s.Name(), " initialization failed: ", err.Error()) + cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error()) return err } sm.outputs = append(sm.outputs, s) sm.config = append(sm.config, config) + cclog.ComponentDebug("SinkManager", "ADD SINK", s.Name()) return nil } func (sm *sinkManager) Close() { sm.done <- true - log.Print("[SinkManager] CLOSE") + cclog.ComponentDebug("SinkManager", "CLOSE") } func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {