mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-31 09:05:05 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			266 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			266 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package collectors
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"bytes"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"io/ioutil"
 | |
| 	"log"
 | |
| 	"os/exec"
 | |
| 	"os/user"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
 | |
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 | |
| )
 | |
| 
 | |
| type GpfsCollector struct {
 | |
| 	metricCollector
 | |
| 	tags   map[string]string
 | |
| 	config struct {
 | |
| 		Mmpmon string `json:"mmpmon"`
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (m *GpfsCollector) Init(config json.RawMessage) error {
 | |
| 	// Check if already initialized
 | |
| 	if m.init {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	var err error
 | |
| 	m.name = "GpfsCollector"
 | |
| 	m.setup()
 | |
| 
 | |
| 	// Set default mmpmon binary
 | |
| 	m.config.Mmpmon = "/usr/lpp/mmfs/bin/mmpmon"
 | |
| 
 | |
| 	// Read JSON configuration
 | |
| 	if len(config) > 0 {
 | |
| 		err = json.Unmarshal(config, &m.config)
 | |
| 		if err != nil {
 | |
| 			log.Print(err.Error())
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	m.meta = map[string]string{
 | |
| 		"source": m.name,
 | |
| 		"group":  "GPFS",
 | |
| 	}
 | |
| 	m.tags = map[string]string{
 | |
| 		"type":       "node",
 | |
| 		"filesystem": "",
 | |
| 	}
 | |
| 
 | |
| 	// GPFS / IBM Spectrum Scale file system statistics can only be queried by user root
 | |
| 	user, err := user.Current()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("Failed to get current user: %v", err)
 | |
| 	}
 | |
| 	if user.Uid != "0" {
 | |
| 		return fmt.Errorf("GPFS file system statistics can only be queried by user root")
 | |
| 	}
 | |
| 
 | |
| 	// Check if mmpmon is in executable search path
 | |
| 	_, err = exec.LookPath(m.config.Mmpmon)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("Failed to find mmpmon binary '%s': %v", m.config.Mmpmon, err)
 | |
| 	}
 | |
| 
 | |
| 	m.init = true
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
 | |
| 	// Check if already initialized
 | |
| 	if !m.init {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// mmpmon:
 | |
| 	// -p: generate output that can be parsed
 | |
| 	// -s: suppress the prompt on input
 | |
| 	// fs_io_s: Displays I/O statistics per mounted file system
 | |
| 	cmd := exec.Command(m.config.Mmpmon, "-p", "-s")
 | |
| 	cmd.Stdin = strings.NewReader("once fs_io_s\n")
 | |
| 	cmdStdout := new(bytes.Buffer)
 | |
| 	cmdStderr := new(bytes.Buffer)
 | |
| 	cmd.Stdout = cmdStdout
 | |
| 	cmd.Stderr = cmdStderr
 | |
| 	err := cmd.Run()
 | |
| 	if err != nil {
 | |
| 		dataStdErr, _ := ioutil.ReadAll(cmdStderr)
 | |
| 		dataStdOut, _ := ioutil.ReadAll(cmdStdout)
 | |
| 		cclog.ComponentError(
 | |
| 			m.name,
 | |
| 			fmt.Sprintf("Read(): Failed to execute command \"%s\": %v\n", cmd.String(), err),
 | |
| 			fmt.Sprintf("Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode()),
 | |
| 			fmt.Sprintf("Read(): command stderr: \"%s\"\n", string(dataStdErr)),
 | |
| 			fmt.Sprintf("Read(): command stdout: \"%s\"\n", string(dataStdOut)),
 | |
| 		)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Read I/O statistics
 | |
| 	scanner := bufio.NewScanner(cmdStdout)
 | |
| 	for scanner.Scan() {
 | |
| 		lineSplit := strings.Fields(scanner.Text())
 | |
| 		if lineSplit[0] == "_fs_io_s_" {
 | |
| 			key_value := make(map[string]string)
 | |
| 			for i := 1; i < len(lineSplit); i += 2 {
 | |
| 				key_value[lineSplit[i]] = lineSplit[i+1]
 | |
| 			}
 | |
| 
 | |
| 			// Ignore keys:
 | |
| 			// _n_:  node IP address,
 | |
| 			// _nn_: node name,
 | |
| 			// _cl_: cluster name,
 | |
| 			// _d_:  number of disks
 | |
| 
 | |
| 			filesystem, ok := key_value["_fs_"]
 | |
| 			if !ok {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					"Read(): Failed to get filesystem name.")
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			m.tags["filesystem"] = filesystem
 | |
| 
 | |
| 			// return code
 | |
| 			rc, err := strconv.Atoi(key_value["_rc_"])
 | |
| 			if err != nil {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Failed to convert return code '%s' to int: %v", key_value["_rc_"], err))
 | |
| 				continue
 | |
| 			}
 | |
| 			if rc != 0 {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Filesystem '%s' is not ok.", filesystem))
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			sec, err := strconv.ParseInt(key_value["_t_"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Failed to convert seconds '%s' to int64: %v", key_value["_t_"], err))
 | |
| 				continue
 | |
| 			}
 | |
| 			msec, err := strconv.ParseInt(key_value["_tu_"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Failed to convert micro seconds '%s' to int64: %v", key_value["_tu_"], err))
 | |
| 				continue
 | |
| 			}
 | |
| 			timestamp := time.Unix(sec, msec*1000)
 | |
| 
 | |
| 			// bytes read
 | |
| 			bytesRead, err := strconv.ParseInt(key_value["_br_"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Failed to convert bytes read '%s' to int64: %v", key_value["_br_"], err))
 | |
| 				continue
 | |
| 			}
 | |
| 			if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil {
 | |
| 				output <- y
 | |
| 			}
 | |
| 
 | |
| 			// bytes written
 | |
| 			bytesWritten, err := strconv.ParseInt(key_value["_bw_"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Failed to convert bytes written '%s' to int64: %v", key_value["_bw_"], err))
 | |
| 				continue
 | |
| 			}
 | |
| 			if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil {
 | |
| 				output <- y
 | |
| 			}
 | |
| 
 | |
| 			// number of opens
 | |
| 			numOpens, err := strconv.ParseInt(key_value["_oc_"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Failed to convert number of opens '%s' to int64: %v", key_value["_oc_"], err))
 | |
| 				continue
 | |
| 			}
 | |
| 			if y, err := lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp); err == nil {
 | |
| 				output <- y
 | |
| 			}
 | |
| 
 | |
| 			// number of closes
 | |
| 			numCloses, err := strconv.ParseInt(key_value["_cc_"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Failed to convert number of closes: '%s' to int64: %v", key_value["_cc_"], err))
 | |
| 				continue
 | |
| 			}
 | |
| 			if y, err := lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp); err == nil {
 | |
| 				output <- y
 | |
| 			}
 | |
| 
 | |
| 			// number of reads
 | |
| 			numReads, err := strconv.ParseInt(key_value["_rdc_"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Failed to convert number of reads: '%s' to int64: %v", key_value["_rdc_"], err))
 | |
| 				continue
 | |
| 			}
 | |
| 			if y, err := lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp); err == nil {
 | |
| 				output <- y
 | |
| 			}
 | |
| 
 | |
| 			// number of writes
 | |
| 			numWrites, err := strconv.ParseInt(key_value["_wc_"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Failed to convert number of writes: '%s' to int64: %v", key_value["_wc_"], err))
 | |
| 				continue
 | |
| 			}
 | |
| 			if y, err := lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp); err == nil {
 | |
| 				output <- y
 | |
| 			}
 | |
| 
 | |
| 			// number of read directories
 | |
| 			numReaddirs, err := strconv.ParseInt(key_value["_dir_"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Failed to convert number of read directories: '%s' to int64: %v", key_value["_dir_"], err))
 | |
| 				continue
 | |
| 			}
 | |
| 			if y, err := lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp); err == nil {
 | |
| 				output <- y
 | |
| 			}
 | |
| 
 | |
| 			// Number of inode updates
 | |
| 			numInodeUpdates, err := strconv.ParseInt(key_value["_iu_"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				cclog.ComponentError(
 | |
| 					m.name,
 | |
| 					fmt.Sprintf("Read(): Failed to convert number of inode updates: '%s' to int: %v", key_value["_iu_"], err))
 | |
| 				continue
 | |
| 			}
 | |
| 			if y, err := lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp); err == nil {
 | |
| 				output <- y
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (m *GpfsCollector) Close() {
 | |
| 	m.init = false
 | |
| }
 |