diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index 7593e8f..de8323d 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -7,24 +7,40 @@ import ( "strconv" "strings" "time" + "encoding/json" ) const CPUSTATFILE = `/proc/stat` -type CpustatCollector struct { - MetricCollector +type CpustatCollectorConfig struct { + ExcludeMetrics []string `json:"exclude_metrics, omitempty"` } -func (m *CpustatCollector) Init() error { +type CpustatCollector struct { + MetricCollector + config CpustatCollectorConfig +} + +func (m *CpustatCollector) Init(config []byte) error { m.name = "CpustatCollector" m.setup() + if len(config) > 0 { + err := json.Unmarshal(config, &m.config) + if err != nil { + return err + } + } m.init = true return nil } -func ParseStatLine(line string, cpu int, out *[]lp.MutableMetric) { + +func ParseStatLine(line string, cpu int, exclude []string, out *[]lp.MutableMetric) { ls := strings.Fields(line) matches := []string{"", "cpu_user", "cpu_nice", "cpu_system", "cpu_idle", "cpu_iowait", "cpu_irq", "cpu_softirq", "cpu_steal", "cpu_guest", "cpu_guest_nice"} + for _, ex := range exclude { + matches, _ = RemoveFromStringList(matches, ex) + } var tags map[string]string if cpu < 0 { @@ -46,6 +62,9 @@ func ParseStatLine(line string, cpu int, out *[]lp.MutableMetric) { } func (m *CpustatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { + if (!m.init) { + return + } buffer, err := ioutil.ReadFile(string(CPUSTATFILE)) if err != nil { @@ -59,11 +78,11 @@ func (m *CpustatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) } ls := strings.Fields(line) if strings.Compare(ls[0], "cpu") == 0 { - ParseStatLine(line, -1, out) + ParseStatLine(line, -1, m.config.ExcludeMetrics, out) } else if strings.HasPrefix(ls[0], "cpu") { cpustr := strings.TrimLeft(ls[0], "cpu") cpu, _ := strconv.Atoi(cpustr) - ParseStatLine(line, cpu, out) + ParseStatLine(line, cpu, m.config.ExcludeMetrics, out) } } } diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index fbb2d87..3c0a433 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -1,25 +1,65 @@ package collectors import ( - "fmt" + "errors" lp "github.com/influxdata/line-protocol" "io/ioutil" "log" "os/exec" "time" + "encoding/json" + "strings" ) const CUSTOMCMDPATH = `/home/unrz139/Work/cc-metric-collector/collectors/custom` +type CustomCmdCollectorConfig struct { + commands []string `json:"commands"` + files []string `json:"files"` + ExcludeMetrics []string `json:"exclude_metrics"` +} + type CustomCmdCollector struct { MetricCollector handler *lp.MetricHandler parser *lp.Parser + config CustomCmdCollectorConfig + commands []string + files []string } -func (m *CustomCmdCollector) Init() error { +func (m *CustomCmdCollector) Init(config []byte) error { + var err error m.name = "CustomCmdCollector" + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + log.Print(err.Error()) + return err + } + } m.setup() + for _, c := range m.config.commands { + cmdfields := strings.Fields(c) + command := exec.Command(cmdfields[0], strings.Join(cmdfields[1:], " ")) + command.Wait() + _, err = command.Output() + if err != nil { + m.commands = append(m.commands, c) + } + } + for _, f := range m.config.files { + _, err = ioutil.ReadFile(f) + if err == nil { + m.files = append(m.files, f) + } else { + log.Print(err.Error()) + continue + } + } + if len(m.files) == 0 && len(m.commands) == 0 { + return errors.New("No metrics to collect") + } m.handler = lp.NewMetricHandler() m.parser = lp.NewParser(m.handler) m.parser.SetTimeFunc(DefaultTime) @@ -32,73 +72,55 @@ var DefaultTime = func() time.Time { } func (m *CustomCmdCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { - files, err := ioutil.ReadDir(string(CUSTOMCMDPATH)) - if err != nil { - log.Print(err) - return - } - for _, file := range files { - // stat, err := os.Stat(file) - // if err != nil { - // log.Print(err) - // continue - // } - // mode := stat.Mode() - // if mode & 0o555 { - path := fmt.Sprintf("%s/%s", string(CUSTOMCMDPATH), file.Name()) - command := exec.Command(path, "") + if !m.init { + return + } + for _, cmd := range m.commands { + cmdfields := strings.Fields(cmd) + command := exec.Command(cmdfields[0], strings.Join(cmdfields[1:], " ")) command.Wait() stdout, err := command.Output() if err != nil { log.Print(err) continue } - metrics, err := m.parser.Parse(stdout) + cmdmetrics, err := m.parser.Parse(stdout) if err != nil { log.Print(err) continue } - for _, m := range metrics { - y, err := lp.New(m.Name(), Tags2Map(m), Fields2Map(m), m.Time()) - if err == nil { - *out = append(*out, y) - } - // switch m.Name() { - // case "node": - // for k, v := range m.FieldList() { - // m.node[k] = float64(v) - // } - // case "socket": - // tlist := m.TagList() - // if id, found := tlist["socket"]; found { - // for k, v := range m.FieldList() { - // m.socket[id][k] = float64(v) - // } - // } - // case "cpu": - // tlist := m.TagList() - // if id, found := tlist["cpu"]; found { - // for k, v := range m.FieldList() { - // m.cpu[id][k] = float64(v) - // } - // } - // case "network": - // tlist := m.TagList() - // if id, found := tlist["device"]; found { - // for k, v := range m.FieldList() { - // m.network[id][k] = float64(v) - // } - // } - // case "accelerator": - // tlist := m.TagList() - // if id, found := tlist["device"]; found { - // for k, v := range m.FieldList() { - // m.accelerator[id][k] = float64(v) - // } - // } - // } + for _, c := range cmdmetrics { + _, skip := stringArrayContains(m.config.ExcludeMetrics, c.Name()) + if skip { + continue + } + y, err := lp.New(c.Name(), Tags2Map(c), Fields2Map(c), c.Time()) + if err == nil { + *out = append(*out, y) + } + } + } + for _, file := range m.files { + buffer, err := ioutil.ReadFile(file) + if err != nil { + log.Print(err) + return + } + fmetrics, err := m.parser.Parse(buffer) + if err != nil { + log.Print(err) + continue } - // } if file is executable check + for _, f := range fmetrics { + _, skip := stringArrayContains(m.config.ExcludeMetrics, f.Name()) + if skip { + continue + } + y, err := lp.New(f.Name(), Tags2Map(f), Fields2Map(f), f.Time()) + if err == nil { + *out = append(*out, y) + } + } } } diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index fbce9e1..812e0b6 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -1,28 +1,42 @@ package collectors import ( - // "errors" - // "fmt" lp "github.com/influxdata/line-protocol" "io/ioutil" - "log" +// "log" "strconv" "strings" "time" + "encoding/json" + "errors" ) const DISKSTATFILE = `/proc/diskstats` +const DISKSTAT_SYSFSPATH = `/sys/block` + +type DiskstatCollectorConfig struct { + ExcludeMetrics []string `json:"exclude_metrics, omitempty"` +} type DiskstatCollector struct { MetricCollector matches map[int]string + config DiskstatCollectorConfig } -func (m *DiskstatCollector) Init() error { + +func (m *DiskstatCollector) Init(config []byte) error { + var err error m.name = "DiskstatCollector" m.setup() + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + return err + } + } // https://www.kernel.org/doc/html/latest/admin-guide/iostats.html - m.matches = map[int]string{ + matches := map[int]string{ 3: "reads", 4: "reads_merged", 5: "read_sectors", @@ -41,46 +55,60 @@ func (m *DiskstatCollector) Init() error { 18: "flushes", 19: "flushes_ms", } - _, err := ioutil.ReadFile(string(DISKSTATFILE)) - if err == nil { - m.init = true + m.matches = make(map[int]string) + for k, v := range matches { + _, skip := stringArrayContains(m.config.ExcludeMetrics, v) + if (!skip) { + m.matches[k] = v + } } + if len(m.matches) == 0 { + return errors.New("No metrics to collect") + } + _, err = ioutil.ReadFile(string(DISKSTATFILE)) + if err == nil { + m.init = true + } return err } + func (m *DiskstatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { + var lines []string + if !m.init { + return + } - buffer, err := ioutil.ReadFile(string(DISKSTATFILE)) + buffer, err := ioutil.ReadFile(string(DISKSTATFILE)) + if err != nil { + return + } + lines = strings.Split(string(buffer), "\n") - if err != nil { - log.Print(err) - return - } - - ll := strings.Split(string(buffer), "\n") - - for _, line := range ll { - if len(line) == 0 { - continue - } - f := strings.Fields(line) - if strings.Contains(f[2], "loop") { - continue - } - tags := map[string]string{ - "device": f[2], - "type": "node", - } - for idx, name := range m.matches { - x, err := strconv.ParseInt(f[idx], 0, 64) - if err == nil { - y, err := lp.New(name, tags, map[string]interface{}{"value": int(x)}, time.Now()) - if err == nil { - *out = append(*out, y) - } - } - } - } + for _, line := range lines { + if len(line) == 0 { + continue + } + f := strings.Fields(line) + if strings.Contains(f[2], "loop") { + continue + } + tags := map[string]string{ + "device": f[2], + "type": "node", + } + for idx, name := range m.matches { + if idx < len(f) { + x, err := strconv.ParseInt(f[idx], 0, 64) + if err == nil { + y, err := lp.New(name, tags, map[string]interface{}{"value": int(x)}, time.Now()) + if err == nil { + *out = append(*out, y) + } + } + } + } + } return } diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index 8bd6fd3..ebd3713 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -6,59 +6,104 @@ import ( "io/ioutil" "log" "os/exec" +// "os" + "path/filepath" "strconv" "strings" "time" + "encoding/json" + "errors" ) +const BASEPATH = `/sys/class/infiniband/` const LIDFILE = `/sys/class/infiniband/mlx4_0/ports/1/lid` const PERFQUERY = `/usr/sbin/perfquery` +type InfinibandCollectorConfig struct { + ExcludeDevices []string `json:"exclude_devices, omitempty"` +} + type InfinibandCollector struct { MetricCollector tags map[string]string + lids map[string]map[string]string + config NetstatCollectorConfig + use_perfquery bool } -func (m *InfinibandCollector) Init() error { +func (m *InfinibandCollector) Init(config []byte) error { + var err error m.name = "InfinibandCollector" + m.use_perfquery = false m.setup() m.tags = map[string]string{"type": "node"} - _, err := ioutil.ReadFile(string(LIDFILE)) - if err == nil { - _, err = ioutil.ReadFile(string(PERFQUERY)) - if err == nil { - m.init = true - } + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + return err + } } + m.lids = make(map[string]map[string]string) + p := fmt.Sprintf("%s/*/ports/*/lid", string(BASEPATH)) + files, err := filepath.Glob(p) + for _, f := range(files) { + lid, err := ioutil.ReadFile(f) + if err == nil { + plist := strings.Split(strings.Replace(f, string(BASEPATH), "", -1), "/") + skip := false + for _, d := range m.config.ExcludeDevices { + if d == plist[0] { + skip = true + } + } + if !skip { + m.lids[plist[0]] = make(map[string]string) + m.lids[plist[0]][plist[2]] = string(lid) + } + } + } + + for _, ports := range m.lids { + for port, lid := range ports { + args := fmt.Sprintf("-r %s %s 0xf000", lid, port) + command := exec.Command(PERFQUERY, args) + command.Wait() + _, err := command.Output() + if (err == nil) { + m.use_perfquery = true + } + break + } + break + } + + if len(m.lids) > 0 { + m.init = true + } else { + err = errors.New("No usable devices") + } + return err } -func (m *InfinibandCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { - buffer, err := ioutil.ReadFile(string(LIDFILE)) +func DoPerfQuery(dev string, lid string, port string, tags map[string]string, out *[]lp.MutableMetric) error { - if err != nil { - log.Print(err) - return - } - - args := fmt.Sprintf("-r %s 1 0xf000", string(buffer)) - - command := exec.Command(PERFQUERY, args) - command.Wait() - stdout, err := command.Output() - if err != nil { - log.Print(err) - return - } - - ll := strings.Split(string(stdout), "\n") + args := fmt.Sprintf("-r %s %s 0xf000", lid, port) + command := exec.Command(PERFQUERY, args) + command.Wait() + stdout, err := command.Output() + if err != nil { + log.Print(err) + return err + } + ll := strings.Split(string(stdout), "\n") for _, line := range ll { if strings.HasPrefix(line, "PortRcvData") || strings.HasPrefix(line, "RcvData") { lv := strings.Fields(line) v, err := strconv.ParseFloat(lv[1], 64) if err == nil { - y, err := lp.New("ib_recv", m.tags, map[string]interface{}{"value": float64(v)}, time.Now()) + y, err := lp.New("ib_recv", tags, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { *out = append(*out, y) } @@ -68,13 +113,100 @@ func (m *InfinibandCollector) Read(interval time.Duration, out *[]lp.MutableMetr lv := strings.Fields(line) v, err := strconv.ParseFloat(lv[1], 64) if err == nil { - y, err := lp.New("ib_xmit", m.tags, map[string]interface{}{"value": float64(v)}, time.Now()) + y, err := lp.New("ib_xmit", tags, map[string]interface{}{"value": float64(v)}, time.Now()) if err == nil { *out = append(*out, y) } } } } + return nil +} + +func DoSysfsRead(dev string, lid string, port string, tags map[string]string, out *[]lp.MutableMetric) error { + path := fmt.Sprintf("%s/%s/ports/%s/counters/", string(BASEPATH), dev, port) + buffer, err := ioutil.ReadFile(fmt.Sprintf("%s/port_rcv_data", path)) + if err == nil { + data := strings.Replace(string(buffer), "\n", "", -1) + v, err := strconv.ParseFloat(data, 64) + if err == nil { + y, err := lp.New("ib_recv", tags, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + *out = append(*out, y) + } + } + } + buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_xmit_data", path)) + if err == nil { + data := strings.Replace(string(buffer), "\n", "", -1) + v, err := strconv.ParseFloat(data, 64) + if err == nil { + y, err := lp.New("ib_xmit", tags, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + *out = append(*out, y) + } + } + } + return nil +} + +func (m *InfinibandCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { + + if m.init { + for dev, ports := range m.lids { + for port, lid := range ports { + tags := map[string]string{"type" : "node", "device" : dev, "port" : port} + if m.use_perfquery { + DoPerfQuery(dev, lid, port, tags, out) + } else { + DoSysfsRead(dev, lid, port, tags, out) + } + } + } + } + + +// buffer, err := ioutil.ReadFile(string(LIDFILE)) + +// if err != nil { +// log.Print(err) +// return +// } + +// args := fmt.Sprintf("-r %s 1 0xf000", string(buffer)) + +// command := exec.Command(PERFQUERY, args) +// command.Wait() +// stdout, err := command.Output() +// if err != nil { +// log.Print(err) +// return +// } + +// ll := strings.Split(string(stdout), "\n") + +// for _, line := range ll { +// if strings.HasPrefix(line, "PortRcvData") || strings.HasPrefix(line, "RcvData") { +// lv := strings.Fields(line) +// v, err := strconv.ParseFloat(lv[1], 64) +// if err == nil { +// y, err := lp.New("ib_recv", m.tags, map[string]interface{}{"value": float64(v)}, time.Now()) +// if err == nil { +// *out = append(*out, y) +// } +// } +// } +// if strings.HasPrefix(line, "PortXmitData") || strings.HasPrefix(line, "XmtData") { +// lv := strings.Fields(line) +// v, err := strconv.ParseFloat(lv[1], 64) +// if err == nil { +// y, err := lp.New("ib_xmit", m.tags, map[string]interface{}{"value": float64(v)}, time.Now()) +// if err == nil { +// *out = append(*out, y) +// } +// } +// } +// } } func (m *InfinibandCollector) Close() { diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 3af50c8..a783876 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -15,15 +15,44 @@ import ( "log" "strings" "time" + "os" "unsafe" + "math" + "encoding/json" + "gopkg.in/Knetic/govaluate.v2" + "io/ioutil" + "strconv" ) +type LikwidCollectorMetricConfig struct { + Name string `json:"name"` + Calc string `json:"calc"` + Socket_scope bool `json:"socket_scope"` + Publish bool `json:"publish"` +} + +type LikwidCollectorEventsetConfig struct { + Events map[string]string `json:"events"` + Metrics []LikwidCollectorMetricConfig `json:"metrics"` +} + +type LikwidCollectorConfig struct { + Eventsets []LikwidCollectorEventsetConfig `json:"eventsets"` + Metrics []LikwidCollectorMetricConfig `json:"globalmetrics"` + ExcludeMetrics []string `json:"exclude_metrics"` +} + type LikwidCollector struct { MetricCollector cpulist []C.int sock2tid map[int]int metrics map[C.int]map[string]int - groups map[string]C.int + groups []C.int + config LikwidCollectorConfig + results map[int]map[int]map[string]interface{} + mresults map[int]map[int]map[string]float64 + gmresults map[int]map[string]float64 + basefreq float64 } type LikwidMetric struct { @@ -33,7 +62,7 @@ type LikwidMetric struct { group_idx int } -const GROUPPATH = `/home/unrz139/Work/cc-metric-collector/collectors/likwid/groups` +const GROUPPATH = `/apps/likwid/5.2.0/share/likwid/perfgroups` var likwid_metrics = map[string][]LikwidMetric{ "MEM_DP": {LikwidMetric{name: "mem_bw", search: "Memory bandwidth [MBytes/s]", socket_scope: true}, @@ -57,6 +86,33 @@ func getMetricId(group C.int, search string) (int, error) { return -1, errors.New(fmt.Sprintf("Cannot find metric for search string '%s' in group %d", search, int(group))) } +func eventsToEventStr(events map[string]string) string { + elist := make([]string, 0) + for k, v := range events { + elist = append(elist, fmt.Sprintf("%s:%s", v, k)) + } + return strings.Join(elist, ",") +} + +func getBaseFreq() float64 { + var freq float64 = math.NaN() + C.power_init(0) + info := C.get_powerInfo() + if float64(info.baseFrequency) != 0 { + freq = float64(info.baseFrequency) + } else { + buffer, err := ioutil.ReadFile("/sys/devices/system/cpu/cpu0/cpufreq/bios_limit") + if err == nil { + data := strings.Replace(string(buffer), "\n", "", -1) + x, err := strconv.ParseInt(data, 0, 64) + if err == nil { + freq = float64(x)*1E3 + } + } + } + return freq +} + func getSocketCpus() map[C.int]int { slist := SocketList() var cpu C.int @@ -71,9 +127,15 @@ func getSocketCpus() map[C.int]int { return outmap } -func (m *LikwidCollector) Init() error { +func (m *LikwidCollector) Init(config []byte) error { var ret C.int m.name = "LikwidCollector" + if len(config) > 0 { + err := json.Unmarshal(config, &m.config) + if err != nil { + return err + } + } m.setup() cpulist := CpuList() m.cpulist = make([]C.int, len(cpulist)) @@ -86,161 +148,183 @@ func (m *LikwidCollector) Init() error { m.sock2tid[sid] = i } } - m.metrics = make(map[C.int]map[string]int) - m.groups = make(map[string]C.int) + m.results = make(map[int]map[int]map[string]interface{}) + m.mresults = make(map[int]map[int]map[string]float64) + m.gmresults = make(map[int]map[string]float64) ret = C.topology_init() if ret != 0 { return errors.New("Failed to initialize LIKWID topology") } + os.Setenv("LIKWID_FORCE", "1") ret = C.perfmon_init(C.int(len(m.cpulist)), &m.cpulist[0]) if ret != 0 { C.topology_finalize() return errors.New("Failed to initialize LIKWID topology") } - gpath := C.CString(GROUPPATH) - C.config_setGroupPath(gpath) - C.free(unsafe.Pointer(gpath)) - - for g, metrics := range likwid_metrics { - cstr := C.CString(g) - gid := C.perfmon_addEventSet(cstr) - if gid >= 0 { - gmetrics := 0 - for i, metric := range metrics { - idx, err := getMetricId(gid, metric.search) - if err != nil { - log.Print(err) - } else { - likwid_metrics[g][i].group_idx = idx - gmetrics++ - } - } - if gmetrics > 0 { - m.groups[g] = gid - } - } else { - log.Print("Failed to add events set ", g) - } - C.free(unsafe.Pointer(cstr)) + + for i, evset := range m.config.Eventsets { + estr := eventsToEventStr(evset.Events) + cstr := C.CString(estr) + gid := C.perfmon_addEventSet(cstr) + if gid >= 0 { + m.groups = append(m.groups, gid) + } + C.free(unsafe.Pointer(cstr)) + m.results[i] = make(map[int]map[string]interface{}) + m.mresults[i] = make(map[int]map[string]float64) + for tid, _ := range m.cpulist { + m.results[i][tid] = make(map[string]interface{}) + m.mresults[i][tid] = make(map[string]float64) + m.gmresults[tid] = make(map[string]float64) + } } + if len(m.groups) == 0 { C.perfmon_finalize() C.topology_finalize() return errors.New("No LIKWID performance group initialized") } + m.basefreq = getBaseFreq() + log.Print(m.basefreq) m.init = true return nil } func (m *LikwidCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { - if m.init { - var ret C.int - core_fp_any := make(map[int]float64, len(m.cpulist)) - for _, cpu := range m.cpulist { - core_fp_any[int(cpu)] = 0.0 + if !m.init { + return + } + var ret C.int + + for i, gid := range m.groups { + evset := m.config.Eventsets[i] + ret = C.perfmon_setupCounters(gid) + if ret != 0 { + log.Print("Failed to setup performance group ", C.perfmon_getGroupName(gid)) + continue } - for gname, gid := range m.groups { - ret = C.perfmon_setupCounters(gid) - if ret != 0 { - log.Print("Failed to setup performance group ", gname) - continue - } - ret = C.perfmon_startCounters() - if ret != 0 { - log.Print("Failed to start performance group ", gname) - continue - } - time.Sleep(interval) - ret = C.perfmon_stopCounters() - if ret != 0 { - log.Print("Failed to stop performance group ", gname) - continue - } - - for _, lmetric := range likwid_metrics[gname] { - if lmetric.name == "pwr1" || lmetric.name == "pwr2" { - continue - } - mname := lmetric.name - inverse := false - if mname == "cpi" { - mname = "ipc" - inverse = true - } - if lmetric.socket_scope { - for sid, tid := range m.sock2tid { - res := C.perfmon_getLastMetric(gid, C.int(lmetric.group_idx), C.int(tid)) - y, err := lp.New(lmetric.name, - map[string]string{"type": "socket", "type-id": fmt.Sprintf("%d", int(sid))}, - map[string]interface{}{"value": float64(res)}, - time.Now()) - if err == nil { - *out = append(*out, y) - } - // log.Print("Metric '", lmetric.name,"' on Socket ",int(sid)," returns ", m.sockets[int(sid)][lmetric.name]) - } - } else { - for tid, cpu := range m.cpulist { - res := C.perfmon_getLastMetric(gid, C.int(lmetric.group_idx), C.int(tid)) - value := float64(res) - if inverse { - value = 1.0 / value - } - y, err := lp.New(mname, - map[string]string{"type": "cpu", "type-id": fmt.Sprintf("%d", int(cpu))}, - map[string]interface{}{"value": value}, - time.Now()) - if err == nil { - *out = append(*out, y) - } - if lmetric.name == "flops_dp" { - core_fp_any[int(cpu)] += 2 * float64(res) - } - if lmetric.name == "flops_sp" { - core_fp_any[int(cpu)] += float64(res) - } - // log.Print("Metric '", lmetric.name,"' on CPU ",int(cpu)," returns ", m.cpus[int(cpu)][lmetric.name]) - } - } - } - for sid, tid := range m.sock2tid { - sum := 0.0 - valid := false - for _, lmetric := range likwid_metrics[gname] { - if lmetric.name == "pwr1" || lmetric.name == "pwr2" { - res := C.perfmon_getLastMetric(gid, C.int(lmetric.group_idx), C.int(tid)) - sum += float64(res) - valid = true - } - } - if valid { - y, err := lp.New("power", - map[string]string{"type": "socket", "type-id": fmt.Sprintf("%d", int(sid))}, - map[string]interface{}{"value": float64(sum)}, - time.Now()) - if err == nil { - *out = append(*out, y) - } - } - } - for cpu := range m.cpulist { - y, err := lp.New("flops_any", - map[string]string{"type": "cpu", "type-id": fmt.Sprintf("%d", int(cpu))}, - map[string]interface{}{"value": float64(core_fp_any[int(cpu)])}, - time.Now()) - if err == nil { - *out = append(*out, y) - } - } + ret = C.perfmon_startCounters() + if ret != 0 { + log.Print("Failed to start performance group ", C.perfmon_getGroupName(gid)) + continue + } + time.Sleep(interval) + ret = C.perfmon_stopCounters() + if ret != 0 { + log.Print("Failed to stop performance group ", C.perfmon_getGroupName(gid)) + continue + } + var eidx C.int + for tid, _ := range m.cpulist { + for eidx = 0; int(eidx) < len(evset.Events); eidx++ { + ctr := C.perfmon_getCounterName(gid, eidx) + gctr := C.GoString(ctr) + res := C.perfmon_getLastResult(gid, eidx, C.int(tid)) + m.results[i][tid][gctr] = float64(res) + } + m.results[i][tid]["time"] = float64(interval) + m.results[i][tid]["inverseClock"] = float64(1.0/m.basefreq) + for _, metric := range evset.Metrics { + expression, err := govaluate.NewEvaluableExpression(metric.Calc) + if err != nil { + log.Print(err.Error()) + continue + } + result, err := expression.Evaluate(m.results[i][tid]); + if err != nil { + log.Print(err.Error()) + continue + } + m.mresults[i][tid][metric.Name] = float64(result.(float64)) + } } } + + for _, metric := range m.config.Metrics { + for tid, _ := range m.cpulist { + var params map[string]interface{} + expression, err := govaluate.NewEvaluableExpression(metric.Calc) + if err != nil { + log.Print(err.Error()) + continue + } + params = make(map[string]interface{}) + for j, _ := range m.groups { + for mname, mres := range m.mresults[j][tid] { + params[mname] = mres + } + } + result, err := expression.Evaluate(params); + if err != nil { + log.Print(err.Error()) + continue + } + m.gmresults[tid][metric.Name] = float64(result.(float64)) + } + } + for i, _ := range m.groups { + evset := m.config.Eventsets[i] + for _, metric := range evset.Metrics { + _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) + if metric.Publish && !skip { + if metric.Socket_scope { + for sid, tid := range m.sock2tid { + y, err := lp.New(metric.Name, + map[string]string{"type": "socket", "type-id": fmt.Sprintf("%d", int(sid))}, + map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, + time.Now()) + if err == nil { + *out = append(*out, y) + } + } + } else { + for tid, cpu := range m.cpulist { + y, err := lp.New(metric.Name, + map[string]string{"type": "cpu", "type-id": fmt.Sprintf("%d", int(cpu))}, + map[string]interface{}{"value": m.mresults[i][tid][metric.Name]}, + time.Now()) + if err == nil { + *out = append(*out, y) + } + } + } + } + } + } + for _, metric := range m.config.Metrics { + _, skip := stringArrayContains(m.config.ExcludeMetrics, metric.Name) + if metric.Publish && !skip { + if metric.Socket_scope { + for sid, tid := range m.sock2tid { + y, err := lp.New(metric.Name, + map[string]string{"type": "socket", "type-id": fmt.Sprintf("%d", int(sid))}, + map[string]interface{}{"value": m.gmresults[tid][metric.Name]}, + time.Now()) + if err == nil { + *out = append(*out, y) + } + } + } else { + for tid, cpu := range m.cpulist { + y, err := lp.New(metric.Name, + map[string]string{"type": "cpu", "type-id": fmt.Sprintf("%d", int(cpu))}, + map[string]interface{}{"value": m.gmresults[tid][metric.Name]}, + time.Now()) + if err == nil { + *out = append(*out, y) + } + } + } + } + } } + func (m *LikwidCollector) Close() { if m.init { + m.init = false C.perfmon_finalize() C.topology_finalize() - m.init = false } return } diff --git a/collectors/loadavgMetric.go b/collectors/loadavgMetric.go index 3b91d75..a17ab06 100644 --- a/collectors/loadavgMetric.go +++ b/collectors/loadavgMetric.go @@ -6,20 +6,32 @@ import ( "strconv" "strings" "time" + "encoding/json" ) const LOADAVGFILE = `/proc/loadavg` +type LoadavgCollectorConfig struct { + ExcludeMetrics []string `json:"exclude_metrics, omitempty"` +} + type LoadavgCollector struct { MetricCollector tags map[string]string load_matches []string proc_matches []string + config LoadavgCollectorConfig } -func (m *LoadavgCollector) Init() error { +func (m *LoadavgCollector) Init(config []byte) error { m.name = "LoadavgCollector" m.setup() + if len(config) > 0 { + err := json.Unmarshal(config, &m.config) + if err != nil { + return err + } + } m.tags = map[string]string{"type": "node"} m.load_matches = []string{"load_one", "load_five", "load_fifteen"} m.proc_matches = []string{"proc_run", "proc_total"} @@ -28,7 +40,10 @@ func (m *LoadavgCollector) Init() error { } func (m *LoadavgCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { - + var skip bool + if !m.init { + return + } buffer, err := ioutil.ReadFile(string(LOADAVGFILE)) if err != nil { @@ -39,8 +54,9 @@ func (m *LoadavgCollector) Read(interval time.Duration, out *[]lp.MutableMetric) for i, name := range m.load_matches { x, err := strconv.ParseFloat(ls[i], 64) if err == nil { + _, skip = stringArrayContains(m.config.ExcludeMetrics, name) y, err := lp.New(name, m.tags, map[string]interface{}{"value": float64(x)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } @@ -49,8 +65,9 @@ func (m *LoadavgCollector) Read(interval time.Duration, out *[]lp.MutableMetric) for i, name := range m.proc_matches { x, err := strconv.ParseFloat(lv[i], 64) if err == nil { + _, skip = stringArrayContains(m.config.ExcludeMetrics, name) y, err := lp.New(name, m.tags, map[string]interface{}{"value": float64(x)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index ce52d54..004f7c1 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -7,18 +7,34 @@ import ( "strconv" "strings" "time" + "encoding/json" + "errors" ) const LUSTREFILE = `/proc/fs/lustre/llite/lnec-XXXXXX/stats` +type LustreCollectorConfig struct { + procfiles []string `json:"procfiles"` + ExcludeMetrics []string `json:"exclude_metrics"` +} + type LustreCollector struct { MetricCollector tags map[string]string matches map[string]map[string]int + devices []string + config LustreCollectorConfig } -func (m *LustreCollector) Init() error { +func (m *LustreCollector) Init(config []byte) error { + var err error m.name = "LustreCollector" + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + return err + } + } m.setup() m.tags = map[string]string{"type": "node"} m.matches = map[string]map[string]int{"read_bytes": {"read_bytes": 6, "read_requests": 1}, @@ -29,38 +45,58 @@ func (m *LustreCollector) Init() error { "getattr": {"getattr": 1}, "statfs": {"statfs": 1}, "inode_permission": {"inode_permission": 1}} - _, err := ioutil.ReadFile(string(LUSTREFILE)) - if err == nil { - m.init = true + m.devices = make([]string, 0) + for _, p := range m.config.procfiles { + _, err := ioutil.ReadFile(p) + if err == nil { + m.devices = append(m.devices, p) + } else { + log.Print(err.Error()) + continue + } } - return err + + if len(m.devices) == 0 { + return errors.New("No metrics to collect") + } + m.init = true + return nil } func (m *LustreCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { - buffer, err := ioutil.ReadFile(string(LUSTREFILE)) + if !m.init { + return + } + for _, p := range m.devices { + buffer, err := ioutil.ReadFile(p) - if err != nil { - log.Print(err) - return - } + if err != nil { + log.Print(err) + return + } - for _, line := range strings.Split(string(buffer), "\n") { - lf := strings.Fields(line) - if len(lf) > 1 { - for match, fields := range m.matches { - if lf[0] == match { - for name, idx := range fields { - x, err := strconv.ParseInt(lf[idx], 0, 64) - if err == nil { - y, err := lp.New(name, m.tags, map[string]interface{}{"value": x}, time.Now()) - if err == nil { - *out = append(*out, y) - } - } - } - } - } - } + for _, line := range strings.Split(string(buffer), "\n") { + lf := strings.Fields(line) + if len(lf) > 1 { + for match, fields := range m.matches { + if lf[0] == match { + for name, idx := range fields { + _, skip := stringArrayContains(m.config.ExcludeMetrics, name) + if skip { + continue + } + x, err := strconv.ParseInt(lf[idx], 0, 64) + if err == nil { + y, err := lp.New(name, m.tags, map[string]interface{}{"value": x}, time.Now()) + if err == nil { + *out = append(*out, y) + } + } + } + } + } + } + } } } diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index e163879..821ccfb 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -9,22 +9,36 @@ import ( "strconv" "strings" "time" + "encoding/json" ) const MEMSTATFILE = `/proc/meminfo` +type MemstatCollectorConfig struct { + ExcludeMetrics []string `json:"exclude_metrics"` +} + type MemstatCollector struct { MetricCollector stats map[string]int64 tags map[string]string matches map[string]string + config MemstatCollectorConfig } -func (m *MemstatCollector) Init() error { +func (m *MemstatCollector) Init(config []byte) error { + var err error m.name = "MemstatCollector" + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + return err + } + } m.stats = make(map[string]int64) + m.matches = make(map[string]string) m.tags = map[string]string{"type": "node"} - m.matches = map[string]string{`MemTotal`: "mem_total", + matches := map[string]string{`MemTotal`: "mem_total", "SwapTotal": "swap_total", "SReclaimable": "mem_sreclaimable", "Slab": "mem_slab", @@ -33,8 +47,17 @@ func (m *MemstatCollector) Init() error { "Cached": "mem_cached", "MemAvailable": "mem_available", "SwapFree": "swap_free"} + for k, v := range matches { + _, skip := stringArrayContains(m.config.ExcludeMetrics, k) + if (!skip) { + m.matches[k] = v + } + } + if len(m.matches) == 0 { + return errors.New("No metrics to collect") + } m.setup() - _, err := ioutil.ReadFile(string(MEMSTATFILE)) + _, err = ioutil.ReadFile(string(MEMSTATFILE)) if err == nil { m.init = true } @@ -42,15 +65,17 @@ func (m *MemstatCollector) Init() error { } func (m *MemstatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { - buffer, err := ioutil.ReadFile(string(MEMSTATFILE)) + if !m.init { + return + } + buffer, err := ioutil.ReadFile(string(MEMSTATFILE)) if err != nil { log.Print(err) return } ll := strings.Split(string(buffer), "\n") - for _, line := range ll { ls := strings.Split(line, `:`) if len(ls) > 1 { @@ -81,16 +106,18 @@ func (m *MemstatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) if _, buffers := m.stats[`Buffers`]; buffers { if _, cached := m.stats[`Cached`]; cached { memUsed := m.stats[`MemTotal`] - (m.stats[`MemFree`] + m.stats[`Buffers`] + m.stats[`Cached`]) + _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used") y, err := lp.New("mem_used", m.tags, map[string]interface{}{"value": int(float64(memUsed) * 1.0e-3)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } } } if _, found := m.stats[`MemShared`]; found { + _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_shared") y, err := lp.New("mem_shared", m.tags, map[string]interface{}{"value": int(float64(m.stats[`MemShared`]) * 1.0e-3)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } diff --git a/collectors/metricCollector.go b/collectors/metricCollector.go index 14c72ec..42871b7 100644 --- a/collectors/metricCollector.go +++ b/collectors/metricCollector.go @@ -7,11 +7,12 @@ import ( "strconv" "strings" "time" + "errors" ) type MetricGetter interface { Name() string - Init() error + Init(config []byte) error Read(time.Duration, *[]lp.MutableMetric) Close() // GetNodeMetric() map[string]interface{} @@ -67,6 +68,15 @@ func intArrayContains(array []int, str int) (int, bool) { return -1, false } +func stringArrayContains(array []string, str string) (int, bool) { + for i, a := range array { + if a == str { + return i, true + } + } + return -1, false +} + func SocketList() []int { buffer, err := ioutil.ReadFile("/proc/cpuinfo") if err != nil { @@ -132,3 +142,13 @@ func Fields2Map(metric lp.Metric) map[string]interface{} { } return fields } + +func RemoveFromStringList(s []string, r string) ([]string, error) { + for i, item := range s { + if r == item { + return append(s[:i], s[i+1:]...), nil + } + } + return s, errors.New("No such string in list") +} + diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index b696f74..8acc266 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -7,27 +7,36 @@ import ( "strconv" "strings" "time" + "encoding/json" ) const NETSTATFILE = `/proc/net/dev` -type NetstatCollector struct { - MetricCollector - matches map[int]string - tags map[string]string +type NetstatCollectorConfig struct { + ExcludeDevices []string `json:"exclude_devices, omitempty"` } -func (m *NetstatCollector) Init() error { +type NetstatCollector struct { + MetricCollector + config NetstatCollectorConfig + matches map[int]string +} + +func (m *NetstatCollector) Init(config []byte) error { m.name = "NetstatCollector" m.setup() - m.tags = map[string]string{"type": "node"} m.matches = map[int]string{ 1: "bytes_in", 9: "bytes_out", 2: "pkts_in", 10: "pkts_out", } - _, err := ioutil.ReadFile(string(NETSTATFILE)) + err := json.Unmarshal(config, &m.config) + if err != nil { + log.Print(err.Error()) + return err + } + _, err = ioutil.ReadFile(string(NETSTATFILE)) if err == nil { m.init = true } @@ -48,13 +57,20 @@ func (m *NetstatCollector) Read(interval time.Duration, out *[]lp.MutableMetric) } f := strings.Fields(l) dev := f[0][0 : len(f[0])-1] - if dev == "lo" { + cont := false + for _, d := range m.config.ExcludeDevices { + if d == dev { + cont = true + } + } + if cont { continue } + tags := map[string]string{"device" : dev, "type": "node"} for i, name := range m.matches { v, err := strconv.ParseInt(f[i], 10, 0) if err == nil { - y, err := lp.New(name, m.tags, map[string]interface{}{"value": int(float64(v) * 1.0e-3)}, time.Now()) + y, err := lp.New(name, tags, map[string]interface{}{"value": int(float64(v) * 1.0e-3)}, time.Now()) if err == nil { *out = append(*out, y) } diff --git a/collectors/nvidiaMetric.go b/collectors/nvidiaMetric.go index 12312d5..4c6e61f 100644 --- a/collectors/nvidiaMetric.go +++ b/collectors/nvidiaMetric.go @@ -7,11 +7,18 @@ import ( lp "github.com/influxdata/line-protocol" "log" "time" + "encoding/json" ) +type NvidiaCollectorConfig struct { + ExcludeMetrics []string `json:"exclude_metrics, omitempty"` + ExcludeDevices []string `json:"exclude_devices, omitempty"` +} + type NvidiaCollector struct { MetricCollector num_gpus int + config NvidiaCollectorConfig } func (m *NvidiaCollector) CatchPanic() error { @@ -24,20 +31,26 @@ func (m *NvidiaCollector) CatchPanic() error { return nil } -func (m *NvidiaCollector) Init() error { +func (m *NvidiaCollector) Init(config []byte) error { + var err error m.name = "NvidiaCollector" m.setup() + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + return err + } + } m.num_gpus = 0 defer m.CatchPanic() ret := nvml.Init() if ret != nvml.SUCCESS { - err := errors.New(nvml.ErrorString(ret)) - log.Print(err) + err = errors.New(nvml.ErrorString(ret)) return err } m.num_gpus, ret = nvml.DeviceGetCount() if ret != nvml.SUCCESS { - err := errors.New(nvml.ErrorString(ret)) + err = errors.New(nvml.ErrorString(ret)) return err } m.init = true @@ -45,23 +58,31 @@ func (m *NvidiaCollector) Init() error { } func (m *NvidiaCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { - + if (!m.init) { + return + } for i := 0; i < m.num_gpus; i++ { device, ret := nvml.DeviceGetHandleByIndex(i) if ret != nvml.SUCCESS { log.Fatalf("Unable to get device at index %d: %v", i, nvml.ErrorString(ret)) return } + _, skip := stringArrayContains(m.config.ExcludeDevices, fmt.Sprintf("%d", i)) + if skip { + continue + } tags := map[string]string{"type": "accelerator", "type-id": fmt.Sprintf("%d", i)} util, ret := nvml.DeviceGetUtilizationRates(device) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "util") y, err := lp.New("util", tags, map[string]interface{}{"value": float64(util.Gpu)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } + _, skip = stringArrayContains(m.config.ExcludeMetrics, "mem_util") y, err = lp.New("mem_util", tags, map[string]interface{}{"value": float64(util.Memory)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } @@ -69,29 +90,33 @@ func (m *NvidiaCollector) Read(interval time.Duration, out *[]lp.MutableMetric) meminfo, ret := nvml.DeviceGetMemoryInfo(device) if ret == nvml.SUCCESS { t := float64(meminfo.Total) / (1024 * 1024) + _, skip = stringArrayContains(m.config.ExcludeMetrics, "mem_total") y, err := lp.New("mem_total", tags, map[string]interface{}{"value": t}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } f := float64(meminfo.Used) / (1024 * 1024) + _, skip = stringArrayContains(m.config.ExcludeMetrics, "fb_memory") y, err = lp.New("fb_memory", tags, map[string]interface{}{"value": f}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } temp, ret := nvml.DeviceGetTemperature(device, nvml.TEMPERATURE_GPU) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "temp") y, err := lp.New("temp", tags, map[string]interface{}{"value": float64(temp)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } fan, ret := nvml.DeviceGetFanSpeed(device) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "fan") y, err := lp.New("fan", tags, map[string]interface{}{"value": float64(fan)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } @@ -108,116 +133,131 @@ func (m *NvidiaCollector) Read(interval time.Duration, out *[]lp.MutableMetric) default: y, err = lp.New("ecc_mode", tags, map[string]interface{}{"value": string("UNKNOWN")}, time.Now()) } - if err == nil { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "ecc_mode") + if err == nil && !skip { *out = append(*out, y) } } else if ret == nvml.ERROR_NOT_SUPPORTED { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "ecc_mode") y, err := lp.New("ecc_mode", tags, map[string]interface{}{"value": string("N/A")}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } pstate, ret := nvml.DeviceGetPerformanceState(device) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "perf_state") y, err := lp.New("perf_state", tags, map[string]interface{}{"value": fmt.Sprintf("P%d", int(pstate))}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } power, ret := nvml.DeviceGetPowerUsage(device) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "power_usage_report") y, err := lp.New("power_usage_report", tags, map[string]interface{}{"value": float64(power) / 1000}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } gclk, ret := nvml.DeviceGetClockInfo(device, nvml.CLOCK_GRAPHICS) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "graphics_clock_report") y, err := lp.New("graphics_clock_report", tags, map[string]interface{}{"value": float64(gclk)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } smclk, ret := nvml.DeviceGetClockInfo(device, nvml.CLOCK_SM) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "sm_clock_report") y, err := lp.New("sm_clock_report", tags, map[string]interface{}{"value": float64(smclk)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } memclk, ret := nvml.DeviceGetClockInfo(device, nvml.CLOCK_MEM) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "mem_clock_report") y, err := lp.New("mem_clock_report", tags, map[string]interface{}{"value": float64(memclk)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } max_gclk, ret := nvml.DeviceGetMaxClockInfo(device, nvml.CLOCK_GRAPHICS) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "max_graphics_clock") y, err := lp.New("max_graphics_clock", tags, map[string]interface{}{"value": float64(max_gclk)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } max_smclk, ret := nvml.DeviceGetClockInfo(device, nvml.CLOCK_SM) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "max_sm_clock") y, err := lp.New("max_sm_clock", tags, map[string]interface{}{"value": float64(max_smclk)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } max_memclk, ret := nvml.DeviceGetClockInfo(device, nvml.CLOCK_MEM) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "max_mem_clock") y, err := lp.New("max_mem_clock", tags, map[string]interface{}{"value": float64(max_memclk)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } ecc_db, ret := nvml.DeviceGetTotalEccErrors(device, 1, 1) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "ecc_db_error") y, err := lp.New("ecc_db_error", tags, map[string]interface{}{"value": float64(ecc_db)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } ecc_sb, ret := nvml.DeviceGetTotalEccErrors(device, 0, 1) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "ecc_sb_error") y, err := lp.New("ecc_sb_error", tags, map[string]interface{}{"value": float64(ecc_sb)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } pwr_limit, ret := nvml.DeviceGetPowerManagementLimit(device) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "power_man_limit") y, err := lp.New("power_man_limit", tags, map[string]interface{}{"value": float64(pwr_limit)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } enc_util, _, ret := nvml.DeviceGetEncoderUtilization(device) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "encoder_util") y, err := lp.New("encoder_util", tags, map[string]interface{}{"value": float64(enc_util)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } dec_util, _, ret := nvml.DeviceGetDecoderUtilization(device) if ret == nvml.SUCCESS { + _, skip = stringArrayContains(m.config.ExcludeMetrics, "decoder_util") y, err := lp.New("decoder_util", tags, map[string]interface{}{"value": float64(dec_util)}, time.Now()) - if err == nil { + if err == nil && !skip { *out = append(*out, y) } } diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go index 13f547f..fbd5d8c 100644 --- a/collectors/topprocsMetric.go +++ b/collectors/topprocsMetric.go @@ -7,29 +7,50 @@ import ( "os/exec" "strings" "time" + "encoding/json" + "errors" ) -const NUM_PROCS = 5 +const MAX_NUM_PROCS = 10 + +type TopProcsCollectorConfig struct { + num_procs int `json:"num_procs"` +} type TopProcsCollector struct { MetricCollector tags map[string]string + config TopProcsCollectorConfig } -func (m *TopProcsCollector) Init() error { +func (m *TopProcsCollector) Init(config []byte) error { + var err error m.name = "TopProcsCollector" m.tags = map[string]string{"type": "node"} + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + return err + } + } + if m.config.num_procs <= 0 || m.config.num_procs > MAX_NUM_PROCS { + return errors.New(fmt.Sprintf("num_procs option must be set in 'topprocs' config (range: 1-%d)", MAX_NUM_PROCS)) + } m.setup() command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu") command.Wait() - _, err := command.Output() - if err == nil { - m.init = true + _, err = command.Output() + if err != nil { + return errors.New("Failed to execute command") } + m.init = true return nil } func (m *TopProcsCollector) Read(interval time.Duration, out *[]lp.MutableMetric) { + if !m.init { + return + } command := exec.Command("ps", "-Ao", "comm", "--sort=-pcpu") command.Wait() stdout, err := command.Output() @@ -39,7 +60,7 @@ func (m *TopProcsCollector) Read(interval time.Duration, out *[]lp.MutableMetric } lines := strings.Split(string(stdout), "\n") - for i := 1; i < NUM_PROCS+1; i++ { + for i := 1; i < m.config.num_procs+1; i++ { name := fmt.Sprintf("topproc%d", i) y, err := lp.New(name, m.tags, map[string]interface{}{"value": string(lines[i])}, time.Now()) if err == nil { diff --git a/config.json b/config.json index d4b8ca8..d05aeaf 100644 --- a/config.json +++ b/config.json @@ -1,30 +1,142 @@ { - "sink": { - "user": "testuser", - "password": "testpass", - "host": "127.0.0.1", - "port": "9090", - "database": "testdb", - "organization": "testorg", - "type": "stdout" + "sink": { + "user": "testuser", + "password": "testpass", + "host": "127.0.0.1", + "port": "9090", + "database": "testdb", + "organization": "testorg", + "type": "stdout" + }, + "interval": 3, + "duration": 1, + "collectors": [ + "loadavg", + "likwid" + ], + "default_tags": { + "cluster": "testcluster" + }, + "receiver": { + "type": "none" + }, + "collect_config": { + "netstat": { + "exclude_devices": [ + "enp195s0f1", + "lo" + ] }, - "interval" : 3, - "duration" : 1, - "collectors": [ - "cpustat", - "loadavg", - "memstat", - "netstat", - "topprocs", - "lustrestat", - "ibstat", - "nvidia", - "likwid" - ], - "default_tags": { - "cluster": "testcluster" + "ibstat": { + "exclude_devices": [ + "mlx5_0", + "mlx5_1" + ] }, - "receiver": { - "type": "none" + "cpustat": { + "exclude_metrics": [ + "cpu_softirq" + ] + }, + "diskstat": { + "exclude_metrics": [ + "writes_merged" + ] + }, + "nvidia": { + "exclude_metrics": [ + "util" + ] + }, + "loadavg": { + "exclude_metrics": [ + "load_fifteen" + ] + }, + "likwid": { + "exclude_metrics": [ + "load_fifteen" + ], + "eventsets": [ + { + "events": { + "FIXC1": "ACTUAL_CPU_CLOCK", + "FIXC2": "MAX_CPU_CLOCK", + "PMC0": "RETIRED_INSTRUCTIONS", + "PMC1": "CPU_CLOCKS_UNHALTED", + "PMC2": "RETIRED_SSE_AVX_FLOPS_ALL", + "PMC3": "MERGE", + "DFC0": "DRAM_CHANNEL_0", + "DFC1": "DRAM_CHANNEL_1", + "DFC2": "DRAM_CHANNEL_2", + "DFC3": "DRAM_CHANNEL_3" + }, + "metrics": [ + { + "name": "ipc", + "calc": "PMC0/PMC1", + "socket_scope": false, + "publish": true + }, + { + "name": "flops_any", + "calc": "0.000001*PMC2/time", + "socket_scope": false, + "publish": true + }, + { + "name": "clock_mhz", + "calc": "0.000001*(FIXC1/FIXC2)/inverseClock", + "socket_scope": false, + "publish": true + }, + { + "name": "mem1", + "calc": "0.000001*(DFC0+DFC1+DFC2+DFC3)*64.0/time", + "socket_scope": true, + "publish": false + } + ] + }, + { + "events": { + "DFC0": "DRAM_CHANNEL_4", + "DFC1": "DRAM_CHANNEL_5", + "DFC2": "DRAM_CHANNEL_6", + "DFC3": "DRAM_CHANNEL_7", + "PWR0": "RAPL_CORE_ENERGY", + "PWR1": "RAPL_PKG_ENERGY" + }, + "metrics": [ + { + "name": "pwr_core", + "calc": "PWR0/time", + "socket_scope": false, + "publish": true + }, + { + "name": "pwr_pkg", + "calc": "PWR1/time", + "socket_scope": true, + "publish": true + }, + { + "name": "mem2", + "calc": "0.000001*(DFC0+DFC1+DFC2+DFC3)*64.0/time", + "socket_scope": true, + "publish": false + } + ] + } + ], + "globalmetrics": [ + { + "name": "mem_bw", + "calc": "mem1+mem2", + "socket_scope": true, + "publish": true + } + ] } + } } diff --git a/metric-collector.go b/metric-collector.go index ff67f3c..a0f26ae 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -51,6 +51,7 @@ type GlobalConfig struct { Collectors []string `json:"collectors"` Receiver receivers.ReceiverConfig `json:"receiver"` DefTags map[string]string `json:"default_tags"` + CollectConfigs map[string]json.RawMessage `json:"collect_config"` } // Load JSON configuration file @@ -62,7 +63,7 @@ func LoadConfiguration(file string, config *GlobalConfig) error { return err } jsonParser := json.NewDecoder(configFile) - jsonParser.Decode(config) + err = jsonParser.Decode(config) return err } @@ -71,11 +72,17 @@ func ReadCli() map[string]string { cfg := flag.String("config", "./config.json", "Path to configuration file") logfile := flag.String("log", "stderr", "Path for logfile") pidfile := flag.String("pidfile", "/var/run/cc-metric-collector.pid", "Path for PID file") + once := flag.Bool("once", false, "Run all collectors only once") flag.Parse() m = make(map[string]string) m["configfile"] = *cfg m["logfile"] = *logfile m["pidfile"] = *pidfile + if *once { + m["once"] = "true" + } else { + m["once"] = "false" + } return m } @@ -114,27 +121,33 @@ func RemovePidfile(pidfile string) error { return nil } +// General shutdown function that gets executed in case of interrupt or graceful shutdown +func shutdown(wg *sync.WaitGroup, collectors []string, sink sinks.SinkFuncs, recv receivers.ReceiverFuncs, pidfile string) { + log.Print("Shutdown...") + for _, c := range collectors { + col := Collectors[c] + log.Print("Stop ", col.Name()) + col.Close() + } + time.Sleep(1 * time.Second) + if recv != nil { + recv.Close() + } + sink.Close() + RemovePidfile(pidfile) + wg.Done() +} + // Register an interrupt handler for Ctrl+C and similar. At signal, // all collectors are closed -func shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs, recv receivers.ReceiverFuncs, pidfile string) { +func prepare_shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs, recv receivers.ReceiverFuncs, pidfile string) { sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) go func(wg *sync.WaitGroup) { <-sigs log.Print("Shutdown...") - for _, c := range config.Collectors { - col := Collectors[c] - log.Print("Stop ", col.Name()) - col.Close() - } - time.Sleep(1 * time.Second) - if recv != nil { - recv.Close() - } - sink.Close() - RemovePidfile(pidfile) - wg.Done() + shutdown(wg, config.Collectors, sink, recv, pidfile) }(wg) } @@ -162,6 +175,7 @@ func main() { err = LoadConfiguration(clicfg["configfile"], &config) if err != nil { log.Print("Error reading configuration file ", clicfg["configfile"]) + log.Print(err.Error()) return } if config.Interval <= 0 || time.Duration(config.Interval)*time.Second <= 0 { @@ -214,15 +228,19 @@ func main() { } // Register interrupt handler - shutdown(&wg, &config, sink, recv, clicfg["pidfile"]) + prepare_shutdown(&wg, &config, sink, recv, clicfg["pidfile"]) // Initialize all collectors tmp := make([]string, 0) for _, c := range config.Collectors { col := Collectors[c] - err = col.Init() + conf, found := config.CollectConfigs[c] + if !found { + conf = json.RawMessage("") + } + err = col.Init([]byte(conf)) if err != nil { - log.Print("SKIP ", col.Name()) + log.Print("SKIP ", col.Name(), " (", err.Error(),")") } else { log.Print("Start ", col.Name()) tmp = append(tmp, c) @@ -232,7 +250,11 @@ func main() { config.DefTags["hostname"] = host // Setup up ticker loop - log.Print("Running loop every ", time.Duration(config.Interval)*time.Second) + if clicfg["once"] != "true" { + log.Print("Running loop every ", time.Duration(config.Interval)*time.Second) + } else { + log.Print("Running loop only once") + } ticker := time.NewTicker(time.Duration(config.Interval) * time.Second) done := make(chan bool) @@ -274,6 +296,10 @@ func main() { if err := sink.Flush(); err != nil { log.Printf("sink error: %s\n", err) } + if clicfg["once"] == "true" { + shutdown(&wg, config.Collectors, sink, recv, clicfg["pidfile"]) + return + } } } }()