package collectors

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"os/exec"
	"os/user"
	"strconv"
	"strings"
	"time"

	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)

// gpfsMetric maps one integer counter of an mmpmon fs_io_s response to the
// metric name it is published under.
type gpfsMetric struct {
	key         string // mmpmon response keyword, e.g. "_br_"
	name        string // published metric name
	description string // human-readable counter name used in error messages
}

// gpfsMetrics lists the fs_io_s counters published per file system, in the
// order they are sent to the output channel.
var gpfsMetrics = []gpfsMetric{
	{key: "_br_", name: "gpfs_bytes_read", description: "bytes read"},
	{key: "_bw_", name: "gpfs_bytes_written", description: "bytes written"},
	{key: "_oc_", name: "gpfs_num_opens", description: "number of opens"},
	{key: "_cc_", name: "gpfs_num_closes", description: "number of closes"},
	{key: "_rdc_", name: "gpfs_num_reads", description: "number of reads"},
	{key: "_wc_", name: "gpfs_num_writes", description: "number of writes"},
	{key: "_dir_", name: "gpfs_num_readdirs", description: "number of read directories"},
	{key: "_iu_", name: "gpfs_num_inode_updates", description: "number of inode updates"},
}

// GpfsCollector reports per-file-system I/O statistics of GPFS / IBM
// Spectrum Scale, obtained by running mmpmon with the fs_io_s request.
type GpfsCollector struct {
	metricCollector
	tags   map[string]string
	config struct {
		// Path of the mmpmon binary; Init defaults it to /usr/lpp/mmfs/bin/mmpmon.
		Mmpmon string `json:"mmpmon_path,omitempty"`
		// File systems (by mmpmon _fs_ name) that are not reported.
		ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
	}
	// Set built from config.ExcludeFilesystem for O(1) lookups in Read.
	skipFS map[string]struct{}
}

// Init parses the JSON configuration and verifies that the collector can run:
// the process must run as root and the mmpmon binary must be found.
// Calling Init on an already initialized collector is a no-op.
func (m *GpfsCollector) Init(config json.RawMessage) error {
	// Check if already initialized
	if m.init {
		return nil
	}

	m.name = "GpfsCollector"
	m.setup()

	// Set default mmpmon binary; may be overridden by the configuration below.
	m.config.Mmpmon = "/usr/lpp/mmfs/bin/mmpmon"

	// Read JSON configuration
	if len(config) > 0 {
		if err := json.Unmarshal(config, &m.config); err != nil {
			log.Print(err.Error())
			return err
		}
	}

	m.meta = map[string]string{
		"source": m.name,
		"group":  "GPFS",
	}
	m.tags = map[string]string{
		"type":       "node",
		"filesystem": "",
	}

	m.skipFS = make(map[string]struct{}, len(m.config.ExcludeFilesystem))
	for _, fs := range m.config.ExcludeFilesystem {
		m.skipFS[fs] = struct{}{}
	}

	// GPFS / IBM Spectrum Scale file system statistics can only be queried by user root
	currentUser, err := user.Current()
	if err != nil {
		return fmt.Errorf("Failed to get current user: %w", err)
	}
	if currentUser.Uid != "0" {
		return fmt.Errorf("GPFS file system statistics can only be queried by user root")
	}

	// Check if mmpmon is in executable search path
	if _, err := exec.LookPath(m.config.Mmpmon); err != nil {
		return fmt.Errorf("Failed to find mmpmon binary '%s': %w", m.config.Mmpmon, err)
	}

	m.init = true
	return nil
}

// Read runs mmpmon once and sends one metric per counter in gpfsMetrics for
// every mounted, non-excluded file system with return code 0. Each metric is
// tagged with its file system name and timestamped with the time reported by
// mmpmon (_t_ seconds, _tu_ microseconds). Failures are logged, not returned.
func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
	// Check if already initialized
	if !m.init {
		return
	}

	// mmpmon:
	// -p: generate output that can be parsed
	// -s: suppress the prompt on input
	// fs_io_s: Displays I/O statistics per mounted file system
	cmd := exec.Command(m.config.Mmpmon, "-p", "-s")
	cmd.Stdin = strings.NewReader("once fs_io_s\n")
	cmdStdout := new(bytes.Buffer)
	cmdStderr := new(bytes.Buffer)
	cmd.Stdout = cmdStdout
	cmd.Stderr = cmdStderr
	if err := cmd.Run(); err != nil {
		dataStdErr, _ := ioutil.ReadAll(cmdStderr)
		dataStdOut, _ := ioutil.ReadAll(cmdStdout)
		cclog.ComponentError(
			m.name,
			fmt.Sprintf("Read(): Failed to execute command \"%s\": %v\n", cmd.String(), err),
			// ExitCode is safe on a nil ProcessState (returns -1 if the command never started).
			fmt.Sprintf("Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode()),
			fmt.Sprintf("Read(): command stderr: \"%s\"\n", string(dataStdErr)),
			fmt.Sprintf("Read(): command stdout: \"%s\"\n", string(dataStdOut)),
		)
		return
	}

	// Read I/O statistics, one fs_io_s response line per file system.
	scanner := bufio.NewScanner(cmdStdout)
	for scanner.Scan() {
		keyValue, ok := parseFsIoLine(scanner.Text())
		if !ok {
			continue
		}
		m.sendFilesystemStats(keyValue, output)
	}
	if err := scanner.Err(); err != nil {
		cclog.ComponentError(
			m.name,
			fmt.Sprintf("Read(): Failed to read mmpmon output: %v", err))
	}
}

// parseFsIoLine splits one line of `mmpmon -p` output into its keyword/value
// pairs. It returns ok == false for any line that is not an fs_io_s response,
// including blank lines. A trailing keyword without a value is ignored
// instead of causing an out-of-range panic.
func parseFsIoLine(line string) (keyValue map[string]string, ok bool) {
	fields := strings.Fields(line)
	// Only process lines starting with _fs_io_s_
	if len(fields) == 0 || fields[0] != "_fs_io_s_" {
		return nil, false
	}
	keyValue = make(map[string]string, len(fields)/2)
	for i := 1; i+1 < len(fields); i += 2 {
		keyValue[fields[i]] = fields[i+1]
	}
	return keyValue, true
}

// sendFilesystemStats validates one parsed fs_io_s record and sends one
// metric per entry of gpfsMetrics to output. A record without a file system
// name, with a non-zero or unparsable return code, or with an unparsable
// timestamp is logged and dropped; an unparsable counter only drops that
// counter's metric.
func (m *GpfsCollector) sendFilesystemStats(keyValue map[string]string, output chan lp.CCMetric) {
	// Ignored keys:
	// _n_: node IP address,
	// _nn_: node name,
	// _cl_: cluster name,
	// _d_: number of disks
	filesystem, ok := keyValue["_fs_"]
	if !ok {
		cclog.ComponentError(
			m.name,
			"Read(): Failed to get filesystem name.")
		return
	}

	// Skip excluded filesystems
	if _, skip := m.skipFS[filesystem]; skip {
		return
	}

	// return code
	rc, err := strconv.Atoi(keyValue["_rc_"])
	if err != nil {
		cclog.ComponentError(
			m.name,
			fmt.Sprintf("Read(): Failed to convert return code '%s' to int: %v", keyValue["_rc_"], err))
		return
	}
	if rc != 0 {
		cclog.ComponentError(
			m.name,
			fmt.Sprintf("Read(): Filesystem '%s' is not ok.", filesystem))
		return
	}

	sec, err := strconv.ParseInt(keyValue["_t_"], 10, 64)
	if err != nil {
		cclog.ComponentError(
			m.name,
			fmt.Sprintf("Read(): Failed to convert seconds '%s' to int64: %v", keyValue["_t_"], err))
		return
	}
	usec, err := strconv.ParseInt(keyValue["_tu_"], 10, 64)
	if err != nil {
		cclog.ComponentError(
			m.name,
			fmt.Sprintf("Read(): Failed to convert micro seconds '%s' to int64: %v", keyValue["_tu_"], err))
		return
	}
	// _tu_ is in microseconds; time.Unix expects nanoseconds.
	timestamp := time.Unix(sec, usec*1000)

	// Build a per-file-system tag set instead of mutating the shared m.tags.
	tags := make(map[string]string, len(m.tags))
	for k, v := range m.tags {
		tags[k] = v
	}
	tags["filesystem"] = filesystem

	for _, metric := range gpfsMetrics {
		raw := keyValue[metric.key]
		value, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			cclog.ComponentError(
				m.name,
				fmt.Sprintf("Read(): Failed to convert %s '%s' to int64: %v", metric.description, raw, err))
			continue
		}
		y, err := lp.New(metric.name, tags, m.meta, map[string]interface{}{"value": value}, timestamp)
		if err != nil {
			cclog.ComponentError(
				m.name,
				fmt.Sprintf("Read(): Failed to create metric '%s': %v", metric.name, err))
			continue
		}
		output <- y
	}
}

// Close marks the collector as uninitialized; subsequent Read calls are no-ops
// until Init is called again.
func (m *GpfsCollector) Close() {
	m.init = false
}