Mirror of https://github.com/ClusterCockpit/cc-metric-collector.git, synced 2025-07-21 04:11:41 +02:00

Compare commits (33 commits)
Commits compared (SHA1):

- 36dd440864
- 7b098e0b1b
- 229a57b16a
- 70a9530aba
- 2f0b6057ca
- 69f7c19659
- ecdb4c1bcf
- 4d5b1adbc8
- 28348bd108
- a3b9d8a90b
- 7e43e9171e
- 5d25a7bf12
- 83b4343310
- f1d3cabdc6
- 4763733d8d
- 2a014b6fba
- 16e898ecca
- 50479f9325
- e0e91844bc
- 296225f3a8
- 43bcce6fb5
- 622e94ae0e
- c506114480
- 657543dded
- beebcd7145
- 082eea525a
- 2b8266d1d2
- d835724d93
- c5082bbffe
- 4c1263312b
- 940623585c
- 87ecb12c6f
- ae64eddcc8
InfiniBand collector:

@@ -18,13 +18,18 @@ import (
 
 const IB_BASEPATH = "/sys/class/infiniband/"
 
+type InfinibandCollectorMetric struct {
+	path string
+	unit string
+}
+
 type InfinibandCollectorInfo struct {
 	LID              string            // IB local Identifier (LID)
 	device           string            // IB device
 	port             string            // IB device port
-	portCounterFiles map[string]string // mapping counter name -> sysfs file
+	portCounterFiles map[string]InfinibandCollectorMetric // mapping counter name -> InfinibandCollectorMetric
 	tagSet           map[string]string // corresponding tag list
 	lastState        map[string]int64  // State from last measurement
 }
 
 type InfinibandCollector struct {

@@ -106,16 +111,16 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
 
 		// Check access to counter files
 		countersDir := filepath.Join(path, "counters")
-		portCounterFiles := map[string]string{
-			"ib_recv":      filepath.Join(countersDir, "port_rcv_data"),
-			"ib_xmit":      filepath.Join(countersDir, "port_xmit_data"),
-			"ib_recv_pkts": filepath.Join(countersDir, "port_rcv_packets"),
-			"ib_xmit_pkts": filepath.Join(countersDir, "port_xmit_packets"),
+		portCounterFiles := map[string]InfinibandCollectorMetric{
+			"ib_recv":      {path: filepath.Join(countersDir, "port_rcv_data"), unit: "bytes"},
+			"ib_xmit":      {path: filepath.Join(countersDir, "port_xmit_data"), unit: "bytes"},
+			"ib_recv_pkts": {path: filepath.Join(countersDir, "port_rcv_packets"), unit: "packets"},
+			"ib_xmit_pkts": {path: filepath.Join(countersDir, "port_xmit_packets"), unit: "packets"},
 		}
-		for _, counterFile := range portCounterFiles {
-			err := unix.Access(counterFile, unix.R_OK)
+		for _, counter := range portCounterFiles {
+			err := unix.Access(counter.path, unix.R_OK)
 			if err != nil {
-				return fmt.Errorf("unable to access %s: %v", counterFile, err)
+				return fmt.Errorf("unable to access %s: %v", counter.path, err)
 			}
 		}
@@ -165,14 +170,14 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
 	m.lastTimestamp = now
 
 	for _, info := range m.info {
-		for counterName, counterFile := range info.portCounterFiles {
+		for counterName, counterDef := range info.portCounterFiles {
 
 			// Read counter file
-			line, err := ioutil.ReadFile(counterFile)
+			line, err := ioutil.ReadFile(counterDef.path)
 			if err != nil {
 				cclog.ComponentError(
 					m.name,
-					fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterFile, err))
+					fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterDef.path, err))
 				continue
 			}
 			data := strings.TrimSpace(string(line))
@@ -189,6 +194,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
 			// Send absolut values
 			if m.config.SendAbsoluteValues {
 				if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
+					y.AddMeta("unit", counterDef.unit)
 					output <- y
 				}
 			}

@@ -198,6 +204,7 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
 				if info.lastState[counterName] >= 0 {
 					rate := float64((v - info.lastState[counterName])) / timeDiff
 					if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil {
+						y.AddMeta("unit", counterDef.unit+"/sec")
 						output <- y
 					}
 				}
LIKWID collector:

@@ -15,8 +15,12 @@ import (
 	"io/ioutil"
 	"math"
 	"os"
+	"os/signal"
+	"sort"
 	"strconv"
 	"strings"
+	"sync"
+	"syscall"
 	"time"
 	"unsafe"
 

@@ -46,6 +50,16 @@ type LikwidCollectorEventsetConfig struct {
 	Metrics []LikwidCollectorMetricConfig `json:"metrics"`
 }
 
+type LikwidEventsetConfig struct {
+	internal int
+	gid      C.int
+	eorder   []*C.char
+	estr     *C.char
+	go_estr  string
+	results  map[int]map[string]interface{}
+	metrics  map[int]map[string]float64
+}
+
 type LikwidCollectorConfig struct {
 	Eventsets []LikwidCollectorEventsetConfig `json:"eventsets"`
 	Metrics   []LikwidCollectorMetricConfig   `json:"globalmetrics,omitempty"`
@@ -58,17 +72,18 @@ type LikwidCollectorConfig struct {
 
 type LikwidCollector struct {
 	metricCollector
 	cpulist  []C.int
 	cpu2tid  map[int]int
 	sock2tid map[int]int
 	metrics  map[C.int]map[string]int
 	groups   []C.int
 	config   LikwidCollectorConfig
-	results   map[int]map[int]map[string]interface{}
-	mresults  map[int]map[int]map[string]float64
 	gmresults map[int]map[string]float64
 	basefreq  float64
 	running   bool
+	initialized  bool
+	likwidGroups map[C.int]LikwidEventsetConfig
+	lock         sync.Mutex
 }
 
 type LikwidMetric struct {
@@ -86,14 +101,60 @@ func eventsToEventStr(events map[string]string) string {
 	return strings.Join(elist, ",")
 }
 
+func genLikwidEventSet(input LikwidCollectorEventsetConfig) LikwidEventsetConfig {
+	tmplist := make([]string, 0)
+	clist := make([]string, 0)
+	for k := range input.Events {
+		clist = append(clist, k)
+	}
+	sort.Strings(clist)
+	elist := make([]*C.char, 0)
+	for _, k := range clist {
+		v := input.Events[k]
+		tmplist = append(tmplist, fmt.Sprintf("%s:%s", v, k))
+		c_counter := C.CString(k)
+		elist = append(elist, c_counter)
+	}
+	estr := strings.Join(tmplist, ",")
+	res := make(map[int]map[string]interface{})
+	met := make(map[int]map[string]float64)
+	for _, i := range topo.CpuList() {
+		res[i] = make(map[string]interface{})
+		for k := range input.Events {
+			res[i][k] = 0.0
+		}
+		met[i] = make(map[string]float64)
+		for _, v := range input.Metrics {
+			res[i][v.Name] = 0.0
+		}
+	}
+	return LikwidEventsetConfig{
+		gid:     -1,
+		eorder:  elist,
+		estr:    C.CString(estr),
+		go_estr: estr,
+		results: res,
+		metrics: met,
+	}
+}
+
+func testLikwidMetricFormula(formula string, params []string) bool {
+	myparams := make(map[string]interface{})
+	for _, p := range params {
+		myparams[p] = float64(1.0)
+	}
+	_, err := agg.EvalFloat64Condition(formula, myparams)
+	return err == nil
+}
+
 func getBaseFreq() float64 {
+	files := []string{
+		"/sys/devices/system/cpu/cpu0/cpufreq/bios_limit",
+		"/sys/devices/system/cpu/cpu0/cpufreq/base_frequency",
+	}
 	var freq float64 = math.NaN()
-	C.power_init(0)
-	info := C.get_powerInfo()
-	if float64(info.baseFrequency) != 0 {
-		freq = float64(info.baseFrequency) * 1e6
-	} else {
-		buffer, err := ioutil.ReadFile("/sys/devices/system/cpu/cpu0/cpufreq/bios_limit")
+	for _, f := range files {
+		buffer, err := ioutil.ReadFile(f)
 		if err == nil {
 			data := strings.Replace(string(buffer), "\n", "", -1)
 			x, err := strconv.ParseInt(data, 0, 64)

@@ -102,12 +163,22 @@ func getBaseFreq() float64 {
 		}
 	}
 
+	if math.IsNaN(freq) {
+		C.power_init(0)
+		info := C.get_powerInfo()
+		if float64(info.baseFrequency) != 0 {
+			freq = float64(info.baseFrequency) * 1e6
+		}
+		C.power_finalize()
+	}
 	return freq
 }
 
 func (m *LikwidCollector) Init(config json.RawMessage) error {
-	var ret C.int
 	m.name = "LikwidCollector"
+	m.initialized = false
+	m.running = false
 	m.config.AccessMode = LIKWID_DEF_ACCESSMODE
 	m.config.LibraryPath = LIKWID_LIB_NAME
 	if len(config) > 0 {
@@ -131,7 +202,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
 	}
 	m.setup()
 
-	m.meta = map[string]string{"source": m.name, "group": "PerfCounter"}
+	m.meta = map[string]string{"group": "PerfCounter"}
 	cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists")
 	cpulist := topo.CpuList()
 	m.cpulist = make([]C.int, len(cpulist))
@@ -140,172 +211,136 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
 		m.cpulist[i] = C.int(c)
 		m.cpu2tid[c] = i
 	}
-	m.sock2tid = make(map[int]int)
-	tmp := make([]C.int, 1)
-	for _, sid := range topo.SocketList() {
-		cstr := C.CString(fmt.Sprintf("S%d:0", sid))
-		ret = C.cpustr_to_cpulist(cstr, &tmp[0], 1)
-		if ret > 0 {
-			m.sock2tid[sid] = m.cpu2tid[int(tmp[0])]
-		}
-		C.free(unsafe.Pointer(cstr))
-	}
-	m.results = make(map[int]map[int]map[string]interface{})
-	m.mresults = make(map[int]map[int]map[string]float64)
+
+	m.likwidGroups = make(map[C.int]LikwidEventsetConfig)
+	// m.results = make(map[int]map[int]map[string]interface{})
+	// m.mresults = make(map[int]map[int]map[string]float64)
 	m.gmresults = make(map[int]map[string]float64)
-	cclog.ComponentDebug(m.name, "initialize LIKWID topology")
-	ret = C.topology_init()
-	if ret != 0 {
-		err := errors.New("failed to initialize LIKWID topology")
-		cclog.ComponentError(m.name, err.Error())
-		return err
-	}
-
-	switch m.config.AccessMode {
-	case "direct":
-		C.HPMmode(0)
-	case "accessdaemon":
-		if len(m.config.DaemonPath) > 0 {
-			p := os.Getenv("PATH")
-			os.Setenv("PATH", m.config.DaemonPath+":"+p)
-		}
-		C.HPMmode(1)
-	}
-
-	cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
-	ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
-	if ret != 0 {
-		C.topology_finalize()
-		err := errors.New("failed to initialize LIKWID topology")
-		cclog.ComponentError(m.name, err.Error())
-		return err
+	for _, tid := range m.cpu2tid {
+		m.gmresults[tid] = make(map[string]float64)
 	}
 
 	// This is for the global metrics computation test
-	globalParams := make(map[string]interface{})
-	globalParams["time"] = float64(1.0)
-	globalParams["inverseClock"] = float64(1.0)
-	// While adding the events, we test the metrics whether they can be computed at all
-	for i, evset := range m.config.Eventsets {
-		var gid C.int
-		var cstr *C.char
+	totalMetrics := 0
+	// Generate parameter list for the metric computing test
+	params := make([]string, 0)
+	params = append(params, "time", "inverseClock")
+	// Generate parameter list for the global metric computing test
+	globalParams := make([]string, 0)
+	globalParams = append(globalParams, "time", "inverseClock")
+	// We test the eventset metrics whether they can be computed at all
+	for _, evset := range m.config.Eventsets {
 		if len(evset.Events) > 0 {
-			estr := eventsToEventStr(evset.Events)
-			// Generate parameter list for the metric computing test
-			params := make(map[string]interface{})
-			params["time"] = float64(1.0)
-			params["inverseClock"] = float64(1.0)
+			params = params[:2]
 			for counter := range evset.Events {
-				params[counter] = float64(1.0)
+				params = append(params, counter)
 			}
 			for _, metric := range evset.Metrics {
 				// Try to evaluate the metric
-				_, err := agg.EvalFloat64Condition(metric.Calc, params)
-				if err != nil {
-					cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
-					continue
-				}
-				// If the metric is not in the parameter list for the global metrics, add it
-				if _, ok := globalParams[metric.Name]; !ok {
-					globalParams[metric.Name] = float64(1.0)
+				if testLikwidMetricFormula(metric.Calc, params) {
+					// Add the computable metric to the parameter list for the global metrics
+					globalParams = append(globalParams, metric.Name)
+					totalMetrics++
+				} else {
+					metric.Calc = ""
 				}
 			}
-			// Now we add the list of events to likwid
-			cstr = C.CString(estr)
-			gid = C.perfmon_addEventSet(cstr)
 		} else {
 			cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
 			continue
 		}
-		if gid >= 0 {
-			m.groups = append(m.groups, gid)
-		}
-		C.free(unsafe.Pointer(cstr))
-		m.results[i] = make(map[int]map[string]interface{})
-		m.mresults[i] = make(map[int]map[string]float64)
-		for tid := range m.cpulist {
-			m.results[i][tid] = make(map[string]interface{})
-			m.mresults[i][tid] = make(map[string]float64)
-			if i == 0 {
-				m.gmresults[tid] = make(map[string]float64)
-			}
-		}
 	}
 	for _, metric := range m.config.Metrics {
 		// Try to evaluate the global metric
-		_, err := agg.EvalFloat64Condition(metric.Calc, globalParams)
-		if err != nil {
-			cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
-			continue
+		if !testLikwidMetricFormula(metric.Calc, globalParams) {
+			cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed")
+			metric.Calc = ""
+		} else {
+			totalMetrics++
 		}
 	}
 
 	// If no event set could be added, shut down LikwidCollector
-	if len(m.groups) == 0 {
-		C.perfmon_finalize()
-		C.topology_finalize()
-		err := errors.New("no LIKWID performance group initialized")
+	if totalMetrics == 0 {
+		err := errors.New("no LIKWID eventset or metric usable")
 		cclog.ComponentError(m.name, err.Error())
 		return err
 	}
-	m.basefreq = getBaseFreq()
-	cclog.ComponentDebug(m.name, "BaseFreq", m.basefreq)
 	m.init = true
 	return nil
 }
 
 // take a measurement for 'interval' seconds of event set index 'group'
-func (m *LikwidCollector) takeMeasurement(group int, interval time.Duration) error {
+func (m *LikwidCollector) takeMeasurement(evset LikwidEventsetConfig, interval time.Duration) (bool, error) {
 	var ret C.int
-	gid := m.groups[group]
-	ret = C.perfmon_setupCounters(gid)
-	if ret != 0 {
-		gctr := C.GoString(C.perfmon_getGroupName(gid))
-		err := fmt.Errorf("failed to setup performance group %d (%s)", gid, gctr)
-		return err
-	}
-	ret = C.perfmon_startCounters()
-	if ret != 0 {
-		gctr := C.GoString(C.perfmon_getGroupName(gid))
-		err := fmt.Errorf("failed to start performance group %d (%s)", gid, gctr)
-		return err
-	}
-	m.running = true
-	time.Sleep(interval)
-	m.running = false
-	ret = C.perfmon_stopCounters()
-	if ret != 0 {
-		gctr := C.GoString(C.perfmon_getGroupName(gid))
-		err := fmt.Errorf("failed to stop performance group %d (%s)", gid, gctr)
-		return err
+	m.lock.Lock()
+	if m.initialized {
+		ret = C.perfmon_setupCounters(evset.gid)
+		if ret != 0 {
+			var err error = nil
+			var skip bool = false
+			if ret == -37 {
+				skip = true
+			} else {
+				err = fmt.Errorf("failed to setup performance group %d", evset.gid)
+			}
+			m.lock.Unlock()
+			return skip, err
+		}
+		ret = C.perfmon_startCounters()
+		if ret != 0 {
+			var err error = nil
+			var skip bool = false
+			if ret == -37 {
+				skip = true
+			} else {
+				err = fmt.Errorf("failed to setup performance group %d", evset.gid)
+			}
+			m.lock.Unlock()
+			return skip, err
+		}
+		m.running = true
+		time.Sleep(interval)
+		m.running = false
+		ret = C.perfmon_stopCounters()
+		if ret != 0 {
+			var err error = nil
+			var skip bool = false
+			if ret == -37 {
+				skip = true
+			} else {
+				err = fmt.Errorf("failed to setup performance group %d", evset.gid)
+			}
+			m.lock.Unlock()
+			return skip, err
+		}
 	}
-	return nil
+	m.lock.Unlock()
+	return false, nil
 }
 
 // Get all measurement results for an event set, derive the metric values out of the measurement results and send it
-func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration, output chan lp.CCMetric) error {
-	var eidx C.int
-	evset := m.config.Eventsets[group]
-	gid := m.groups[group]
+func (m *LikwidCollector) calcEventsetMetrics(evset LikwidEventsetConfig, interval time.Duration, output chan lp.CCMetric) error {
 	invClock := float64(1.0 / m.basefreq)
 
 	// Go over events and get the results
-	for eidx = 0; int(eidx) < len(evset.Events); eidx++ {
-		ctr := C.perfmon_getCounterName(gid, eidx)
-		gctr := C.GoString(ctr)
+	for eidx, counter := range evset.eorder {
+		gctr := C.GoString(counter)
 
 		for _, tid := range m.cpu2tid {
-			if tid >= 0 {
-				m.results[group][tid]["time"] = interval.Seconds()
-				m.results[group][tid]["inverseClock"] = invClock
-				res := C.perfmon_getLastResult(gid, eidx, C.int(tid))
-				m.results[group][tid][gctr] = float64(res)
+			res := C.perfmon_getLastResult(evset.gid, C.int(eidx), C.int(tid))
+			fres := float64(res)
+			if m.config.InvalidToZero && (math.IsNaN(fres) || math.IsInf(fres, 0)) {
+				fres = 0.0
 			}
+			evset.results[tid][gctr] = fres
+			evset.results[tid]["time"] = interval.Seconds()
+			evset.results[tid]["inverseClock"] = invClock
 		}
 	}
 
 	// Go over the event set metrics, derive the value out of the event:counter values and send it
-	for _, metric := range evset.Metrics {
+	for _, metric := range m.config.Eventsets[evset.internal].Metrics {
 		// The metric scope is determined in the Init() function
 		// Get the map scope-id -> tids
 		scopemap := m.cpu2tid
@@ -313,19 +348,16 @@ func (m *LikwidCollector) calcEventsetMetrics(group int, interval time.Duration,
 			scopemap = m.sock2tid
 		}
 		for domain, tid := range scopemap {
-			if tid >= 0 {
-				value, err := agg.EvalFloat64Condition(metric.Calc, m.results[group][tid])
+			if tid >= 0 && len(metric.Calc) > 0 {
+				value, err := agg.EvalFloat64Condition(metric.Calc, evset.results[tid])
 				if err != nil {
 					cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
-					continue
-				}
-				m.mresults[group][tid][metric.Name] = value
-				if m.config.InvalidToZero && math.IsNaN(value) {
 					value = 0.0
 				}
-				if m.config.InvalidToZero && math.IsInf(value, 0) {
+				if m.config.InvalidToZero && (math.IsNaN(value) || math.IsInf(value, 0)) {
 					value = 0.0
 				}
+				evset.metrics[tid][metric.Name] = value
 				// Now we have the result, send it with the proper tags
 				if !math.IsNaN(value) {
 					if metric.Publish {
@@ -360,8 +392,8 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
 		if tid >= 0 {
 			// Here we generate parameter list
 			params := make(map[string]interface{})
-			for j := range m.groups {
-				for mname, mres := range m.mresults[j][tid] {
+			for _, evset := range m.likwidGroups {
+				for mname, mres := range evset.metrics[tid] {
 					params[mname] = mres
 				}
 			}
@@ -369,15 +401,12 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
 			value, err := agg.EvalFloat64Condition(metric.Calc, params)
 			if err != nil {
 				cclog.ComponentError(m.name, "Calculation for metric", metric.Name, "failed:", err.Error())
-				continue
+				value = 0.0
+			}
+			if m.config.InvalidToZero && (math.IsNaN(value) || math.IsInf(value, 0)) {
+				value = 0.0
 			}
 			m.gmresults[tid][metric.Name] = value
-			if m.config.InvalidToZero && math.IsNaN(value) {
-				value = 0.0
-			}
-			if m.config.InvalidToZero && math.IsInf(value, 0) {
-				value = 0.0
-			}
 			// Now we have the result, send it with the proper tags
 			if !math.IsNaN(value) {
 				if metric.Publish {
@@ -401,38 +430,163 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
 	return nil
 }
 
+func (m *LikwidCollector) LateInit() error {
+	var ret C.int
+	if m.initialized {
+		return nil
+	}
+	switch m.config.AccessMode {
+	case "direct":
+		C.HPMmode(0)
+	case "accessdaemon":
+		if len(m.config.DaemonPath) > 0 {
+			p := os.Getenv("PATH")
+			os.Setenv("PATH", m.config.DaemonPath+":"+p)
+		}
+		C.HPMmode(1)
+	}
+	cclog.ComponentDebug(m.name, "initialize LIKWID topology")
+	ret = C.topology_init()
+	if ret != 0 {
+		err := errors.New("failed to initialize LIKWID topology")
+		cclog.ComponentError(m.name, err.Error())
+		return err
+	}
+
+	m.sock2tid = make(map[int]int)
+	tmp := make([]C.int, 1)
+	for _, sid := range topo.SocketList() {
+		cstr := C.CString(fmt.Sprintf("S%d:0", sid))
+		ret = C.cpustr_to_cpulist(cstr, &tmp[0], 1)
+		if ret > 0 {
+			m.sock2tid[sid] = m.cpu2tid[int(tmp[0])]
+		}
+		C.free(unsafe.Pointer(cstr))
+	}
+
+	m.basefreq = getBaseFreq()
+	cclog.ComponentDebug(m.name, "BaseFreq", m.basefreq)
+
+	cclog.ComponentDebug(m.name, "initialize LIKWID perfmon module")
+	ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0])
+	if ret != 0 {
+		var err error = nil
+		C.topology_finalize()
+		if ret != -22 {
+			err = errors.New("failed to initialize LIKWID perfmon")
+			cclog.ComponentError(m.name, err.Error())
+		} else {
+			err = errors.New("access to LIKWID perfmon locked")
+		}
+		return err
+	}
+
+	// While adding the events, we test the metrics whether they can be computed at all
+	for i, evset := range m.config.Eventsets {
+		var gid C.int
+		if len(evset.Events) > 0 {
+			skip := false
+			likwidGroup := genLikwidEventSet(evset)
+			for _, g := range m.likwidGroups {
+				if likwidGroup.go_estr == g.go_estr {
+					skip = true
+					break
+				}
+			}
+			if skip {
+				continue
+			}
+			// Now we add the list of events to likwid
+			gid = C.perfmon_addEventSet(likwidGroup.estr)
+			if gid >= 0 {
+				likwidGroup.gid = gid
+				likwidGroup.internal = i
+				m.likwidGroups[gid] = likwidGroup
+			}
+		} else {
+			cclog.ComponentError(m.name, "Invalid Likwid eventset config, no events given")
+			continue
+		}
+	}
+
+	// If no event set could be added, shut down LikwidCollector
+	if len(m.likwidGroups) == 0 {
+		C.perfmon_finalize()
+		C.topology_finalize()
+		err := errors.New("no LIKWID performance group initialized")
+		cclog.ComponentError(m.name, err.Error())
+		return err
+	}
+	sigchan := make(chan os.Signal, 1)
+	signal.Notify(sigchan, syscall.SIGCHLD)
+	signal.Notify(sigchan, os.Interrupt)
+	go func() {
+		<-sigchan
+		signal.Stop(sigchan)
+		m.initialized = false
+	}()
+	m.initialized = true
+	return nil
+}
+
 // main read function taking multiple measurement rounds, each 'interval' seconds long
 func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric) {
+	var skip bool = false
+	var err error
 	if !m.init {
 		return
 	}
 
-	for i := range m.groups {
-		// measure event set 'i' for 'interval' seconds
-		err := m.takeMeasurement(i, interval)
-		if err != nil {
-			cclog.ComponentError(m.name, err.Error())
-			return
+	if !m.initialized {
+		m.lock.Lock()
+		err = m.LateInit()
+		if err != nil {
+			m.lock.Unlock()
+			return
 		}
-		// read measurements and derive event set metrics
-		m.calcEventsetMetrics(i, interval, output)
+		m.initialized = true
+		m.lock.Unlock()
+	}
+
+	if m.initialized && !skip {
+		for _, evset := range m.likwidGroups {
+			if !skip {
+				// measure event set 'i' for 'interval' seconds
+				skip, err = m.takeMeasurement(evset, interval)
+				if err != nil {
+					cclog.ComponentError(m.name, err.Error())
+					return
+				}
+			}
+
+			if !skip {
+				// read measurements and derive event set metrics
+				m.calcEventsetMetrics(evset, interval, output)
+			}
+		}
+		if !skip {
+			// use the event set metrics to derive the global metrics
+			m.calcGlobalMetrics(interval, output)
+		}
 	}
-	// use the event set metrics to derive the global metrics
-	m.calcGlobalMetrics(interval, output)
 }
 
 func (m *LikwidCollector) Close() {
 	if m.init {
-		cclog.ComponentDebug(m.name, "Closing ...")
 		m.init = false
-		if m.running {
-			cclog.ComponentDebug(m.name, "Stopping counters")
-			C.perfmon_stopCounters()
+		cclog.ComponentDebug(m.name, "Closing ...")
+		m.lock.Lock()
+		if m.initialized {
+			cclog.ComponentDebug(m.name, "Finalize LIKWID perfmon module")
+			C.perfmon_finalize()
+			m.initialized = false
 		}
-		cclog.ComponentDebug(m.name, "Finalize LIKWID perfmon module")
-		C.perfmon_finalize()
+		m.lock.Unlock()
 		cclog.ComponentDebug(m.name, "Finalize LIKWID topology module")
 		C.topology_finalize()
 
 		cclog.ComponentDebug(m.name, "Closing done")
 	}
 }
Memstat collector:

@@ -40,8 +40,13 @@ type MemstatCollector struct {
 	sendMemUsed bool
 }
 
-func getStats(filename string) map[string]float64 {
-	stats := make(map[string]float64)
+type MemstatStats struct {
+	value float64
+	unit  string
+}
+
+func getStats(filename string) map[string]MemstatStats {
+	stats := make(map[string]MemstatStats)
 	file, err := os.Open(filename)
 	if err != nil {
 		cclog.Error(err.Error())

@@ -55,12 +60,18 @@ func getStats(filename string) map[string]float64 {
 		if len(linefields) == 3 {
 			v, err := strconv.ParseFloat(linefields[1], 64)
 			if err == nil {
-				stats[strings.Trim(linefields[0], ":")] = v
+				stats[strings.Trim(linefields[0], ":")] = MemstatStats{
+					value: v,
+					unit:  linefields[2],
+				}
 			}
 		} else if len(linefields) == 5 {
 			v, err := strconv.ParseFloat(linefields[3], 64)
 			if err == nil {
-				stats[strings.Trim(linefields[0], ":")] = v
+				stats[strings.Trim(linefields[0], ":")] = MemstatStats{
+					value: v,
+					unit:  linefields[4],
+				}
 			}
 		}
 	}

@@ -78,7 +89,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
 			return err
 		}
 	}
-	m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "GByte"}
+	m.meta = map[string]string{"source": m.name, "group": "Memory"}
 	m.stats = make(map[string]int64)
 	m.matches = make(map[string]string)
 	m.tags = map[string]string{"type": "node"}

@@ -151,30 +162,51 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
 		return
 	}
 
-	sendStats := func(stats map[string]float64, tags map[string]string) {
+	sendStats := func(stats map[string]MemstatStats, tags map[string]string) {
 		for match, name := range m.matches {
 			var value float64 = 0
+			var unit string = ""
 			if v, ok := stats[match]; ok {
-				value = v
+				value = v.value
+				if len(v.unit) > 0 {
+					unit = v.unit
+				}
 			}
-			y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 1e-6}, time.Now())
+
+			y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value}, time.Now())
 			if err == nil {
+				if len(unit) > 0 {
+					y.AddMeta("unit", unit)
+				}
 				output <- y
 			}
 		}
 		if m.sendMemUsed {
 			memUsed := 0.0
+			unit := ""
 			if totalVal, total := stats["MemTotal"]; total {
 				if freeVal, free := stats["MemFree"]; free {
 					if bufVal, buffers := stats["Buffers"]; buffers {
 						if cacheVal, cached := stats["Cached"]; cached {
-							memUsed = totalVal - (freeVal + bufVal + cacheVal)
+							memUsed = totalVal.value - (freeVal.value + bufVal.value + cacheVal.value)
+							if len(totalVal.unit) > 0 {
+								unit = totalVal.unit
+							} else if len(freeVal.unit) > 0 {
+								unit = freeVal.unit
+							} else if len(bufVal.unit) > 0 {
+								unit = bufVal.unit
+							} else if len(cacheVal.unit) > 0 {
+								unit = cacheVal.unit
+							}
 						}
 					}
 				}
 			}
-			y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed * 1e-6}, time.Now())
+			y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed}, time.Now())
 			if err == nil {
+				if len(unit) > 0 {
+					y.AddMeta("unit", unit)
+				}
 				output <- y
 			}
 		}
NFS collector:

@@ -36,7 +36,7 @@ type nfsCollector struct {
 }
 
 func (m *nfsCollector) initStats() error {
-	cmd := exec.Command(m.config.Nfsstats, `-l`)
+	cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
 	cmd.Wait()
 	buffer, err := cmd.Output()
 	if err == nil {

@@ -52,7 +52,7 @@ func (m *nfsCollector) initStats() error {
 			if err == nil {
 				x := m.data[name]
 				x.current = value
-				x.last = 0
+				x.last = value
 				m.data[name] = x
 			}
 		}

@@ -63,7 +63,7 @@ func (m *nfsCollector) initStats() error {
 }
 
 func (m *nfsCollector) updateStats() error {
-	cmd := exec.Command(m.config.Nfsstats, `-l`)
+	cmd := exec.Command(m.config.Nfsstats, `-l`, `--all`)
 	cmd.Wait()
 	buffer, err := cmd.Output()
 	if err == nil {
CPU topology:

@@ -169,7 +169,10 @@ func DieList() []int {
 			}
 		}
 	}
-	return dielist
+	if len(dielist) > 0 {
+		return dielist
+	}
+	return SocketList()
 }
 
 type CpuEntry struct {

@@ -261,7 +264,7 @@ func CpuData() []CpuEntry {
 	for _, c := range CpuList() {
 		clist = append(clist, CpuEntry{Cpuid: c})
 	}
-	for _, centry := range clist {
+	for i, centry := range clist {
 		centry.Socket = -1
 		centry.Numadomain = -1
 		centry.Die = -1

@@ -289,6 +292,8 @@ func CpuData() []CpuEntry {
 		// Lookup NUMA domain id
 		centry.Numadomain = getNumaDomain(base)
 
+		// Update values in output list
+		clist[i] = centry
 	}
 	return clist
 }
Metric router documentation (README):

@@ -8,6 +8,8 @@ The CCMetric router sits in between the collectors and the sinks and can be used
 {
     "num_cache_intervals" : 1,
     "interval_timestamp" : true,
+    "hostname_tag" : "hostname",
+    "max_forward" : 50,
     "add_tags" : [
         {
             "key" : "cluster",

@@ -55,6 +57,20 @@ The CCMetric router sits in between the collectors and the sinks and can be used
 ```
 
 There are three main options: `add_tags`, `delete_tags` and `interval_timestamp`. `add_tags` and `delete_tags` are lists of dicts with `key`, `value` and `if`. The `value` can be omitted in the `delete_tags` part, as only the `key` is used for removal. The `interval_timestamp` setting applies one unique timestamp to all metrics traversing the router during an interval.
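For illustration, a minimal, self-contained Go sketch of how a tag rule with `key`, `value` and `if` behaves. The `tagRule` type and `evaluate` helper are hypothetical stand-ins; the real router evaluates the `if` expression against the full metric with its expression engine.

```go
package main

import "fmt"

// tagRule is a hypothetical stand-in for one add_tags/delete_tags entry.
type tagRule struct {
	key, value, cond string
}

// evaluate is a hypothetical stand-in for the router's condition engine:
// "*" matches every metric; anything else is matched against the name here.
func evaluate(cond, metricName string) bool {
	if cond == "*" {
		return true
	}
	return cond == "name == '"+metricName+"'"
}

func main() {
	rules := []tagRule{
		{key: "cluster", value: "testcluster", cond: "*"},
		{key: "purpose", value: "cpu-load-only", cond: "name == 'cpu_load'"},
	}
	tags := map[string]string{}
	for _, r := range rules {
		if evaluate(r.cond, "cpu_load") {
			tags[r.key] = r.value // delete_tags would call delete(tags, r.key) instead
		}
	}
	fmt.Println(tags)
}
```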
 
+# Processing order in the router
+
+- Add the `hostname_tag` tag (if sent by collectors or cache)
+- If `interval_timestamp == true`, change time of metrics
+- Check if metric should be dropped (`drop_metrics` and `drop_metrics_if`)
+- Add tags from `add_tags`
+- Delete tags from `del_tags`
+- Rename metric based on `rename_metrics` and store old name as `oldname` in meta information
+- Add tags from `add_tags` (if you used the new name in the `if` condition)
+- Delete tags from `del_tags` (if you used the new name in the `if` condition)
+- Send to sinks
+- Move to cache (if `num_cache_intervals > 0`)
+
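To make the order concrete, here is a minimal sketch of this pipeline for a single metric; `metric`, `routerConfig` and the helper logic are simplified stand-ins for the actual router types, not the real implementation.

```go
package main

import "fmt"

type metric struct {
	name string
	tags map[string]string
	meta map[string]string
	time int64
}

type routerConfig struct {
	hostnameTag   string
	intervalStamp bool
	renames       map[string]string
}

func process(p *metric, cfg routerConfig, hostname string, intervalTime int64) {
	// 1. Add the hostname tag
	p.tags[cfg.hostnameTag] = hostname
	// 2. Apply the common interval timestamp if configured
	if cfg.intervalStamp {
		p.time = intervalTime
	}
	// 3. drop_metrics / drop_metrics_if checks would go here
	// 4./5. add_tags / del_tags would be applied here
	// 6. Rename and remember the old name in the meta information
	if newname, ok := cfg.renames[p.name]; ok {
		p.meta["oldname"] = p.name
		p.name = newname
		// 7./8. add_tags / del_tags again, matching the new name
	}
	// 9. Send to sinks; 10. move to cache (if enabled)
}

func main() {
	m := &metric{name: "old_mem_used", tags: map[string]string{}, meta: map[string]string{}}
	cfg := routerConfig{
		hostnameTag:   "hostname",
		intervalStamp: true,
		renames:       map[string]string{"old_mem_used": "mem_used"},
	}
	process(m, cfg, "node001", 1234567890)
	fmt.Println(m.name, m.tags, m.meta)
}
```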
 # The `interval_timestamp` option
 
 The collectors' `Read()` functions are not called simultaneously, so the metrics gathered in one interval can carry different timestamps. If you want a common timestamp (the beginning of the interval) instead, set this option to `true` and the MetricRouter sets the time.
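A minimal sketch of the mechanism, modeled on the `timeChan` handling visible in the router diff further below; the ticker goroutine here is a simplified stand-in for `mct.MultiChanTicker`.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	timeChan := make(chan time.Time)
	done := make(chan bool)

	// Stand-in for r.ticker.AddChannel(timeChan): publish the interval
	// start time once per (shortened) interval.
	go func() {
		for i := 0; i < 3; i++ {
			timeChan <- time.Now()
			time.Sleep(100 * time.Millisecond)
		}
		done <- true
	}()

	timestamp := time.Now()
	for {
		select {
		case t := <-timeChan:
			// Every metric forwarded until the next tick gets this timestamp.
			timestamp = t
			fmt.Println("Update timestamp", timestamp.UnixNano())
		case <-done:
			return
		}
	}
}
```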
@@ -65,6 +81,14 @@ If the MetricRouter should buffer metrics of intervals in a MetricCache, this op
 
 A `num_cache_intervals > 0` is required to use the `interval_aggregates` option.
 
+# The `hostname_tag` option
+
+By default, the router tags all locally created metrics with the hostname. The default tag name is `hostname`, but it can be changed if your organization prefers a different name.
+
+# The `max_forward` option
+
+Every time the router receives a metric through any of its channels, it tries to directly read up to `max_forward` further metrics from the same channel, so that the router thread does not go to sleep and wake up for every single arriving metric. The default is `50` metrics at once, and `max_forward` must be greater than `1`.
+
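A minimal sketch of the drain loop, modeled on the `select` branch in the router code further below; `collInput` and `forward` are simplified stand-ins for the router's input channel and forwarding function.

```go
package main

import "fmt"

func main() {
	maxForward := 50
	collInput := make(chan int, 100)
	for i := 0; i < 10; i++ {
		collInput <- i
	}
	forward := func(p int) { fmt.Println("forward", p) }

	// One received metric wakes the router; it then drains up to
	// maxForward-1 further metrics before sleeping in select again.
	p := <-collInput
	forward(p)
	for i := 0; len(collInput) > 0 && i < (maxForward-1); i++ {
		forward(<-collInput)
	}
}
```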
 # The `rename_metrics` option
 
 In the ClusterCockpit world we specified a set of standard metrics. Since some collectors determine the metric names based on files, executables and libraries, the names might change from system to system (or installation to installation, OS to OS, ...). In order to get the common names, you can rename incoming metrics before sending them to the sink: if a metric name matches an `oldname`, it is changed to the corresponding `newname`.
Metric router implementation:

@@ -48,7 +48,6 @@ type metricRouter struct {
 	done      chan bool           // channel to finish / stop metric router
 	wg        *sync.WaitGroup     // wait group for all goroutines in cc-metric-collector
 	timestamp time.Time           // timestamp periodically updated by ticker each interval
-	timerdone chan bool           // channel to finish / stop timestamp updater
 	ticker    mct.MultiChanTicker // periodically ticking once each interval
 	config    metricRouterConfig  // json encoded config for metric router
 	cache     MetricCache         // pointer to MetricCache

@@ -103,7 +102,10 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 		cclog.ComponentError("MetricRouter", err.Error())
 		return err
 	}
-	r.maxForward = r.config.MaxForward
+	r.maxForward = 1
+	if r.config.MaxForward > r.maxForward {
+		r.maxForward = r.config.MaxForward
+	}
 	if r.config.NumCacheIntervals > 0 {
 		r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, r.config.NumCacheIntervals)
 		if err != nil {

@@ -121,29 +123,6 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 	return nil
 }
 
-// StartTimer starts a timer which updates timestamp periodically
-func (r *metricRouter) StartTimer() {
-	m := make(chan time.Time)
-	r.ticker.AddChannel(m)
-	r.timerdone = make(chan bool)
-
-	r.wg.Add(1)
-	go func() {
-		defer r.wg.Done()
-		for {
-			select {
-			case <-r.timerdone:
-				close(r.timerdone)
-				cclog.ComponentDebug("MetricRouter", "TIMER DONE")
-				return
-			case t := <-m:
-				r.timestamp = t
-			}
-		}
-	}()
-	cclog.ComponentDebug("MetricRouter", "TIMER START")
-}
-
 func getParamMap(point lp.CCMetric) map[string]interface{} {
 	params := make(map[string]interface{})
 	params["metric"] = point

@@ -232,8 +211,9 @@ func (r *metricRouter) dropMetric(point lp.CCMetric) bool {
 func (r *metricRouter) Start() {
 	// start timer if configured
 	r.timestamp = time.Now()
+	timeChan := make(chan time.Time)
 	if r.config.IntervalStamp {
-		r.StartTimer()
+		r.ticker.AddChannel(timeChan)
 	}
 
 	// Router manager is done

@@ -313,6 +293,10 @@ func (r *metricRouter) Start() {
 			done()
 			return
 
+		case timestamp := <-timeChan:
+			r.timestamp = timestamp
+			cclog.ComponentDebug("MetricRouter", "Update timestamp", r.timestamp.UnixNano())
+
 		case p := <-r.coll_input:
 			coll_forward(p)
 			for i := 0; len(r.coll_input) > 0 && i < (r.maxForward-1); i++ {

@@ -358,14 +342,6 @@ func (r *metricRouter) Close() {
 	// wait for close of channel r.done
 	<-r.done
 
-	// stop timer
-	if r.config.IntervalStamp {
-		cclog.ComponentDebug("MetricRouter", "TIMER CLOSE")
-		r.timerdone <- true
-		// wait for close of channel r.timerdone
-		<-r.timerdone
-	}
-
 	// stop metric cache
 	if r.config.NumCacheIntervals > 0 {
 		cclog.ComponentDebug("MetricRouter", "CACHE CLOSE")
HTTP sink:

@@ -42,13 +42,13 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
 	if s.buffer.Len() == 0 && s.flushDelay != 0 {
 		// This is the first write since the last flush, start the flushTimer!
 		if s.flushTimer != nil && s.flushTimer.Stop() {
-			cclog.ComponentDebug("HttpSink", "unexpected: the flushTimer was already running?")
+			cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
 		}
 
 		// Run a batched flush for all lines that have arrived in the last second
 		s.flushTimer = time.AfterFunc(s.flushDelay, func() {
 			if err := s.Flush(); err != nil {
-				cclog.ComponentError("HttpSink", "flush failed:", err.Error())
+				cclog.ComponentError(s.name, "flush failed:", err.Error())
 			}
 		})
 	}

@@ -60,6 +60,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
 	s.lock.Unlock() // defer does not work here as Flush() takes the lock as well
 
 	if err != nil {
+		cclog.ComponentError(s.name, "encoding failed:", err.Error())
 		return err
 	}
 

@@ -84,6 +85,7 @@ func (s *HttpSink) Flush() error {
 	// Create new request to send buffer
 	req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer)
 	if err != nil {
+		cclog.ComponentError(s.name, "failed to create request:", err.Error())
 		return err
 	}
 

@@ -100,12 +102,15 @@ func (s *HttpSink) Flush() error {
 
 	// Handle transport/tcp errors
 	if err != nil {
+		cclog.ComponentError(s.name, "transport/tcp error:", err.Error())
 		return err
 	}
 
 	// Handle application errors
 	if res.StatusCode != http.StatusOK {
-		return errors.New(res.Status)
+		err = errors.New(res.Status)
+		cclog.ComponentError(s.name, "application error:", err.Error())
+		return err
 	}
 
 	return nil

@@ -114,7 +119,7 @@ func (s *HttpSink) Flush() error {
 func (s *HttpSink) Close() {
 	s.flushTimer.Stop()
 	if err := s.Flush(); err != nil {
-		cclog.ComponentError("HttpSink", "flush failed:", err.Error())
+		cclog.ComponentError(s.name, "flush failed:", err.Error())
 	}
 	s.client.CloseIdleConnections()
 }
InfluxDB async sink:

@@ -6,12 +6,14 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"strings"
 	"time"
 
 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
+	influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http"
 )
 
 type InfluxAsyncSinkConfig struct {

@@ -28,10 +30,12 @@ type InfluxAsyncSinkConfig struct {
 	BatchSize uint `json:"batch_size,omitempty"`
 	// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
 	FlushInterval uint `json:"flush_interval,omitempty"`
-	InfluxRetryInterval string `json:"retry_interval"`
-	InfluxExponentialBase uint `json:"retry_exponential_base"`
-	InfluxMaxRetries uint `json:"max_retries"`
-	InfluxMaxRetryTime string `json:"max_retry_time"`
+	InfluxRetryInterval string `json:"retry_interval,omitempty"`
+	InfluxExponentialBase uint `json:"retry_exponential_base,omitempty"`
+	InfluxMaxRetries uint `json:"max_retries,omitempty"`
+	InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
+	CustomFlushInterval string `json:"custom_flush_interval,omitempty"`
+	MaxRetryAttempts uint `json:"max_retry_attempts,omitempty"`
 }
 
 type InfluxAsyncSink struct {

@@ -42,6 +46,8 @@ type InfluxAsyncSink struct {
 	config InfluxAsyncSinkConfig
 	influxRetryInterval uint
 	influxMaxRetryTime uint
+	customFlushInterval time.Duration
+	flushTimer *time.Timer
 }
 
 func (s *InfluxAsyncSink) connect() error {

@@ -60,20 +66,34 @@ func (s *InfluxAsyncSink) connect() error {
 	cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
 	clientOptions := influxdb2.DefaultOptions()
 	if s.config.BatchSize != 0 {
+		cclog.ComponentDebug(s.name, "Batch size", s.config.BatchSize)
 		clientOptions.SetBatchSize(s.config.BatchSize)
 	}
 	if s.config.FlushInterval != 0 {
+		cclog.ComponentDebug(s.name, "Flush interval", s.config.FlushInterval)
 		clientOptions.SetFlushInterval(s.config.FlushInterval)
 	}
+	if s.influxRetryInterval != 0 {
+		cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval)
+		clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
+	}
+	if s.influxMaxRetryTime != 0 {
+		cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime)
+		clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
+	}
+	if s.config.InfluxExponentialBase != 0 {
+		cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase)
+		clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
+	}
+	if s.config.InfluxMaxRetries != 0 {
+		cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries)
+		clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
+	}
 	clientOptions.SetTLSConfig(
 		&tls.Config{
 			InsecureSkipVerify: true,
 		},
-	)
-	clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
-	clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
-	clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
-	clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
+	).SetPrecision(time.Second)
 
 	s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
 	s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)

@@ -84,10 +104,23 @@ func (s *InfluxAsyncSink) connect() error {
 	if !ok {
 		return fmt.Errorf("connection to %s not healthy", uri)
 	}
+	s.writeApi.SetWriteFailedCallback(func(batch string, err influxdb2ApiHttp.Error, retryAttempts uint) bool {
+		mlist := strings.Split(batch, "\n")
+		cclog.ComponentError(s.name, fmt.Sprintf("Failed to write batch with %d metrics %d times (max: %d): %s", len(mlist), retryAttempts, s.config.MaxRetryAttempts, err.Error()))
+		return retryAttempts <= s.config.MaxRetryAttempts
+	})
 	return nil
 }
 
 func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
+	if s.customFlushInterval != 0 && s.flushTimer == nil {
+		// Run a batched flush for all lines that have arrived in the defined interval
+		s.flushTimer = time.AfterFunc(s.customFlushInterval, func() {
+			if err := s.Flush(); err != nil {
+				cclog.ComponentError(s.name, "flush failed:", err.Error())
+			}
+		})
+	}
 	s.writeApi.WritePoint(
 		m.ToPoint(s.meta_as_tags),
 	)

@@ -95,7 +128,11 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
 }
 
 func (s *InfluxAsyncSink) Flush() error {
+	cclog.ComponentDebug(s.name, "Flushing")
 	s.writeApi.Flush()
+	if s.customFlushInterval != 0 && s.flushTimer != nil {
+		s.flushTimer = nil
+	}
 	return nil
 }

@@ -110,13 +147,17 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
 	s.name = fmt.Sprintf("InfluxSink(%s)", name)
 
 	// Set default for maximum number of points sent to server in single request.
-	s.config.BatchSize = 100
-	s.influxRetryInterval = uint(time.Duration(1) * time.Second)
-	s.config.InfluxRetryInterval = "1s"
+	s.config.BatchSize = 0
+	s.influxRetryInterval = 0
+	//s.config.InfluxRetryInterval = "1s"
|
||||||
s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
|
s.influxMaxRetryTime = 0
|
||||||
s.config.InfluxMaxRetryTime = "168h"
|
//s.config.InfluxMaxRetryTime = "168h"
|
||||||
s.config.InfluxMaxRetries = 20
|
s.config.InfluxMaxRetries = 0
|
||||||
s.config.InfluxExponentialBase = 2
|
s.config.InfluxExponentialBase = 0
|
||||||
|
s.config.FlushInterval = 0
|
||||||
|
s.config.CustomFlushInterval = ""
|
||||||
|
s.customFlushInterval = time.Duration(0)
|
||||||
|
s.config.MaxRetryAttempts = 1
|
||||||
|
|
||||||
// Default retry intervals (in seconds)
|
// Default retry intervals (in seconds)
|
||||||
// 1 2
|
// 1 2
|
||||||
@@ -168,6 +209,15 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
|
s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
|
||||||
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
|
s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
|
||||||
|
|
||||||
|
// Use a own timer for calling Flush()
|
||||||
|
if len(s.config.CustomFlushInterval) > 0 {
|
||||||
|
t, err := time.ParseDuration(s.config.CustomFlushInterval)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid duration in 'custom_flush_interval': %v", err)
|
||||||
|
}
|
||||||
|
s.customFlushInterval = t
|
||||||
|
}
|
||||||
|
|
||||||
// Connect to InfluxDB server
|
// Connect to InfluxDB server
|
||||||
if err := s.connect(); err != nil {
|
if err := s.connect(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to connect: %v", err)
|
return nil, fmt.Errorf("unable to connect: %v", err)
|
||||||
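Taken together, the async sink's retry and flush knobs become optional JSON settings with zero-value defaults, so the influxdb2 client only receives options the user actually set. A minimal sketch of a sink entry exercising the new fields (the sink name, the `type` key, and all values are illustrative; the connection keys are assumed to match the sink's existing config):

```json
{
  "myinfluxasync": {
    "type": "influxasync",
    "host": "localhost",
    "port": "8086",
    "database": "mymetrics",
    "organization": "myorg",
    "password": "examplepw",
    "batch_size": 200,
    "flush_interval": 1000,
    "custom_flush_interval": "5s",
    "max_retry_attempts": 3
  }
}
```

With this, `custom_flush_interval` drives the sink's own `time.AfterFunc` timer independently of the client-side `flush_interval`, and the write-failed callback stops retrying a batch once `retryAttempts` exceeds `max_retry_attempts`.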
@@ -6,28 +6,32 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"sync"
 	"time"
 
 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
+	"github.com/influxdata/influxdb-client-go/v2/api/write"
 )
 
 type InfluxSinkConfig struct {
 	defaultSinkConfig
 	Host string `json:"host,omitempty"`
 	Port string `json:"port,omitempty"`
 	Database string `json:"database,omitempty"`
 	User string `json:"user,omitempty"`
 	Password string `json:"password,omitempty"`
 	Organization string `json:"organization,omitempty"`
 	SSL bool `json:"ssl,omitempty"`
-	RetentionPol string `json:"retention_policy,omitempty"`
-	InfluxRetryInterval string `json:"retry_interval"`
-	InfluxExponentialBase uint `json:"retry_exponential_base"`
-	InfluxMaxRetries uint `json:"max_retries"`
-	InfluxMaxRetryTime string `json:"max_retry_time"`
+	FlushDelay string `json:"flush_delay,omitempty"`
+	BatchSize int `json:"batch_size,omitempty"`
+	RetentionPol string `json:"retention_policy,omitempty"`
+	// InfluxRetryInterval string `json:"retry_interval"`
+	// InfluxExponentialBase uint `json:"retry_exponential_base"`
+	// InfluxMaxRetries uint `json:"max_retries"`
+	// InfluxMaxRetryTime string `json:"max_retry_time"`
 	//InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it
 }
 
@@ -38,6 +42,10 @@ type InfluxSink struct {
 	config InfluxSinkConfig
 	influxRetryInterval uint
 	influxMaxRetryTime uint
+	batch []*write.Point
+	flushTimer *time.Timer
+	flushDelay time.Duration
+	lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
 	//influxMaxRetryDelay uint
 }
 
@@ -56,16 +64,31 @@ func (s *InfluxSink) connect() error {
 	}
 	cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
 	clientOptions := influxdb2.DefaultOptions()
 
+	// if s.influxRetryInterval != 0 {
+	// 	cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval)
+	// 	clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
+	// }
+	// if s.influxMaxRetryTime != 0 {
+	// 	cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime)
+	// 	clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
+	// }
+	// if s.config.InfluxExponentialBase != 0 {
+	// 	cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase)
+	// 	clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
+	// }
+	// if s.config.InfluxMaxRetries != 0 {
+	// 	cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries)
+	// 	clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
+	// }
+
 	clientOptions.SetTLSConfig(
 		&tls.Config{
 			InsecureSkipVerify: true,
 		},
 	)
 
-	clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
-	clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
-	clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
-	clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
+	clientOptions.SetPrecision(time.Second)
 
 	s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
 	s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
@@ -80,38 +103,76 @@ func (s *InfluxSink) connect() error {
 	}
 
 func (s *InfluxSink) Write(m lp.CCMetric) error {
-	err :=
-		s.writeApi.WritePoint(
-			context.Background(),
-			m.ToPoint(s.meta_as_tags),
-		)
-	return err
+	// err :=
+	// 	s.writeApi.WritePoint(
+	// 		context.Background(),
+	// 		m.ToPoint(s.meta_as_tags),
+	// 	)
+	if len(s.batch) == 0 && s.flushDelay != 0 {
+		// This is the first write since the last flush, start the flushTimer!
+		if s.flushTimer != nil && s.flushTimer.Stop() {
+			cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
+		}
+
+		// Run a batched flush for all lines that have arrived in the last second
+		s.flushTimer = time.AfterFunc(s.flushDelay, func() {
+			if err := s.Flush(); err != nil {
+				cclog.ComponentError(s.name, "flush failed:", err.Error())
+			}
+		})
+	}
+	p := m.ToPoint(s.meta_as_tags)
+	s.lock.Lock()
+	s.batch = append(s.batch, p)
+	s.lock.Unlock()
+
+	// Flush synchronously if "flush_delay" is zero
+	if s.flushDelay == 0 {
+		return s.Flush()
+	}
+
+	return nil
 }
 
 func (s *InfluxSink) Flush() error {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+	if len(s.batch) == 0 {
+		return nil
+	}
+	err := s.writeApi.WritePoint(context.Background(), s.batch...)
+	if err != nil {
+		cclog.ComponentError(s.name, "flush failed:", err.Error())
+		return err
+	}
+	s.batch = s.batch[:0]
 	return nil
 }
 
 func (s *InfluxSink) Close() {
 	cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
+	s.flushTimer.Stop()
+	s.Flush()
 	s.client.Close()
 }
 
 func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
 	s := new(InfluxSink)
 	s.name = fmt.Sprintf("InfluxSink(%s)", name)
+	s.config.BatchSize = 100
+	s.config.FlushDelay = "1s"
 	if len(config) > 0 {
 		err := json.Unmarshal(config, &s.config)
 		if err != nil {
 			return nil, err
 		}
 	}
-	s.influxRetryInterval = uint(time.Duration(1) * time.Second)
-	s.config.InfluxRetryInterval = "1s"
-	s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
-	s.config.InfluxMaxRetryTime = "168h"
-	s.config.InfluxMaxRetries = 20
-	s.config.InfluxExponentialBase = 2
+	s.influxRetryInterval = 0
+	s.influxMaxRetryTime = 0
+	// s.config.InfluxRetryInterval = ""
+	// s.config.InfluxMaxRetryTime = ""
+	// s.config.InfluxMaxRetries = 0
+	// s.config.InfluxExponentialBase = 0
 
 	if len(s.config.Host) == 0 ||
 		len(s.config.Port) == 0 ||
@@ -126,15 +187,25 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
 		s.meta_as_tags[k] = true
 	}
 
-	toUint := func(duration string, def uint) uint {
-		t, err := time.ParseDuration(duration)
-		if err == nil {
-			return uint(t.Milliseconds())
-		}
-		return def
-	}
-	s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
-	s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
+	// toUint := func(duration string, def uint) uint {
+	// 	if len(duration) > 0 {
+	// 		t, err := time.ParseDuration(duration)
+	// 		if err == nil {
+	// 			return uint(t.Milliseconds())
+	// 		}
+	// 	}
+	// 	return def
+	// }
+	// s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
+	// s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
+
+	if len(s.config.FlushDelay) > 0 {
+		t, err := time.ParseDuration(s.config.FlushDelay)
+		if err == nil {
+			s.flushDelay = t
+		}
+	}
+	s.batch = make([]*write.Point, 0, s.config.BatchSize)
 
 	// Connect to InfluxDB server
 	if err := s.connect(); err != nil {
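The blocking sink thus moves from one synchronous write per metric to a mutex-protected batch that either a `flush_delay` timer or a zero-delay write drains. A minimal, self-contained sketch of that pattern follows; the `batcher` type, its fields, and plain strings as payload are illustrative stand-ins for the sink and its `[]*write.Point` buffer, not the sink's API:

```go
// Sketch of the batch-plus-flush-timer pattern used by the sink above.
package main

import (
	"fmt"
	"sync"
	"time"
)

type batcher struct {
	mu         sync.Mutex
	batch      []string
	flushDelay time.Duration
	timer      *time.Timer
}

func (b *batcher) Add(line string) {
	b.mu.Lock()
	first := len(b.batch) == 0
	b.batch = append(b.batch, line)
	b.mu.Unlock()

	if b.flushDelay == 0 {
		// Zero delay: flush synchronously, like the sink's flush_delay == 0 path.
		b.Flush()
		return
	}
	if first {
		// First write since the last flush: arm a timer so everything that
		// arrives within flushDelay goes out as one batch.
		b.timer = time.AfterFunc(b.flushDelay, func() { b.Flush() })
	}
}

func (b *batcher) Flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.batch) == 0 {
		return
	}
	fmt.Printf("writing %d lines in one batch\n", len(b.batch))
	// Truncate instead of reallocating so the backing array is reused,
	// mirroring s.batch = s.batch[:0] above.
	b.batch = b.batch[:0]
}

func main() {
	b := &batcher{flushDelay: 500 * time.Millisecond, batch: make([]string, 0, 100)}
	b.Add("cpu_load,hostname=node1 value=1.2")
	b.Add("mem_used,hostname=node1 value=4096")
	time.Sleep(time.Second) // let the timer fire and flush both lines as one batch
}
```

Like the sink's `Flush`, the sketch keeps the buffer's capacity (pre-sized from `batch_size`) across flushes, so steady-state operation allocates nothing per batch.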
@@ -17,10 +17,8 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
     "password" : "examplepw",
     "organization": "myorg",
     "ssl": true,
-    "retry_interval" : "1s",
-    "retry_exponential_base" : 2,
-    "max_retries": 20,
-    "max_retry_time" : "168h"
+    "flush_delay" : "1s",
+    "batch_size" : 100
   }
 }
 ```
@@ -34,9 +32,6 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
 - `password`: Password for basic authentication
 - `organization`: Organization in the InfluxDB
 - `ssl`: Use SSL connection
-- `retry_interval`: Base retry interval for failed write requests, default 1s
-- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
-- `max_retries`: Maximal number of retry attempts
-- `max_retry_time` : Maximal time to retry failed writes, default 168h (one week)
+- `flush_delay`: Group incoming metrics into a single batch
+- `batch_size`: Maximal batch size
 
-For information about the calculation of the retry interval settings, see [official influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)
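A note on the two new options, based on the code above: `flush_delay` accepts any Go `time.ParseDuration` string and, when zero, unset, or unparsable, makes every write flush synchronously, while `batch_size` only pre-sizes the point buffer (`make([]*write.Point, 0, BatchSize)`) and is therefore a capacity hint rather than a hard flush trigger. A sketch of a lower-latency configuration (sink name and values are illustrative):

```json
{
  "myinfluxsink": {
    "type": "influxdb",
    "host": "localhost",
    "port": "8086",
    "database": "mymetrics",
    "organization": "myorg",
    "password": "examplepw",
    "ssl": true,
    "flush_delay": "200ms",
    "batch_size": 1000
  }
}
```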