diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 6140dbf..98b6115 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -14,23 +14,24 @@ import ( // Map of all available metric collectors var AvailableCollectors = map[string]MetricCollector{ - "likwid": new(LikwidCollector), - "loadavg": new(LoadavgCollector), - "memstat": new(MemstatCollector), - "netstat": new(NetstatCollector), - "ibstat": new(InfinibandCollector), - "lustrestat": new(LustreCollector), - "cpustat": new(CpustatCollector), - "topprocs": new(TopProcsCollector), - "nvidia": new(NvidiaCollector), - "customcmd": new(CustomCmdCollector), - "diskstat": new(DiskstatCollector), - "tempstat": new(TempCollector), - "ipmistat": new(IpmiCollector), - "gpfs": new(GpfsCollector), - "cpufreq": new(CPUFreqCollector), - "cpufreq_cpuinfo": new(CPUFreqCpuInfoCollector), - "nfsstat": new(NfsCollector), + "likwid": new(LikwidCollector), + "loadavg": new(LoadavgCollector), + "memstat": new(MemstatCollector), + "netstat": new(NetstatCollector), + "ibstat": new(InfinibandCollector), + "ibstat_perfquery": new(InfinibandPerfQueryCollector), + "lustrestat": new(LustreCollector), + "cpustat": new(CpustatCollector), + "topprocs": new(TopProcsCollector), + "nvidia": new(NvidiaCollector), + "customcmd": new(CustomCmdCollector), + "diskstat": new(DiskstatCollector), + "tempstat": new(TempCollector), + "ipmistat": new(IpmiCollector), + "gpfs": new(GpfsCollector), + "cpufreq": new(CPUFreqCollector), + "cpufreq_cpuinfo": new(CPUFreqCpuInfoCollector), + "nfsstat": new(NfsCollector), } type collectorManager struct { diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index af4e579..f506f37 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -3,10 +3,9 @@ package collectors import ( "fmt" "io/ioutil" - "log" - "os/exec" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - // "os" + "encoding/json" "errors" "path/filepath" @@ -15,35 +14,25 @@ import ( "time" ) -const ( - IBBASEPATH = `/sys/class/infiniband/` - PERFQUERY = `/usr/sbin/perfquery` -) - -type InfinibandCollectorConfig struct { - ExcludeDevices []string `json:"exclude_devices,omitempty"` - PerfQueryPath string `json:"perfquery_path"` -} +const IB_BASEPATH = `/sys/class/infiniband/` type InfinibandCollector struct { metricCollector - tags map[string]string - lids map[string]map[string]string - config InfinibandCollectorConfig - use_perfquery bool + tags map[string]string + lids map[string]map[string]string + config struct { + ExcludeDevices []string `json:"exclude_devices,omitempty"` + } } func (m *InfinibandCollector) Help() { - fmt.Println("This collector includes all devices that can be found below ", IBBASEPATH) - fmt.Println("and where any of the ports provides a 'lid' file (glob ", IBBASEPATH, "//ports//lid).") + fmt.Println("This collector includes all devices that can be found below ", IB_BASEPATH) + fmt.Println("and where any of the ports provides a 'lid' file (glob ", IB_BASEPATH, "//ports//lid).") fmt.Println("The devices can be filtered with the 'exclude_devices' option in the configuration.") fmt.Println("For each found LIDs the collector calls the 'perfquery' command") - fmt.Println("The path to the 'perfquery' command can be configured with the 'perfquery_path' option") - fmt.Println("in the configuration") fmt.Println("") fmt.Println("Full configuration object:") fmt.Println("\"ibstat\" : {") - fmt.Println(" \"perfquery_path\" : \"path/to/perfquery\" # if omitted, it searches in $PATH") fmt.Println(" \"exclude_devices\" : [\"dev1\"]") fmt.Println("}") fmt.Println("") @@ -57,7 +46,6 @@ func (m *InfinibandCollector) Help() { func (m *InfinibandCollector) Init(config json.RawMessage) error { var err error m.name = "InfinibandCollector" - m.use_perfquery = false m.setup() m.meta = map[string]string{"source": m.name, "group": "Network"} m.tags = map[string]string{"type": "node"} @@ -67,19 +55,13 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { return err } } - if len(m.config.PerfQueryPath) == 0 { - path, err := exec.LookPath("perfquery") - if err == nil { - m.config.PerfQueryPath = path - } - } m.lids = make(map[string]map[string]string) - p := fmt.Sprintf("%s/*/ports/*/lid", string(IBBASEPATH)) + p := fmt.Sprintf("%s/*/ports/*/lid", string(IB_BASEPATH)) files, err := filepath.Glob(p) for _, f := range files { lid, err := ioutil.ReadFile(f) if err == nil { - plist := strings.Split(strings.Replace(f, string(IBBASEPATH), "", -1), "/") + plist := strings.Split(strings.Replace(f, string(IB_BASEPATH), "", -1), "/") skip := false for _, d := range m.config.ExcludeDevices { if d == plist[0] { @@ -93,152 +75,11 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { } } - for _, ports := range m.lids { - for port, lid := range ports { - args := fmt.Sprintf("-r %s %s 0xf000", lid, port) - command := exec.Command(m.config.PerfQueryPath, args) - command.Wait() - _, err := command.Output() - if err == nil { - m.use_perfquery = true - } - break - } - break + if len(m.lids) == 0 { + return errors.New("No usable IB devices") } - if len(m.lids) > 0 { - m.init = true - } else { - err = errors.New("No usable devices") - } - - return err -} - -func (m *InfinibandCollector) doPerfQuery(cmd string, dev string, lid string, port string, tags map[string]string, output chan lp.CCMetric) error { - - args := fmt.Sprintf("-r %s %s 0xf000", lid, port) - command := exec.Command(cmd, args) - command.Wait() - stdout, err := command.Output() - if err != nil { - log.Print(err) - return err - } - ll := strings.Split(string(stdout), "\n") - - for _, line := range ll { - if strings.HasPrefix(line, "PortRcvData") || strings.HasPrefix(line, "RcvData") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_recv", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - if strings.HasPrefix(line, "PortXmitData") || strings.HasPrefix(line, "XmtData") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_xmit", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - if strings.HasPrefix(line, "PortRcvPkts") || strings.HasPrefix(line, "RcvPkts") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - if strings.HasPrefix(line, "PortXmitPkts") || strings.HasPrefix(line, "XmtPkts") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - if strings.HasPrefix(line, "PortRcvPkts") || strings.HasPrefix(line, "RcvPkts") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - if strings.HasPrefix(line, "PortXmitPkts") || strings.HasPrefix(line, "XmtPkts") { - lv := strings.Fields(line) - v, err := strconv.ParseFloat(lv[1], 64) - if err == nil { - y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - } - return nil -} - -func (m *InfinibandCollector) doSysfsRead(dev string, lid string, port string, tags map[string]string, output chan lp.CCMetric) error { - path := fmt.Sprintf("%s/%s/ports/%s/counters/", string(IBBASEPATH), dev, port) - buffer, err := ioutil.ReadFile(fmt.Sprintf("%s/port_rcv_data", path)) - if err == nil { - data := strings.Replace(string(buffer), "\n", "", -1) - v, err := strconv.ParseFloat(data, 64) - if err == nil { - y, err := lp.New("ib_recv", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_xmit_data", path)) - if err == nil { - data := strings.Replace(string(buffer), "\n", "", -1) - v, err := strconv.ParseFloat(data, 64) - if err == nil { - y, err := lp.New("ib_xmit", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_rcv_packets", path)) - if err == nil { - data := strings.Replace(string(buffer), "\n", "", -1) - v, err := strconv.ParseFloat(data, 64) - if err == nil { - y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } - buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_xmit_packets", path)) - if err == nil { - data := strings.Replace(string(buffer), "\n", "", -1) - v, err := strconv.ParseFloat(data, 64) - if err == nil { - y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) - if err == nil { - output <- y - } - } - } + m.init = true return nil } @@ -247,11 +88,55 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr if m.init { for dev, ports := range m.lids { for port, lid := range ports { - tags := map[string]string{"type": "node", "device": dev, "port": port} - if m.use_perfquery { - m.doPerfQuery(m.config.PerfQueryPath, dev, lid, port, tags, output) - } else { - m.doSysfsRead(dev, lid, port, tags, output) + tags := map[string]string{ + "type": "node", + "device": dev, + "port": port, + "lid": lid} + path := fmt.Sprintf("%s/%s/ports/%s/counters/", string(IB_BASEPATH), dev, port) + buffer, err := ioutil.ReadFile(fmt.Sprintf("%s/port_rcv_data", path)) + if err == nil { + data := strings.Replace(string(buffer), "\n", "", -1) + v, err := strconv.ParseFloat(data, 64) + if err == nil { + y, err := lp.New("ib_recv", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_xmit_data", path)) + if err == nil { + data := strings.Replace(string(buffer), "\n", "", -1) + v, err := strconv.ParseFloat(data, 64) + if err == nil { + y, err := lp.New("ib_xmit", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_rcv_packets", path)) + if err == nil { + data := strings.Replace(string(buffer), "\n", "", -1) + v, err := strconv.ParseFloat(data, 64) + if err == nil { + y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_xmit_packets", path)) + if err == nil { + data := strings.Replace(string(buffer), "\n", "", -1) + v, err := strconv.ParseFloat(data, 64) + if err == nil { + y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } } } } diff --git a/collectors/infinibandPerfQueryMetric.go b/collectors/infinibandPerfQueryMetric.go new file mode 100644 index 0000000..d8f7bf4 --- /dev/null +++ b/collectors/infinibandPerfQueryMetric.go @@ -0,0 +1,250 @@ +package collectors + +import ( + "fmt" + "io/ioutil" + "log" + "os/exec" + + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + + // "os" + "encoding/json" + "errors" + "path/filepath" + "strconv" + "strings" + "time" +) + +const PERFQUERY = `/usr/sbin/perfquery` + +type InfinibandPerfQueryCollector struct { + metricCollector + tags map[string]string + lids map[string]map[string]string + config struct { + ExcludeDevices []string `json:"exclude_devices,omitempty"` + PerfQueryPath string `json:"perfquery_path"` + } +} + +func (m *InfinibandPerfQueryCollector) Help() { + fmt.Println("This collector includes all devices that can be found below ", IB_BASEPATH) + fmt.Println("and where any of the ports provides a 'lid' file (glob ", IB_BASEPATH, "//ports//lid).") + fmt.Println("The devices can be filtered with the 'exclude_devices' option in the configuration.") + fmt.Println("For each found LIDs the collector calls the 'perfquery' command") + fmt.Println("The path to the 'perfquery' command can be configured with the 'perfquery_path' option") + fmt.Println("in the configuration") + fmt.Println("") + fmt.Println("Full configuration object:") + fmt.Println("\"ibstat\" : {") + fmt.Println(" \"perfquery_path\" : \"path/to/perfquery\" # if omitted, it searches in $PATH") + fmt.Println(" \"exclude_devices\" : [\"dev1\"]") + fmt.Println("}") + fmt.Println("") + fmt.Println("Metrics:") + fmt.Println("- ib_recv") + fmt.Println("- ib_xmit") + fmt.Println("- ib_recv_pkts") + fmt.Println("- ib_xmit_pkts") +} + +func (m *InfinibandPerfQueryCollector) Init(config json.RawMessage) error { + var err error + m.name = "InfinibandCollectorPerfQuery" + m.setup() + m.meta = map[string]string{"source": m.name, "group": "Network"} + m.tags = map[string]string{"type": "node"} + if len(config) > 0 { + err = json.Unmarshal(config, &m.config) + if err != nil { + return err + } + } + if len(m.config.PerfQueryPath) == 0 { + path, err := exec.LookPath("perfquery") + if err == nil { + m.config.PerfQueryPath = path + } + } + m.lids = make(map[string]map[string]string) + p := fmt.Sprintf("%s/*/ports/*/lid", string(IB_BASEPATH)) + files, err := filepath.Glob(p) + for _, f := range files { + lid, err := ioutil.ReadFile(f) + if err == nil { + plist := strings.Split(strings.Replace(f, string(IB_BASEPATH), "", -1), "/") + skip := false + for _, d := range m.config.ExcludeDevices { + if d == plist[0] { + skip = true + } + } + if !skip { + m.lids[plist[0]] = make(map[string]string) + m.lids[plist[0]][plist[2]] = string(lid) + } + } + } + + for _, ports := range m.lids { + for port, lid := range ports { + args := fmt.Sprintf("-r %s %s 0xf000", lid, port) + command := exec.Command(m.config.PerfQueryPath, args) + command.Wait() + _, err := command.Output() + if err != nil { + return fmt.Errorf("Failed to execute %s: %v", m.config.PerfQueryPath, err) + } + } + } + + if len(m.lids) == 0 { + return errors.New("No usable IB devices") + } + + m.init = true + return nil +} + +func (m *InfinibandPerfQueryCollector) doPerfQuery(cmd string, dev string, lid string, port string, tags map[string]string, output chan lp.CCMetric) error { + + args := fmt.Sprintf("-r %s %s 0xf000", lid, port) + command := exec.Command(cmd, args) + command.Wait() + stdout, err := command.Output() + if err != nil { + log.Print(err) + return err + } + ll := strings.Split(string(stdout), "\n") + + for _, line := range ll { + if strings.HasPrefix(line, "PortRcvData") || strings.HasPrefix(line, "RcvData") { + lv := strings.Fields(line) + v, err := strconv.ParseFloat(lv[1], 64) + if err == nil { + y, err := lp.New("ib_recv", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + if strings.HasPrefix(line, "PortXmitData") || strings.HasPrefix(line, "XmtData") { + lv := strings.Fields(line) + v, err := strconv.ParseFloat(lv[1], 64) + if err == nil { + y, err := lp.New("ib_xmit", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + if strings.HasPrefix(line, "PortRcvPkts") || strings.HasPrefix(line, "RcvPkts") { + lv := strings.Fields(line) + v, err := strconv.ParseFloat(lv[1], 64) + if err == nil { + y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + if strings.HasPrefix(line, "PortXmitPkts") || strings.HasPrefix(line, "XmtPkts") { + lv := strings.Fields(line) + v, err := strconv.ParseFloat(lv[1], 64) + if err == nil { + y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + if strings.HasPrefix(line, "PortRcvPkts") || strings.HasPrefix(line, "RcvPkts") { + lv := strings.Fields(line) + v, err := strconv.ParseFloat(lv[1], 64) + if err == nil { + y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + if strings.HasPrefix(line, "PortXmitPkts") || strings.HasPrefix(line, "XmtPkts") { + lv := strings.Fields(line) + v, err := strconv.ParseFloat(lv[1], 64) + if err == nil { + y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + } + return nil +} + +func (m *InfinibandPerfQueryCollector) Read(interval time.Duration, output chan lp.CCMetric) { + + if m.init { + for dev, ports := range m.lids { + for port, lid := range ports { + tags := map[string]string{ + "type": "node", + "device": dev, + "port": port, + "lid": lid} + path := fmt.Sprintf("%s/%s/ports/%s/counters/", string(IB_BASEPATH), dev, port) + buffer, err := ioutil.ReadFile(fmt.Sprintf("%s/port_rcv_data", path)) + if err == nil { + data := strings.Replace(string(buffer), "\n", "", -1) + v, err := strconv.ParseFloat(data, 64) + if err == nil { + y, err := lp.New("ib_recv", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_xmit_data", path)) + if err == nil { + data := strings.Replace(string(buffer), "\n", "", -1) + v, err := strconv.ParseFloat(data, 64) + if err == nil { + y, err := lp.New("ib_xmit", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_rcv_packets", path)) + if err == nil { + data := strings.Replace(string(buffer), "\n", "", -1) + v, err := strconv.ParseFloat(data, 64) + if err == nil { + y, err := lp.New("ib_recv_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + buffer, err = ioutil.ReadFile(fmt.Sprintf("%s/port_xmit_packets", path)) + if err == nil { + data := strings.Replace(string(buffer), "\n", "", -1) + v, err := strconv.ParseFloat(data, 64) + if err == nil { + y, err := lp.New("ib_xmit_pkts", tags, m.meta, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + output <- y + } + } + } + } + } + } +} + +func (m *InfinibandPerfQueryCollector) Close() { + m.init = false +}