Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop

This commit is contained in:
Thomas Roehl 2022-03-09 17:02:58 +01:00
commit d835724d93
2 changed files with 36 additions and 22 deletions

View File

@ -37,6 +37,7 @@ type MemstatCollector struct {
matches map[string]string matches map[string]string
config MemstatCollectorConfig config MemstatCollectorConfig
nodefiles map[int]MemstatCollectorNode nodefiles map[int]MemstatCollectorNode
sendMemUsed bool
} }
func getStats(filename string) map[string]float64 { func getStats(filename string) map[string]float64 {
@ -77,7 +78,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
return err return err
} }
} }
m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "kByte"} m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "GByte"}
m.stats = make(map[string]int64) m.stats = make(map[string]int64)
m.matches = make(map[string]string) m.matches = make(map[string]string)
m.tags = map[string]string{"type": "node"} m.tags = map[string]string{"type": "node"}
@ -99,6 +100,10 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
m.matches[k] = v m.matches[k] = v
} }
} }
m.sendMemUsed = false
if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip {
m.sendMemUsed = true
}
if len(m.matches) == 0 { if len(m.matches) == 0 {
return errors.New("no metrics to collect") return errors.New("no metrics to collect")
} }
@ -152,25 +157,28 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if v, ok := stats[match]; ok { if v, ok := stats[match]; ok {
value = v value = v
} }
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value}, time.Now()) y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 1e-6}, time.Now())
if err == nil { if err == nil {
output <- y output <- y
} }
} }
if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip { if m.sendMemUsed {
memUsed := 0.0
if totalVal, total := stats["MemTotal"]; total {
if freeVal, free := stats["MemFree"]; free { if freeVal, free := stats["MemFree"]; free {
if bufVal, buffers := stats["Buffers"]; buffers { if bufVal, buffers := stats["Buffers"]; buffers {
if cacheVal, cached := stats["Cached"]; cached { if cacheVal, cached := stats["Cached"]; cached {
memUsed := stats["MemTotal"] - (freeVal + bufVal + cacheVal) memUsed = totalVal - (freeVal + bufVal + cacheVal)
y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed}, time.Now()) }
}
}
}
y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed * 1e-6}, time.Now())
if err == nil { if err == nil {
output <- y output <- y
} }
} }
} }
}
}
}
if m.config.NodeStats { if m.config.NodeStats {
nodestats := getStats(MEMSTATFILE) nodestats := getStats(MEMSTATFILE)

View File

@ -25,6 +25,7 @@ type metricRouterTagConfig struct {
// Metric router configuration // Metric router configuration
type metricRouterConfig struct { type metricRouterConfig struct {
HostnameTagName string `json:"hostname_tag"` // Key name used when adding the hostname to a metric (default 'hostname')
AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met AddTags []metricRouterTagConfig `json:"add_tags"` // List of tags that are added when the condition is met
DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met DelTags []metricRouterTagConfig `json:"delete_tags"` // List of tags that are removed when the condition is met
IntervalAgg []agg.MetricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval IntervalAgg []agg.MetricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation function processed at the end of an interval
@ -33,6 +34,7 @@ type metricRouterConfig struct {
RenameMetrics map[string]string `json:"rename_metrics"` // Map to rename metric name from key to value RenameMetrics map[string]string `json:"rename_metrics"` // Map to rename metric name from key to value
IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval? IntervalStamp bool `json:"interval_timestamp"` // Update timestamp periodically by ticker each interval?
NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation NumCacheIntervals int `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation
MaxForward int `json:"max_forward"` // Number of maximal forwarded metrics at one select
dropMetrics map[string]bool // Internal map for O(1) lookup dropMetrics map[string]bool // Internal map for O(1) lookup
} }
@ -76,7 +78,8 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
r.cache_input = make(chan lp.CCMetric) r.cache_input = make(chan lp.CCMetric)
r.wg = wg r.wg = wg
r.ticker = ticker r.ticker = ticker
r.maxForward = ROUTER_MAX_FORWARD r.config.MaxForward = ROUTER_MAX_FORWARD
r.config.HostnameTagName = "hostname"
// Set hostname // Set hostname
hostname, err := os.Hostname() hostname, err := os.Hostname()
@ -100,6 +103,7 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
cclog.ComponentError("MetricRouter", err.Error()) cclog.ComponentError("MetricRouter", err.Error())
return err return err
} }
r.maxForward = r.config.MaxForward
if r.config.NumCacheIntervals > 0 { if r.config.NumCacheIntervals > 0 {
r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals) r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
if err != nil { if err != nil {
@ -244,8 +248,10 @@ func (r *metricRouter) Start() {
cclog.ComponentDebug("MetricRouter", "FORWARD", point) cclog.ComponentDebug("MetricRouter", "FORWARD", point)
r.DoAddTags(point) r.DoAddTags(point)
r.DoDelTags(point) r.DoDelTags(point)
if new, ok := r.config.RenameMetrics[point.Name()]; ok { name := point.Name()
if new, ok := r.config.RenameMetrics[name]; ok {
point.SetName(new) point.SetName(new)
point.AddMeta("oldname", name)
} }
r.DoAddTags(point) r.DoAddTags(point)
r.DoDelTags(point) r.DoDelTags(point)
@ -258,7 +264,7 @@ func (r *metricRouter) Start() {
// Foward message received from collector channel // Foward message received from collector channel
coll_forward := func(p lp.CCMetric) { coll_forward := func(p lp.CCMetric) {
// receive from metric collector // receive from metric collector
p.AddTag("hostname", r.hostname) p.AddTag(r.config.HostnameTagName, r.hostname)
if r.config.IntervalStamp { if r.config.IntervalStamp {
p.SetTime(r.timestamp) p.SetTime(r.timestamp)
} }
@ -287,7 +293,7 @@ func (r *metricRouter) Start() {
cache_forward := func(p lp.CCMetric) { cache_forward := func(p lp.CCMetric) {
// receive from metric collector // receive from metric collector
if !r.dropMetric(p) { if !r.dropMetric(p) {
p.AddTag("hostname", r.hostname) p.AddTag(r.config.HostnameTagName, r.hostname)
forward(p) forward(p)
} }
} }
@ -309,19 +315,19 @@ func (r *metricRouter) Start() {
case p := <-r.coll_input: case p := <-r.coll_input:
coll_forward(p) coll_forward(p)
for i := 0; len(r.coll_input) > 0 && i < r.maxForward; i++ { for i := 0; len(r.coll_input) > 0 && i < (r.maxForward-1); i++ {
coll_forward(<-r.coll_input) coll_forward(<-r.coll_input)
} }
case p := <-r.recv_input: case p := <-r.recv_input:
recv_forward(p) recv_forward(p)
for i := 0; len(r.recv_input) > 0 && i < r.maxForward; i++ { for i := 0; len(r.recv_input) > 0 && i < (r.maxForward-1); i++ {
recv_forward(<-r.recv_input) recv_forward(<-r.recv_input)
} }
case p := <-r.cache_input: case p := <-r.cache_input:
cache_forward(p) cache_forward(p)
for i := 0; len(r.cache_input) > 0 && i < r.maxForward; i++ { for i := 0; len(r.cache_input) > 0 && i < (r.maxForward-1); i++ {
cache_forward(<-r.cache_input) cache_forward(<-r.cache_input)
} }
} }