Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop

This commit is contained in:
Thomas Roehl 2022-02-07 13:47:06 +01:00
commit b7ee125942
2 changed files with 159 additions and 138 deletions

View File

@ -1,4 +1,12 @@
{ {
"cpufreq": {},
"cpufreq_cpuinfo": {},
"gpfs": {
"exclude_filesystem": [ "test_fs" ]
},
"loadavg": {
"exclude_metrics": [ "proc_total" ]
},
"tempstat": { "tempstat": {
"tag_override": { "tag_override": {
"hwmon0" : { "hwmon0" : {
@ -10,6 +18,4 @@
"type-id" : "1" "type-id" : "1"
} }
} }
}
} }

View File

@ -21,8 +21,10 @@ type GpfsCollector struct {
metricCollector metricCollector
tags map[string]string tags map[string]string
config struct { config struct {
Mmpmon string `json:"mmpmon"` Mmpmon string `json:"mmpmon_path,omitempty"`
ExcludeFilesystem []string `json:"exclude_filesystem,omitempty"`
} }
skipFS map[string]struct{}
} }
func (m *GpfsCollector) Init(config json.RawMessage) error { func (m *GpfsCollector) Init(config json.RawMessage) error {
@ -54,6 +56,10 @@ func (m *GpfsCollector) Init(config json.RawMessage) error {
"type": "node", "type": "node",
"filesystem": "", "filesystem": "",
} }
m.skipFS = make(map[string]struct{})
for _, fs := range m.config.ExcludeFilesystem {
m.skipFS[fs] = struct{}{}
}
// GPFS / IBM Spectrum Scale file system statistics can only be queried by user root // GPFS / IBM Spectrum Scale file system statistics can only be queried by user root
user, err := user.Current() user, err := user.Current()
@ -108,154 +114,163 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
scanner := bufio.NewScanner(cmdStdout) scanner := bufio.NewScanner(cmdStdout)
for scanner.Scan() { for scanner.Scan() {
lineSplit := strings.Fields(scanner.Text()) lineSplit := strings.Fields(scanner.Text())
if lineSplit[0] == "_fs_io_s_" {
key_value := make(map[string]string)
for i := 1; i < len(lineSplit); i += 2 {
key_value[lineSplit[i]] = lineSplit[i+1]
}
// Ignore keys: // Only process lines starting with _fs_io_s_
// _n_: node IP address, if lineSplit[0] != "_fs_io_s_" {
// _nn_: node name, continue
// _cl_: cluster name, }
// _d_: number of disks
filesystem, ok := key_value["_fs_"] key_value := make(map[string]string)
if !ok { for i := 1; i < len(lineSplit); i += 2 {
cclog.ComponentError( key_value[lineSplit[i]] = lineSplit[i+1]
m.name, }
"Read(): Failed to get filesystem name.")
continue
}
m.tags["filesystem"] = filesystem // Ignore keys:
// _n_: node IP address,
// _nn_: node name,
// _cl_: cluster name,
// _d_: number of disks
// return code filesystem, ok := key_value["_fs_"]
rc, err := strconv.Atoi(key_value["_rc_"]) if !ok {
if err != nil { cclog.ComponentError(
cclog.ComponentError( m.name,
m.name, "Read(): Failed to get filesystem name.")
fmt.Sprintf("Read(): Failed to convert return code '%s' to int: %v", key_value["_rc_"], err)) continue
continue }
}
if rc != 0 {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Filesystem '%s' is not ok.", filesystem))
continue
}
sec, err := strconv.ParseInt(key_value["_t_"], 10, 64) // Skip excluded filesystems
if err != nil { if _, skip := m.skipFS[filesystem]; skip {
cclog.ComponentError( continue
m.name, }
fmt.Sprintf("Read(): Failed to convert seconds '%s' to int64: %v", key_value["_t_"], err))
continue
}
msec, err := strconv.ParseInt(key_value["_tu_"], 10, 64)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to convert micro seconds '%s' to int64: %v", key_value["_tu_"], err))
continue
}
timestamp := time.Unix(sec, msec*1000)
// bytes read m.tags["filesystem"] = filesystem
bytesRead, err := strconv.ParseInt(key_value["_br_"], 10, 64)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to convert bytes read '%s' to int64: %v", key_value["_br_"], err))
continue
}
if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil {
output <- y
}
// bytes written // return code
bytesWritten, err := strconv.ParseInt(key_value["_bw_"], 10, 64) rc, err := strconv.Atoi(key_value["_rc_"])
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(
m.name, m.name,
fmt.Sprintf("Read(): Failed to convert bytes written '%s' to int64: %v", key_value["_bw_"], err)) fmt.Sprintf("Read(): Failed to convert return code '%s' to int: %v", key_value["_rc_"], err))
continue continue
} }
if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil { if rc != 0 {
output <- y cclog.ComponentError(
} m.name,
fmt.Sprintf("Read(): Filesystem '%s' is not ok.", filesystem))
continue
}
// number of opens sec, err := strconv.ParseInt(key_value["_t_"], 10, 64)
numOpens, err := strconv.ParseInt(key_value["_oc_"], 10, 64) if err != nil {
if err != nil { cclog.ComponentError(
cclog.ComponentError( m.name,
m.name, fmt.Sprintf("Read(): Failed to convert seconds '%s' to int64: %v", key_value["_t_"], err))
fmt.Sprintf("Read(): Failed to convert number of opens '%s' to int64: %v", key_value["_oc_"], err)) continue
continue }
} msec, err := strconv.ParseInt(key_value["_tu_"], 10, 64)
if y, err := lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp); err == nil { if err != nil {
output <- y cclog.ComponentError(
} m.name,
fmt.Sprintf("Read(): Failed to convert micro seconds '%s' to int64: %v", key_value["_tu_"], err))
continue
}
timestamp := time.Unix(sec, msec*1000)
// number of closes // bytes read
numCloses, err := strconv.ParseInt(key_value["_cc_"], 10, 64) bytesRead, err := strconv.ParseInt(key_value["_br_"], 10, 64)
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(
m.name, m.name,
fmt.Sprintf("Read(): Failed to convert number of closes: '%s' to int64: %v", key_value["_cc_"], err)) fmt.Sprintf("Read(): Failed to convert bytes read '%s' to int64: %v", key_value["_br_"], err))
continue continue
} }
if y, err := lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp); err == nil { if y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp); err == nil {
output <- y output <- y
} }
// number of reads // bytes written
numReads, err := strconv.ParseInt(key_value["_rdc_"], 10, 64) bytesWritten, err := strconv.ParseInt(key_value["_bw_"], 10, 64)
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(
m.name, m.name,
fmt.Sprintf("Read(): Failed to convert number of reads: '%s' to int64: %v", key_value["_rdc_"], err)) fmt.Sprintf("Read(): Failed to convert bytes written '%s' to int64: %v", key_value["_bw_"], err))
continue continue
} }
if y, err := lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp); err == nil { if y, err := lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp); err == nil {
output <- y output <- y
} }
// number of writes // number of opens
numWrites, err := strconv.ParseInt(key_value["_wc_"], 10, 64) numOpens, err := strconv.ParseInt(key_value["_oc_"], 10, 64)
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(
m.name, m.name,
fmt.Sprintf("Read(): Failed to convert number of writes: '%s' to int64: %v", key_value["_wc_"], err)) fmt.Sprintf("Read(): Failed to convert number of opens '%s' to int64: %v", key_value["_oc_"], err))
continue continue
} }
if y, err := lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp); err == nil { if y, err := lp.New("gpfs_num_opens", m.tags, m.meta, map[string]interface{}{"value": numOpens}, timestamp); err == nil {
output <- y output <- y
} }
// number of read directories // number of closes
numReaddirs, err := strconv.ParseInt(key_value["_dir_"], 10, 64) numCloses, err := strconv.ParseInt(key_value["_cc_"], 10, 64)
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(
m.name, m.name,
fmt.Sprintf("Read(): Failed to convert number of read directories: '%s' to int64: %v", key_value["_dir_"], err)) fmt.Sprintf("Read(): Failed to convert number of closes: '%s' to int64: %v", key_value["_cc_"], err))
continue continue
} }
if y, err := lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp); err == nil { if y, err := lp.New("gpfs_num_closes", m.tags, m.meta, map[string]interface{}{"value": numCloses}, timestamp); err == nil {
output <- y output <- y
} }
// Number of inode updates // number of reads
numInodeUpdates, err := strconv.ParseInt(key_value["_iu_"], 10, 64) numReads, err := strconv.ParseInt(key_value["_rdc_"], 10, 64)
if err != nil { if err != nil {
cclog.ComponentError( cclog.ComponentError(
m.name, m.name,
fmt.Sprintf("Read(): Failed to convert number of inode updates: '%s' to int: %v", key_value["_iu_"], err)) fmt.Sprintf("Read(): Failed to convert number of reads: '%s' to int64: %v", key_value["_rdc_"], err))
continue continue
} }
if y, err := lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp); err == nil { if y, err := lp.New("gpfs_num_reads", m.tags, m.meta, map[string]interface{}{"value": numReads}, timestamp); err == nil {
output <- y output <- y
} }
// number of writes
numWrites, err := strconv.ParseInt(key_value["_wc_"], 10, 64)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to convert number of writes: '%s' to int64: %v", key_value["_wc_"], err))
continue
}
if y, err := lp.New("gpfs_num_writes", m.tags, m.meta, map[string]interface{}{"value": numWrites}, timestamp); err == nil {
output <- y
}
// number of read directories
numReaddirs, err := strconv.ParseInt(key_value["_dir_"], 10, 64)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to convert number of read directories: '%s' to int64: %v", key_value["_dir_"], err))
continue
}
if y, err := lp.New("gpfs_num_readdirs", m.tags, m.meta, map[string]interface{}{"value": numReaddirs}, timestamp); err == nil {
output <- y
}
// Number of inode updates
numInodeUpdates, err := strconv.ParseInt(key_value["_iu_"], 10, 64)
if err != nil {
cclog.ComponentError(
m.name,
fmt.Sprintf("Read(): Failed to convert number of inode updates: '%s' to int: %v", key_value["_iu_"], err))
continue
}
if y, err := lp.New("gpfs_num_inode_updates", m.tags, m.meta, map[string]interface{}{"value": numInodeUpdates}, timestamp); err == nil {
output <- y
} }
} }
} }