diff --git a/cc-metric-collector.go b/cc-metric-collector.go
index e6388df..bb5d0a0 100644
--- a/cc-metric-collector.go
+++ b/cc-metric-collector.go
@@ -28,6 +28,7 @@ type CentralConfigFile struct {
 	RouterConfigFile   string `json:"router"`
 	SinkConfigFile     string `json:"sinks"`
 	ReceiverConfigFile string `json:"receivers,omitempty"`
+	StatsApiConfigFile string `json:"stats_api,omitempty"`
 }

 func LoadCentralConfiguration(file string, config *CentralConfigFile) error {
@@ -52,6 +53,7 @@ type RuntimeConfig struct {
 	CollectManager  collectors.CollectorManager
 	SinkManager     sinks.SinkManager
 	ReceiveManager  receivers.ReceiveManager
+	StatsApi        mr.StatsApi
 	MultiChanTicker mct.MultiChanTicker

 	Channels []chan lp.CCMetric
@@ -152,11 +154,16 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) {
 		cclog.Debug("Shutdown SinkManager...")
 		config.SinkManager.Close()
 	}
+	if config.StatsApi != nil {
+		cclog.Debug("Shutdown StatsApi...")
+		config.StatsApi.Close()
+	}
 }

 func mainFunc() int {
 	var err error
 	use_recv := false
+	use_api := false

 	// Initialize runtime configuration
 	rcfg := RuntimeConfig{
@@ -164,6 +171,7 @@ func mainFunc() int {
 		CollectManager: nil,
 		SinkManager:    nil,
 		ReceiveManager: nil,
+		StatsApi:       nil,
 		CliArgs:        ReadCli(),
 	}

@@ -253,6 +261,16 @@ func mainFunc() int {
 		use_recv = true
 	}

+	// Create new statistics API manager
+	if len(rcfg.ConfigFile.StatsApiConfigFile) > 0 {
+		rcfg.StatsApi, err = mr.NewStatsApi(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.StatsApiConfigFile)
+		if err != nil {
+			cclog.Error(err.Error())
+			return 1
+		}
+		use_api = true
+	}
+
 	// Create shutdown handler
 	shutdownSignal := make(chan os.Signal, 1)
 	signal.Notify(shutdownSignal, os.Interrupt)
@@ -260,6 +278,11 @@ func mainFunc() int {
 	rcfg.Sync.Add(1)
 	go shutdownHandler(&rcfg, shutdownSignal)

+	// Start the stats API early so it can pick up statistics set during initialization
+	if use_api {
+		rcfg.StatsApi.Start()
+	}
+
 	// Start the managers
 	rcfg.MetricRouter.Start()
 	rcfg.SinkManager.Start()
diff --git a/internal/metricRouter/StatsApi.md b/internal/metricRouter/StatsApi.md
new file mode 100644
index 0000000..64fd106
--- /dev/null
+++ b/internal/metricRouter/StatsApi.md
@@ -0,0 +1,17 @@
+# Stats API
+
+The Stats API can be used for debugging. It publishes counters from the different components of the CC Metric Collector as JSON at an HTTP endpoint.
+
+# Configuration
+
+The Stats API has its own configuration file that specifies the listen host and port. The defaults are `localhost` and `8080`.
+
+```json
+{
+  "bindhost" : "",
+  "port" : "8080",
+  "publish_collectorstate" : true
+}
+```
+
+The `bindhost` and `port` options set the listen host and port. The `publish_collectorstate` option must be `true`, otherwise nothing is published; it exists for future use, in case more information needs to be published under different domains.
\ No newline at end of file
diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go
index f9b3faa..9c27c81 100644
--- a/internal/metricRouter/metricRouter.go
+++ b/internal/metricRouter/metricRouter.go
@@ -40,20 +40,26 @@ type metricRouterConfig struct {

 // Metric router data structure
 type metricRouter struct {
-	hostname    string              // Hostname used in tags
-	coll_input  chan lp.CCMetric    // Input channel from CollectorManager
-	recv_input  chan lp.CCMetric    // Input channel from ReceiveManager
-	cache_input chan lp.CCMetric    // Input channel from MetricCache
-	outputs     []chan lp.CCMetric  // List of all output channels
-	done        chan bool           // channel to finish / stop metric router
-	wg          *sync.WaitGroup     // wait group for all goroutines in cc-metric-collector
-	timestamp   time.Time           // timestamp periodically updated by ticker each interval
-	timerdone   chan bool           // channel to finish / stop timestamp updater
-	ticker      mct.MultiChanTicker // periodically ticking once each interval
-	config      metricRouterConfig  // json encoded config for metric router
-	cache       MetricCache         // pointer to MetricCache
-	cachewg     sync.WaitGroup      // wait group for MetricCache
-	maxForward  int                 // number of metrics to forward maximally in one iteration
+	hostname          string              // Hostname used in tags
+	coll_input        chan lp.CCMetric    // Input channel from CollectorManager
+	recv_input        chan lp.CCMetric    // Input channel from ReceiveManager
+	cache_input       chan lp.CCMetric    // Input channel from MetricCache
+	outputs           []chan lp.CCMetric  // List of all output channels
+	done              chan bool           // channel to finish / stop metric router
+	wg                *sync.WaitGroup     // wait group for all goroutines in cc-metric-collector
+	timestamp         time.Time           // timestamp periodically updated by ticker each interval
+	timerdone         chan bool           // channel to finish / stop timestamp updater
+	ticker            mct.MultiChanTicker // periodically ticking once each interval
+	config            metricRouterConfig  // json encoded config for metric router
+	cache             MetricCache         // pointer to MetricCache
+	cachewg           sync.WaitGroup      // wait group for MetricCache
+	maxForward        int                 // number of metrics to forward maximally in one iteration
+	statsCollForward  int64               // number of metrics forwarded from the collector channel
+	statsRecvForward  int64               // number of metrics forwarded from the receiver channel
+	statsCacheForward int64               // number of metrics forwarded from the cache channel
+	statsTotalForward int64               // total number of forwarded metrics
+	statsDropped      int64               // number of dropped metrics
+	statsRenamed      int64               // number of renamed metrics
 }

 // MetricRouter access functions
@@ -121,6 +127,12 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 	for _, mname := range r.config.DropMetrics {
 		r.config.dropMetrics[mname] = true
 	}
+	r.statsCollForward = 0
+	r.statsRecvForward = 0
+	r.statsCacheForward = 0
+	r.statsTotalForward = 0
+	r.statsDropped = 0
+	r.statsRenamed = 0
 	return nil
 }

@@ -140,6 +152,7 @@ func (r *metricRouter) StartTimer() {
 				cclog.ComponentDebug("MetricRouter", "TIMER DONE")
 				return
 			case t := <-m:
+				cclog.ComponentDebug("MetricRouter", "INTERVAL_TICK", t.Unix())
 				r.timestamp = t
 			}
 		}
@@ -253,6 +266,8 @@ func (r *metricRouter) Start() {
 		r.DoDelTags(point)
 		name := point.Name()
 		if new, ok := r.config.RenameMetrics[name]; ok {
+			r.statsRenamed++
+			ComponentStatInt("MetricRouter", "renamed", r.statsRenamed)
 			point.SetName(new)
 			point.AddMeta("oldname", name)
 		}
@@ -272,7 +287,14 @@ func (r *metricRouter) Start() {
 				p.SetTime(r.timestamp)
 			}
 			if !r.dropMetric(p) {
+				r.statsCollForward++
+				r.statsTotalForward++
+				ComponentStatInt("MetricRouter", "collector_forward", r.statsCollForward)
+				ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
 				forward(p)
+			} else {
+				r.statsDropped++
+				ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
 			}
 			// even if the metric is dropped, it is stored in the cache for
 			// aggregations
@@ -288,7 +310,14 @@ func (r *metricRouter) Start() {
 				p.SetTime(r.timestamp)
 			}
 			if !r.dropMetric(p) {
+				r.statsRecvForward++
+				r.statsTotalForward++
+				ComponentStatInt("MetricRouter", "receiver_forward", r.statsRecvForward)
+				ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
 				forward(p)
+			} else {
+				r.statsDropped++
+				ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
 			}
 		}

@@ -297,7 +326,14 @@ func (r *metricRouter) Start() {
 			// receive from metric collector
 			if !r.dropMetric(p) {
 				p.AddTag(r.config.HostnameTagName, r.hostname)
+				r.statsCacheForward++
+				r.statsTotalForward++
+				ComponentStatInt("MetricRouter", "cache_forward", r.statsCacheForward)
+				ComponentStatInt("MetricRouter", "total_forward", r.statsTotalForward)
 				forward(p)
+			} else {
+				r.statsDropped++
+				ComponentStatInt("MetricRouter", "dropped", r.statsDropped)
 			}
 		}