From bed5491068d6e959bca0d904bcaf63b1f7d30808 Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Mon, 8 Jun 2026 14:00:09 +0200 Subject: [PATCH] Fix Overflows in Infiniband collector (#219) * Add information about the used infiniband counters * Change datatype from int64 to uint64 * uint64 subtraction handles wraparound automatically * Compute total rates by summing up the xmit and recv rates. This avoids overflows in the raw counters * Check for cases where the current counter can not be saved as last state * Use golang variable naming convention (camelCase) --- collectors/infinibandMetric.go | 156 ++++++++++++++++++++------------- 1 file changed, 93 insertions(+), 63 deletions(-) diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index d68eea1..0d15311 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -23,20 +23,29 @@ import ( "golang.org/x/sys/unix" ) -const IB_BASEPATH = "/sys/class/infiniband/" +// See: https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-infiniband +const ( + ibBasePath = "/sys/class/infiniband/" + ibDataUnit = "bytes" + ibDataRateUnit = ibDataUnit + "/sec" + ibPkgUnit = "packets" + ibPkgRateUnit = ibPkgUnit + "/sec" +) type InfinibandCollectorMetric struct { - name string - path string - unit string - scale int64 - addToIBTotal bool - addToIBTotalPkgs bool - lastState int64 + name string + path string + unit string + unitRates string + scaleByFourLanes bool + addToIBTotal bool + addToIBTotalPkgs bool + lastState uint64 + lastStateAvailable bool } type InfinibandCollectorInfo struct { - LID string // IB local Identifier (LID) + lid string // IB local Identifier (LID) device string // IB device port string // IB device port portCounterFiles []InfinibandCollectorMetric // mapping counter name -> InfinibandCollectorMetric @@ -56,7 +65,7 @@ type InfinibandCollector struct { lastTimestamp time.Time // Store time stamp of last tick to 
derive bandwidths } -// Init initializes the Infiniband collector by walking through files below IB_BASEPATH +// Init initializes the Infiniband collector by walking through files below ibBasePath func (m *InfinibandCollector) Init(config json.RawMessage) error { // Check if already initialized if m.init { @@ -87,7 +96,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { } // Loop for all InfiniBand directories - globPattern := filepath.Join(IB_BASEPATH, "*", "ports", "*") + globPattern := filepath.Join(ibBasePath, "*", "ports", "*") ibDirs, err := filepath.Glob(globPattern) if err != nil { return fmt.Errorf("%s Init(): unable to glob files with pattern %s: %w", m.name, globPattern, err) @@ -122,36 +131,42 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { countersDir := filepath.Join(path, "counters") portCounterFiles := []InfinibandCollectorMetric{ { - name: "ib_recv", - path: filepath.Join(countersDir, "port_rcv_data"), - unit: "bytes", - scale: 4, - addToIBTotal: true, - lastState: -1, + // Total number of data octets, divided by 4 (lanes), received on all VLs. + // This is a 64 bit counter. + name: "ib_recv", + path: filepath.Join(countersDir, "port_rcv_data"), + unit: ibDataUnit, + unitRates: ibDataRateUnit, + scaleByFourLanes: true, + addToIBTotal: true, }, { - name: "ib_xmit", - path: filepath.Join(countersDir, "port_xmit_data"), - unit: "bytes", - scale: 4, - addToIBTotal: true, - lastState: -1, + // Total number of data octets, divided by 4 (lanes), transmitted on all VLs. + // This is a 64 bit counter. + name: "ib_xmit", + path: filepath.Join(countersDir, "port_xmit_data"), + unit: ibDataUnit, + unitRates: ibDataRateUnit, + scaleByFourLanes: true, + addToIBTotal: true, }, { + // Total number of packets received on all VLs from this port (this may include packets containing errors). + // This is a 64 bit counter. 
name: "ib_recv_pkts", path: filepath.Join(countersDir, "port_rcv_packets"), - unit: "packets", - scale: 1, + unit: ibPkgUnit, + unitRates: ibPkgRateUnit, addToIBTotalPkgs: true, - lastState: -1, }, { + // Total number of packets transmitted on all VLs from this port. This may include packets with errors. + // This is a 64 bit counter. name: "ib_xmit_pkts", path: filepath.Join(countersDir, "port_xmit_packets"), - unit: "packets", - scale: 1, + unit: ibPkgUnit, + unitRates: ibPkgRateUnit, addToIBTotalPkgs: true, - lastState: -1, }, } for _, counter := range portCounterFiles { @@ -163,7 +178,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { m.info = append(m.info, InfinibandCollectorInfo{ - LID: LID, + lid: LID, device: device, port: port, portCounterFiles: portCounterFiles, @@ -184,7 +199,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error { return nil } -// Read reads Infiniband counter files below IB_BASEPATH +// Read reads Infiniband counter files below ibBasePath func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMessage) { // Check if already initialized if !m.init { @@ -201,9 +216,9 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess for i := range m.info { info := &m.info[i] - var ib_total, ib_total_last_state, - ib_total_pkts, ib_total_pkts_last_state int64 - var ib_total_last_state_available, ib_total_pkts_last_state_available bool + var ibTotal, ibTotalPkts uint64 // sum of xmit and recv counters + var ibTotalBw, ibTotalPktsBw float64 // sum of xmit and recv rates + var ibTotalBwAvailable, ibTotalPktsBwAvailable bool for i := range info.portCounterFiles { counterDef := &info.portCounterFiles[i] @@ -213,24 +228,30 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess cclog.ComponentError( m.name, fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterDef.path, err)) + // Current counter can not be saved as last state + 
counterDef.lastStateAvailable = false continue } data := strings.TrimSpace(string(line)) - // convert counter to int64 - v, err := strconv.ParseInt(data, 10, 64) + // convert counter to uint64 + vRawCounter, err := strconv.ParseUint(data, 10, 64) if err != nil { cclog.ComponentError( m.name, - fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to int64: %v", counterDef.name, data, err)) + fmt.Sprintf("Read(): Failed to convert Infininiband metrice %s='%s' to uint64: %v", counterDef.name, data, err)) + // Current counter can not be saved as last state + counterDef.lastStateAvailable = false continue } - // Scale raw value - v *= counterDef.scale + vScaledCounter := vRawCounter + if counterDef.scaleByFourLanes { + vScaledCounter *= uint64(4) + } // Send absolut values if m.config.SendAbsoluteValues { - if y, err := lp.NewMetric(counterDef.name, info.tagSet, m.meta, v, now); err == nil { + if y, err := lp.NewMetric(counterDef.name, info.tagSet, m.meta, vScaledCounter, now); err == nil { y.AddMeta("unit", counterDef.unit) output <- y } @@ -238,63 +259,72 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMess // Send derived values if m.config.SendDerivedValues { - if counterDef.lastState >= 0 { - rate := float64((v - counterDef.lastState)) / timeDiff + if counterDef.lastStateAvailable { + var rate float64 + // uint64 subtraction handles wraparound automatically + // in case vRawCounter < counterDef.lastState we would compute: + // math.MaxUint64 - lastState + vRawCounter + 1 + // = (2^64 - 1) - lastState + vRawCounter + 1 + // = 2^64 - lastState + vRawCounter + // ≡ vRawCounter - lastState (mod 2^64) + rate = float64(vRawCounter-counterDef.lastState) / timeDiff + if counterDef.scaleByFourLanes { + rate *= float64(4) + } if y, err := lp.NewMetric(counterDef.name+"_bw", info.tagSet, m.meta, rate, now); err == nil { - y.AddMeta("unit", counterDef.unit+"/sec") + y.AddMeta("unit", counterDef.unitRates) output <- y } - // Sum up 
total values of last state + // Sum up rates for total rates if m.config.SendTotalValues { switch { case counterDef.addToIBTotal: - ib_total_last_state += counterDef.lastState - ib_total_last_state_available = true + ibTotalBw += rate + ibTotalBwAvailable = true case counterDef.addToIBTotalPkgs: - ib_total_pkts_last_state += counterDef.lastState - ib_total_pkts_last_state_available = true + ibTotalPktsBw += rate + ibTotalPktsBwAvailable = true } } } - counterDef.lastState = v + counterDef.lastState = vRawCounter + counterDef.lastStateAvailable = true } // Sum up total values if m.config.SendTotalValues { switch { case counterDef.addToIBTotal: - ib_total += v + ibTotal += vScaledCounter case counterDef.addToIBTotalPkgs: - ib_total_pkts += v + ibTotalPkts += vScaledCounter } } } // Send total values if m.config.SendTotalValues { - if y, err := lp.NewMetric("ib_total", info.tagSet, m.meta, ib_total, now); err == nil { - y.AddMeta("unit", "bytes") + if y, err := lp.NewMetric("ib_total", info.tagSet, m.meta, ibTotal, now); err == nil { + y.AddMeta("unit", ibDataUnit) output <- y } - if y, err := lp.NewMetric("ib_total_pkts", info.tagSet, m.meta, ib_total_pkts, now); err == nil { - y.AddMeta("unit", "packets") + if y, err := lp.NewMetric("ib_total_pkts", info.tagSet, m.meta, ibTotalPkts, now); err == nil { + y.AddMeta("unit", ibPkgUnit) output <- y } - if m.config.SendDerivedValues && ib_total_last_state_available { - rate := float64((ib_total - ib_total_last_state)) / timeDiff - if y, err := lp.NewMetric("ib_total_bw", info.tagSet, m.meta, rate, now); err == nil { - y.AddMeta("unit", "bytes/sec") + if m.config.SendDerivedValues && ibTotalBwAvailable { + if y, err := lp.NewMetric("ib_total_bw", info.tagSet, m.meta, ibTotalBw, now); err == nil { + y.AddMeta("unit", ibDataRateUnit) output <- y } } - if m.config.SendDerivedValues && ib_total_pkts_last_state_available { - rate := float64((ib_total_pkts - ib_total_pkts_last_state)) / timeDiff - if y, err := 
lp.NewMetric("ib_total_pkts_bw", info.tagSet, m.meta, rate, now); err == nil { - y.AddMeta("unit", "packets/sec") + if m.config.SendDerivedValues && ibTotalPktsBwAvailable { + if y, err := lp.NewMetric("ib_total_pkts_bw", info.tagSet, m.meta, ibTotalPktsBw, now); err == nil { + y.AddMeta("unit", ibPkgRateUnit) output <- y } }