mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-11-04 02:35:07 +01:00 
			
		
		
		
	Beegfs collector (#50)
* added beegfs collectors to collectors/README.md * added beegfs collectors and docs * added new beegfs collectors to AvailableCollectors list * Feedback implemented * changed error type * changed error to only return * changed beegfs lookup path * fixed typo in md files Co-authored-by: Mehmet Soysal <mehmet.soysal@kit.edu>
This commit is contained in:
		
							
								
								
									
										229
									
								
								collectors/beegfsmetaMetric.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										229
									
								
								collectors/beegfsmetaMetric.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,229 @@
 | 
			
		||||
package collectors
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"os"
 | 
			
		||||
	"os/exec"
 | 
			
		||||
	"os/user"
 | 
			
		||||
	"regexp"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
 | 
			
		||||
	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
 | 
			
		||||
 | 
			
		||||
// Struct for the collector-specific JSON config
 | 
			
		||||
type BeegfsMetaCollectorConfig struct {
 | 
			
		||||
	Beegfs            string   `json:"beegfs_path"`
 | 
			
		||||
	ExcludeMetrics    []string `json:"exclude_metrics,omitempty"`
 | 
			
		||||
	ExcludeFilesystem []string `json:"exclude_filesystem"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type BeegfsMetaCollector struct {
 | 
			
		||||
	metricCollector
 | 
			
		||||
	tags    map[string]string
 | 
			
		||||
	matches map[string]string
 | 
			
		||||
	config  BeegfsMetaCollectorConfig
 | 
			
		||||
	skipFS  map[string]struct{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
 | 
			
		||||
	// Check if already initialized
 | 
			
		||||
	if m.init {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	// Metrics
 | 
			
		||||
	var nodeMdstat_array = [39]string{
 | 
			
		||||
		"sum", "ack", "close", "entInf",
 | 
			
		||||
		"fndOwn", "mkdir", "create", "rddir",
 | 
			
		||||
		"refrEn", "mdsInf", "rmdir", "rmLnk",
 | 
			
		||||
		"mvDirIns", "mvFiIns", "open", "ren",
 | 
			
		||||
		"sChDrct", "sAttr", "sDirPat", "stat",
 | 
			
		||||
		"statfs", "trunc", "symlnk", "unlnk",
 | 
			
		||||
		"lookLI", "statLI", "revalLI", "openLI",
 | 
			
		||||
		"createLI", "hardlnk", "flckAp", "flckEn",
 | 
			
		||||
		"flckRg", "dirparent", "listXA", "getXA",
 | 
			
		||||
		"rmXA", "setXA", "mirror"}
 | 
			
		||||
 | 
			
		||||
	m.name = "BeegfsMetaCollector"
 | 
			
		||||
	m.setup()
 | 
			
		||||
	// Set default beegfs-ctl binary
 | 
			
		||||
 | 
			
		||||
	m.config.Beegfs = DEFAULT_BEEGFS_CMD
 | 
			
		||||
 | 
			
		||||
	// Read JSON configuration
 | 
			
		||||
	if len(config) > 0 {
 | 
			
		||||
		err := json.Unmarshal(config, &m.config)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//create map with possible variables
 | 
			
		||||
	m.matches = make(map[string]string)
 | 
			
		||||
	for _, value := range nodeMdstat_array {
 | 
			
		||||
		_, skip := stringArrayContains(m.config.ExcludeMetrics, value)
 | 
			
		||||
		if skip {
 | 
			
		||||
			m.matches["other"] = "0"
 | 
			
		||||
		} else {
 | 
			
		||||
			m.matches["beegfs_cmeta_"+value] = "0"
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	m.meta = map[string]string{
 | 
			
		||||
		"source": m.name,
 | 
			
		||||
		"group":  "BeegfsMeta",
 | 
			
		||||
	}
 | 
			
		||||
	m.tags = map[string]string{
 | 
			
		||||
		"type":       "node",
 | 
			
		||||
		"filesystem": "",
 | 
			
		||||
	}
 | 
			
		||||
	m.skipFS = make(map[string]struct{})
 | 
			
		||||
	for _, fs := range m.config.ExcludeFilesystem {
 | 
			
		||||
		m.skipFS[fs] = struct{}{}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Beegfs file system statistics can only be queried by user root
 | 
			
		||||
	user, err := user.Current()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to get current user: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if user.Uid != "0" {
 | 
			
		||||
		return fmt.Errorf("BeegfsMetaCollector.Init(): BeeGFS file system statistics can only be queried by user root")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Check if beegfs-ctl is in executable search path
 | 
			
		||||
	_, err = exec.LookPath(m.config.Beegfs)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err)
 | 
			
		||||
	}
 | 
			
		||||
	m.init = true
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetric) {
 | 
			
		||||
	if !m.init {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	//get mounpoint
 | 
			
		||||
	buffer, _ := ioutil.ReadFile(string("/proc/mounts"))
 | 
			
		||||
	mounts := strings.Split(string(buffer), "\n")
 | 
			
		||||
	var mountpoints []string
 | 
			
		||||
	for _, line := range mounts {
 | 
			
		||||
		if len(line) == 0 {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		f := strings.Fields(line)
 | 
			
		||||
		if strings.Contains(f[0], "beegfs_ondemand") {
 | 
			
		||||
			// Skip excluded filesystems
 | 
			
		||||
			if _, skip := m.skipFS[f[1]]; skip {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			mountpoints = append(mountpoints, f[1])
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(mountpoints) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, mountpoint := range mountpoints {
 | 
			
		||||
		m.tags["filesystem"] = mountpoint
 | 
			
		||||
 | 
			
		||||
		// bwwgfs-ctl:
 | 
			
		||||
		// --clientstats: Show client IO statistics.
 | 
			
		||||
		// --nodetype=meta: The node type to query (meta, storage).
 | 
			
		||||
		// --interval:
 | 
			
		||||
		// --mount=/mnt/beeond/: Which mount point
 | 
			
		||||
		//cmd := exec.Command(m.config.Beegfs, "/root/mc/test.txt")
 | 
			
		||||
		mountoption := "--mount=" + mountpoint
 | 
			
		||||
		cmd := exec.Command(m.config.Beegfs, "--clientstats",
 | 
			
		||||
			"--nodetype=meta", mountoption, "--allstats")
 | 
			
		||||
		cmd.Stdin = strings.NewReader("\n")
 | 
			
		||||
		cmdStdout := new(bytes.Buffer)
 | 
			
		||||
		cmdStderr := new(bytes.Buffer)
 | 
			
		||||
		cmd.Stdout = cmdStdout
 | 
			
		||||
		cmd.Stderr = cmdStderr
 | 
			
		||||
		err := cmd.Run()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): Failed to execute command \"%s\": %s\n", cmd.String(), err.Error())
 | 
			
		||||
			fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command exit code: \"%d\"\n", cmd.ProcessState.ExitCode())
 | 
			
		||||
			data, _ := ioutil.ReadAll(cmdStderr)
 | 
			
		||||
			fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command stderr: \"%s\"\n", string(data))
 | 
			
		||||
			data, _ = ioutil.ReadAll(cmdStdout)
 | 
			
		||||
			fmt.Fprintf(os.Stderr, "BeegfsMetaCollector.Read(): command stdout: \"%s\"\n", string(data))
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		// Read I/O statistics
 | 
			
		||||
		scanner := bufio.NewScanner(cmdStdout)
 | 
			
		||||
 | 
			
		||||
		sumLine := regexp.MustCompile(`^Sum:\s+\d+\s+\[[a-zA-Z]+\]+`)
 | 
			
		||||
		//Line := regexp.MustCompile(`^(.*)\s+(\d)+\s+\[([a-zA-Z]+)\]+`)
 | 
			
		||||
		statsLine := regexp.MustCompile(`^(.*?)\s+?(\d.*?)$`)
 | 
			
		||||
		singleSpacePattern := regexp.MustCompile(`\s+`)
 | 
			
		||||
		removePattern := regexp.MustCompile(`[\[|\]]`)
 | 
			
		||||
 | 
			
		||||
		for scanner.Scan() {
 | 
			
		||||
			readLine := scanner.Text()
 | 
			
		||||
			//fmt.Println(readLine)
 | 
			
		||||
			// Jump few lines, we only want the I/O stats from nodes
 | 
			
		||||
			if !sumLine.MatchString(readLine) {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			match := statsLine.FindStringSubmatch(readLine)
 | 
			
		||||
			// nodeName = "Sum:" or would be nodes
 | 
			
		||||
			// nodeName := match[1]
 | 
			
		||||
			//Remove multiple whitespaces
 | 
			
		||||
			dummy := removePattern.ReplaceAllString(match[2], " ")
 | 
			
		||||
			metaStats := strings.TrimSpace(singleSpacePattern.ReplaceAllString(dummy, " "))
 | 
			
		||||
			split := strings.Split(metaStats, " ")
 | 
			
		||||
 | 
			
		||||
			// fill map with values
 | 
			
		||||
			// split[i+1] = mdname
 | 
			
		||||
			// split[i] = amount of md operations
 | 
			
		||||
			for i := 0; i <= len(split)-1; i += 2 {
 | 
			
		||||
				if _, ok := m.matches[split[i+1]]; ok {
 | 
			
		||||
					m.matches["beegfs_cmeta_"+split[i+1]] = split[i]
 | 
			
		||||
				} else {
 | 
			
		||||
					f1, err := strconv.ParseFloat(m.matches["other"], 32)
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						cclog.ComponentError(
 | 
			
		||||
							m.name,
 | 
			
		||||
							fmt.Sprintf("Metric (other): Failed to convert str written '%s' to float: %v", m.matches["other"], err))
 | 
			
		||||
						continue
 | 
			
		||||
					}
 | 
			
		||||
					f2, err := strconv.ParseFloat(split[i], 32)
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						cclog.ComponentError(
 | 
			
		||||
							m.name,
 | 
			
		||||
							fmt.Sprintf("Metric (other): Failed to convert str written '%s' to float: %v", m.matches["other"], err))
 | 
			
		||||
						continue
 | 
			
		||||
					}
 | 
			
		||||
					//mdStat["other"] = fmt.Sprintf("%f", f1+f2)
 | 
			
		||||
					m.matches["beegfs_cstorage_other"] = fmt.Sprintf("%f", f1+f2)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			for key, data := range m.matches {
 | 
			
		||||
				value, _ := strconv.ParseFloat(data, 32)
 | 
			
		||||
				y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
 | 
			
		||||
				if err == nil {
 | 
			
		||||
					output <- y
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *BeegfsMetaCollector) Close() {
 | 
			
		||||
	m.init = false
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user