cc-metric-collector/collectors/cpufreqCpuinfoMetric.go
Thomas Gruber 8d85bd53f1
Merge latest development changes to main branch (#79)
* Cleanup: Remove unused code

* Use Golang duration parser for 'interval' and 'duration'
 in main config

* Update handling of LIKWID headers. Download only if not already present in the system. Fixes #73

* Units with cc-units (#64)

* Add option to normalize units with cc-unit

* Add unit conversion to router

* Add option to change unit prefix in the router

* Add to MetricRouter README

* Add order of operations in router to README

* Use second add_tags/del_tags only if metric gets renamed

* Skip disks in DiskstatCollector that have size=0

* Check readability of sensor files in TempCollector

* Fix for --once option

* Rename `cpu` type to `hwthread` (#69)

* Rename 'cpu' type to 'hwthread' to avoid naming clashes with MetricStore and CC-Webfrontend

* Collectors in parallel (#74)

* Provide info to CollectorManager whether the collector can be executed in parallel with others

* Split serial and parallel collectors. Read in parallel first

* Update NvidiaCollector with new metrics, MIG and NvLink support (#75)

* CC topology module update (#76)

* Rename CPU to hardware thread, write some comments

* Do renaming in other parts

* Remove CpuList and SocketList function from metricCollector. Available in ccTopology

* Option to use MIG UUID as subtype-id in NvidiaCollector

* Option to use MIG slice name as subtype-id in NvidiaCollector

* MetricRouter: Fix JSON in README

* Fix for Github Action to really use the selected version

* Remove Ganglia installation in runonce Action and add Go 1.18

* Fix daemon options in init script

* Add separate go.mod files to use it with deprecated 1.16

* Minor updates for Makefiles

* fix string comparison

* AMD ROCm SMI collector (#77)

* Add collector for AMD ROCm SMI metrics

* Fix import path

* Fix imports

* Remove Board Number

* store GPU index explicitly

* Remove board number from description

* Use http instead of ftp to download likwid

* Fix serial number in rocmCollector

* Improved http sink (#78)

* automatic flush in NatsSink

* tweak default options of HttpSink

* shorter crit. section and retries for HttpSink

* fix error handling

* Remove file added by mistake.

* Use http instead of ftp to download likwid

* Fix serial number in rocmCollector

Co-authored-by: Thomas Roehl <thomas.roehl@fau.de>

Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
Co-authored-by: Lou <lou.knauer@gmx.de>
2022-06-08 15:25:40 +02:00

212 lines
5.3 KiB
Go

package collectors
import (
"bufio"
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
//
// CPUFreqCpuInfoCollector
// a metric collector to measure the current frequency of the CPUs
// as obtained from /proc/cpuinfo
// Only measure on the first hyperthread
//
// CPUFreqCpuInfoCollectorTopology stores the topology information of one
// processor entry of /proc/cpuinfo. IDs are kept twice: as the string read
// from the file and as the int64 parsed from that string.
//
type CPUFreqCpuInfoCollectorTopology struct {
processor string // logical processor number (continuous, starting at 0), value of key "processor"
coreID string // socket local core ID, value of key "core id"
// coreID parsed as base 10 int64
coreID_int int64
physicalPackageID string // socket / package ID, value of key "physical id"
// physicalPackageID parsed as base 10 int64
physicalPackageID_int int64
numPhysicalPackages string // number of sockets / packages (largest package ID + 1), same for all entries
// numeric form of numPhysicalPackages
numPhysicalPackages_int int64
// true if an earlier processor entry had the same package ID / core ID pair
isHT bool
numNonHT string // number of non hyperthreading processors (entries with isHT == false), same for all entries
// numeric form of numNonHT
numNonHT_int int64
// tags attached to the metric of this processor: "type", "type-id" and "package_id"
tagSet map[string]string
}
// CPUFreqCpuInfoCollector is a metric collector that reports the current
// CPU frequency ("cpu MHz") of the processors listed in /proc/cpuinfo.
type CPUFreqCpuInfoCollector struct {
// common collector state (the methods of this type use name, init, parallel, meta and setup())
metricCollector
// one entry per processor found by Init(), in the order of appearance in /proc/cpuinfo;
// Read() matches the n-th "cpu MHz" line to the n-th entry
topology []*CPUFreqCpuInfoCollectorTopology
}
// Init sets up the collector and reads the processor topology from
// /proc/cpuinfo.
//
// For every processor entry that provides the keys "processor", "cpu MHz",
// "core id" and "physical id" one topology entry is stored, in the order of
// appearance in the file. A processor is flagged as hyperthread if another
// processor with the same package ID / core ID pair was listed before it.
//
// The config argument is currently not evaluated.
//
// An error is returned if the file cannot be opened or read, if a core ID or
// package ID is not an integer, or if no complete processor entry was found.
// Calling Init on an already initialized collector is a no-op.
func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
	// Check if already initialized
	if m.init {
		return nil
	}

	m.setup()
	m.name = "CPUFreqCpuInfoCollector"
	m.parallel = true
	m.meta = map[string]string{
		"source": m.name,
		"group":  "CPU",
		"unit":   "MHz",
	}

	const cpuInfoFile = "/proc/cpuinfo"
	file, err := os.Open(cpuInfoFile)
	if err != nil {
		return fmt.Errorf("failed to open file '%s': %w", cpuInfoFile, err)
	}
	defer file.Close()

	// Collect topology information from file cpuinfo
	foundFreq := false
	processor := ""
	var numNonHT_int int64 = 0
	coreID := ""
	physicalPackageID := ""
	var maxPhysicalPackageID int64 = 0
	m.topology = make([]*CPUFreqCpuInfoCollectorTopology, 0)
	coreSeenBefore := make(map[string]bool)

	// Read cpuinfo file, line by line
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lineSplit := strings.Split(scanner.Text(), ":")
		if len(lineSplit) == 2 {
			key := strings.TrimSpace(lineSplit[0])
			value := strings.TrimSpace(lineSplit[1])
			switch key {
			case "cpu MHz":
				// frequency
				foundFreq = true
			case "processor":
				processor = value
			case "core id":
				coreID = value
			case "physical id":
				physicalPackageID = value
			}
		}

		// were all topology information collected?
		if !foundFreq ||
			len(processor) == 0 ||
			len(coreID) == 0 ||
			len(physicalPackageID) == 0 {
			continue
		}

		topology := new(CPUFreqCpuInfoCollectorTopology)

		// Processor
		topology.processor = processor

		// Core ID
		topology.coreID = coreID
		topology.coreID_int, err = strconv.ParseInt(coreID, 10, 64)
		if err != nil {
			return fmt.Errorf("unable to convert coreID '%s' to int64: %w", coreID, err)
		}

		// Physical package ID
		topology.physicalPackageID = physicalPackageID
		topology.physicalPackageID_int, err = strconv.ParseInt(physicalPackageID, 10, 64)
		if err != nil {
			return fmt.Errorf("unable to convert physicalPackageID '%s' to int64: %w", physicalPackageID, err)
		}

		// increase maximum socket / package ID, when required
		if topology.physicalPackageID_int > maxPhysicalPackageID {
			maxPhysicalPackageID = topology.physicalPackageID_int
		}

		// is hyperthread?
		// The first processor seen on a (package, core) pair is the
		// non hyperthread one, all later ones are hyperthreads
		globalID := physicalPackageID + ":" + coreID
		topology.isHT = coreSeenBefore[globalID]
		coreSeenBefore[globalID] = true
		if !topology.isHT {
			// increase number on non hyper thread cores
			numNonHT_int++
		}

		// store collected topology information
		m.topology = append(m.topology, topology)

		// reset topology information
		foundFreq = false
		processor = ""
		coreID = ""
		physicalPackageID = ""
	}

	// A read error ends the scan loop like EOF does; without this check
	// a truncated topology would be used silently
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("failed to read file '%s': %w", cpuInfoFile, err)
	}

	// Without any topology entry Read() has nothing to match the
	// "cpu MHz" lines against, so refuse to initialize
	if len(m.topology) == 0 {
		return fmt.Errorf("no processor with 'processor', 'cpu MHz', 'core id' and 'physical id' found in file '%s'", cpuInfoFile)
	}

	// Package IDs start at 0, so the number of packages is the largest ID + 1
	numPhysicalPackageID_int := maxPhysicalPackageID + 1
	numPhysicalPackageID := fmt.Sprint(numPhysicalPackageID_int)
	numNonHT := fmt.Sprint(numNonHT_int)
	for _, t := range m.topology {
		t.numPhysicalPackages = numPhysicalPackageID
		t.numPhysicalPackages_int = numPhysicalPackageID_int
		t.numNonHT = numNonHT
		t.numNonHT_int = numNonHT_int
		t.tagSet = map[string]string{
			"type":       "hwthread",
			"type-id":    t.processor,
			"package_id": t.physicalPackageID,
		}
	}

	m.init = true
	return nil
}
// Read parses /proc/cpuinfo and sends one "cpufreq" metric per non
// hyperthread processor to output. The metric value is the "cpu MHz" entry
// of the processor, all metrics of one call share the same timestamp.
//
// The n-th "cpu MHz" line of the file is matched to the n-th topology entry
// collected by Init(). The interval argument is not used.
//
// Errors are logged and end the current read; nothing is done if the
// collector is not initialized.
func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CCMetric) {
	// Check if already initialized
	if !m.init {
		return
	}

	const cpuInfoFile = "/proc/cpuinfo"
	file, err := os.Open(cpuInfoFile)
	if err != nil {
		cclog.ComponentError(
			m.name,
			fmt.Sprintf("Read(): Failed to open file '%s': %v", cpuInfoFile, err))
		return
	}
	defer file.Close()

	processorCounter := 0
	now := time.Now()
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lineSplit := strings.Split(scanner.Text(), ":")
		if len(lineSplit) != 2 {
			continue
		}

		// frequency
		if strings.TrimSpace(lineSplit[0]) != "cpu MHz" {
			continue
		}

		// The file may list more processors than Init() has seen
		// (e.g. CPUs brought online later); indexing past the end of
		// the topology would panic
		if processorCounter >= len(m.topology) {
			cclog.ComponentError(
				m.name,
				fmt.Sprintf("Read(): File '%s' contains more 'cpu MHz' entries than the %d processors found by Init()", cpuInfoFile, len(m.topology)))
			return
		}
		t := m.topology[processorCounter]
		processorCounter++

		// Only measure on the first hyperthread
		if t.isHT {
			continue
		}

		value, err := strconv.ParseFloat(strings.TrimSpace(lineSplit[1]), 64)
		if err != nil {
			cclog.ComponentError(
				m.name,
				fmt.Sprintf("Read(): Failed to convert cpu MHz '%s' to float64: %v", lineSplit[1], err))
			return
		}

		y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now)
		if err != nil {
			cclog.ComponentError(
				m.name,
				fmt.Sprintf("Read(): Failed to create metric for processor '%s': %v", t.processor, err))
			continue
		}
		output <- y
	}

	// A read error ends the scan loop like EOF does, report it
	if err := scanner.Err(); err != nil {
		cclog.ComponentError(
			m.name,
			fmt.Sprintf("Read(): Failed to read file '%s': %v", cpuInfoFile, err))
	}
}
// Close marks the collector as not initialized. Afterwards Read() returns
// without reading anything and Init() runs the full initialization again.
func (m *CPUFreqCpuInfoCollector) Close() {
m.init = false
}