package collectors import ( "bufio" "bytes" "encoding/json" "fmt" "io/ioutil" "log" "os" "os/exec" "os/user" "strconv" "strings" "time" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) type GpfsCollector struct { metricCollector tags map[string]string config struct { Mmpmon string `json:"mmpmon"` } } func (m *GpfsCollector) Init(config json.RawMessage) error { var err error m.name = "GpfsCollector" m.setup() // Set default mmpmon binary m.config.Mmpmon = "/usr/lpp/mmfs/bin/mmpmon" // Read JSON configuration if len(config) > 0 { err = json.Unmarshal(config, &m.config) if err != nil { log.Print(err.Error()) return err } } m.meta = map[string]string{ "source": m.name, "group": "GPFS", } m.tags = map[string]string{ "type": "node", "filesystem": "", } // GPFS / IBM Spectrum Scale file system statistics can only be queried by user root user, err := user.Current() if err != nil { return fmt.Errorf("GpfsCollector.Init(): Failed to get current user: %v", err) } if user.Uid != "0" { return fmt.Errorf("GpfsCollector.Init(): GPFS file system statistics can only be queried by user root") } // Check if mmpmon is in executable search path _, err = exec.LookPath(m.config.Mmpmon) if err != nil { return fmt.Errorf("GpfsCollector.Init(): Failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err) } m.init = true return nil } func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { return } // mmpmon: // -p: generate output that can be parsed // -s: suppress the prompt on input // fs_io_s: Displays I/O statistics per mounted file system cmd := exec.Command(m.config.Mmpmon, "-p", "-s") cmd.Stdin = strings.NewReader("once fs_io_s\n") cmdStdout := new(bytes.Buffer) cmdStderr := new(bytes.Buffer) cmd.Stdout = cmdStdout cmd.Stderr = cmdStderr err := cmd.Run() if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to execute command \"%s\": %s\n", cmd.String(), err.Error()) fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode()) data, _ := ioutil.ReadAll(cmdStderr) fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): command stderr: \"%s\"\n", string(data)) data, _ = ioutil.ReadAll(cmdStdout) fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): command stdout: \"%s\"\n", string(data)) return } // Read I/O statistics scanner := bufio.NewScanner(cmdStdout) for scanner.Scan() { lineSplit := strings.Fields(scanner.Text()) if lineSplit[0] == "_fs_io_s_" { key_value := make(map[string]string) for i := 1; i < len(lineSplit); i += 2 { key_value[lineSplit[i]] = lineSplit[i+1] } // Ignore keys: // _n_: node IP address, // _nn_: node name, // _cl_: cluster name, // _d_: number of disks filesystem, ok := key_value["_fs_"] if !ok { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to get filesystem name.\n") continue } m.tags["filesystem"] = filesystem // return code rc, err := strconv.Atoi(key_value["_rc_"]) if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to convert return code: %s\n", err.Error()) continue } if rc != 0 { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Filesystem %s not ok.", filesystem) continue } sec, err := strconv.ParseInt(key_value["_t_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to convert seconds to int '%s': %v\n", key_value["_t_"], err) continue } msec, err := strconv.ParseInt(key_value["_tu_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to convert micro seconds to int '%s': %v\n", key_value["_tu_"], err) continue } timestamp := time.Unix(sec, msec*1000) // bytes read bytesRead, err := strconv.ParseInt(key_value["_br_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to convert bytes read '%s': %s\n", key_value["_br_"], err.Error()) continue } y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp) if err == nil { output <- y } // bytes written bytesWritten, err := strconv.ParseInt(key_value["_bw_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to convert bytes written '%s': %s\n", key_value["_bw_"], err.Error()) continue } y, err = lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp) if err == nil { output <- y } // number of opens numOpens, err := strconv.ParseInt(key_value["_oc_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to convert number of opens '%s': %s\n", key_value["_oc_"], err.Error()) continue } y, err = lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp) if err == nil { output <- y } // number of closes numCloses, err := strconv.ParseInt(key_value["_cc_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to convert number of closes: %s\n", err.Error()) continue } y, err = lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp) if err == nil { output <- y } // number of reads numReads, err := strconv.ParseInt(key_value["_rdc_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to convert number of reads: %s\n", err.Error()) continue } y, err = lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp) if err == nil { output <- y } // number of writes numWrites, err := strconv.ParseInt(key_value["_wc_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to convert number of writes: %s\n", err.Error()) continue } y, err = lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp) if err == nil { output <- y } // number of read directories numReaddirs, err := strconv.ParseInt(key_value["_dir_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to convert number of read directories: %s\n", err.Error()) continue } y, err = lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp) if err == nil { output <- y } // Number of inode updates numInodeUpdates, err := strconv.ParseInt(key_value["_iu_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to convert Number of inode updates: %s\n", err.Error()) continue } y, err = lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp) if err == nil { output <- y } } } } func (m *GpfsCollector) Close() { m.init = false }