diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index 3998537..bd7af5d 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -32,11 +32,12 @@ type MemstatCollectorNode struct { type MemstatCollector struct { metricCollector - stats map[string]int64 - tags map[string]string - matches map[string]string - config MemstatCollectorConfig - nodefiles map[int]MemstatCollectorNode + stats map[string]int64 + tags map[string]string + matches map[string]string + config MemstatCollectorConfig + nodefiles map[int]MemstatCollectorNode + sendMemUsed bool } func getStats(filename string) map[string]float64 { @@ -77,7 +78,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { return err } } - m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "kByte"} + m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "GByte"} m.stats = make(map[string]int64) m.matches = make(map[string]string) m.tags = map[string]string{"type": "node"} @@ -99,6 +100,10 @@ func (m *MemstatCollector) Init(config json.RawMessage) error { m.matches[k] = v } } + m.sendMemUsed = false + if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip { + m.sendMemUsed = true + } if len(m.matches) == 0 { return errors.New("no metrics to collect") } @@ -152,23 +157,26 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) if v, ok := stats[match]; ok { value = v } - y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value}, time.Now()) + y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 1e-6}, time.Now()) if err == nil { output <- y } } - if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip { - if freeVal, free := stats["MemFree"]; free { - if bufVal, buffers := stats["Buffers"]; buffers { - if cacheVal, cached := stats["Cached"]; cached { - memUsed := stats["MemTotal"] - (freeVal + bufVal + cacheVal) - y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed}, time.Now()) - if err == nil { - output <- y + if m.sendMemUsed { + memUsed := 0.0 + if totalVal, total := stats["MemTotal"]; total { + if freeVal, free := stats["MemFree"]; free { + if bufVal, buffers := stats["Buffers"]; buffers { + if cacheVal, cached := stats["Cached"]; cached { + memUsed = totalVal - (freeVal + bufVal + cacheVal) } } } } + y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed * 1e-6}, time.Now()) + if err == nil { + output <- y + } } } diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index 90650ea..7ad1e7f 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -25,6 +25,7 @@ type metricRouterTagConfig struct { // Metric router configuration type metricRouterConfig struct { + HostnameTagName string `json:"hostname_tag"` // Key name used when adding the hostname to a metric (default 'hostname') AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met IntervalAgg []agg.MetricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval @@ -33,6 +34,7 @@ type metricRouterConfig struct { RenameMetrics map[string]string `json:"rename_metrics"` // Map to rename metric name from key to value IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval? NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation + MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select dropMetrics map[string]bool // Internal map for O(1) lookup } @@ -76,7 +78,8 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout r.cache_input = make(chan lp.CCMetric) r.wg = wg r.ticker = ticker - r.maxForward = ROUTER_MAX_FORWARD + r.config.MaxForward = ROUTER_MAX_FORWARD + r.config.HostnameTagName = "hostname" // Set hostname hostname, err := os.Hostname() @@ -100,6 +103,7 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout cclog.ComponentError("MetricRouter", err.Error()) return err } + r.maxForward = r.config.MaxForward if r.config.NumCacheIntervals > 0 { r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals) if err != nil { @@ -244,8 +248,10 @@ func (r *metricRouter) Start() { cclog.ComponentDebug("MetricRouter", "FORWARD", point) r.DoAddTags(point) r.DoDelTags(point) - if new, ok := r.config.RenameMetrics[point.Name()]; ok { + name := point.Name() + if new, ok := r.config.RenameMetrics[name]; ok { point.SetName(new) + point.AddMeta("oldname", name) } r.DoAddTags(point) r.DoDelTags(point) @@ -258,7 +264,7 @@ func (r *metricRouter) Start() { // Foward message received from collector channel coll_forward := func(p lp.CCMetric) { // receive from metric collector - p.AddTag("hostname", r.hostname) + p.AddTag(r.config.HostnameTagName, r.hostname) if r.config.IntervalStamp { p.SetTime(r.timestamp) } @@ -287,7 +293,7 @@ func (r *metricRouter) Start() { cache_forward := func(p lp.CCMetric) { // receive from metric collector if !r.dropMetric(p) { - p.AddTag("hostname", r.hostname) + p.AddTag(r.config.HostnameTagName, r.hostname) forward(p) } } @@ -309,19 +315,19 @@ func (r *metricRouter) Start() { case p := <-r.coll_input: coll_forward(p) - for i := 0; len(r.coll_input) > 0 && i < r.maxForward; i++ { + for i := 0; len(r.coll_input) > 0 && i < (r.maxForward-1); i++ { coll_forward(<-r.coll_input) } case p := <-r.recv_input: recv_forward(p) - for i := 0; len(r.recv_input) > 0 && i < r.maxForward; i++ { + for i := 0; len(r.recv_input) > 0 && i < (r.maxForward-1); i++ { recv_forward(<-r.recv_input) } case p := <-r.cache_input: cache_forward(p) - for i := 0; len(r.cache_input) > 0 && i < r.maxForward; i++ { + for i := 0; len(r.cache_input) > 0 && i < (r.maxForward-1); i++ { cache_forward(<-r.cache_input) } }