mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-31 09:05:05 +01:00 
			
		
		
		
	Only compute rates with a valid previous state
This commit is contained in:
		| @@ -17,11 +17,11 @@ import ( | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||
| ) | ||||
|  | ||||
| const DEFAULT_GPFS_CMD = `mmpmon` | ||||
| const DEFAULT_GPFS_CMD = "mmpmon" | ||||
|  | ||||
| type GpfsCollectorLastValues struct { | ||||
| 	read  int64 | ||||
| 	write int64 | ||||
| type GpfsCollectorLastState struct { | ||||
| 	bytesRead    int64 | ||||
| 	bytesWritten int64 | ||||
| } | ||||
|  | ||||
| type GpfsCollector struct { | ||||
| @@ -34,7 +34,7 @@ type GpfsCollector struct { | ||||
| 	} | ||||
| 	skipFS        map[string]struct{} | ||||
| 	lastTimestamp time.Time // Store time stamp of last tick to derive bandwidths | ||||
| 	lastValues    map[string]GpfsCollectorLastValues | ||||
| 	lastState     map[string]GpfsCollectorLastState | ||||
| } | ||||
|  | ||||
| func (m *GpfsCollector) Init(config json.RawMessage) error { | ||||
| @@ -46,10 +46,9 @@ func (m *GpfsCollector) Init(config json.RawMessage) error { | ||||
| 	var err error | ||||
| 	m.name = "GpfsCollector" | ||||
| 	m.setup() | ||||
| 	m.lastTimestamp = time.Now() | ||||
|  | ||||
| 	// Set default mmpmon binary | ||||
| 	m.config.Mmpmon = string(DEFAULT_GPFS_CMD) | ||||
| 	m.config.Mmpmon = DEFAULT_GPFS_CMD | ||||
|  | ||||
| 	// Read JSON configuration | ||||
| 	if len(config) > 0 { | ||||
| @@ -98,9 +97,13 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// Current time stamp | ||||
| 	now := time.Now() | ||||
| 	tdiff := now.Sub(m.lastTimestamp) | ||||
| 	// time difference to last time stamp | ||||
| 	timeDiff := now.Sub(m.lastTimestamp).Seconds() | ||||
| 	// Save current timestamp | ||||
| 	m.lastTimestamp = now | ||||
|  | ||||
| 	// mmpmon: | ||||
| 	// -p: generate output that can be parsed | ||||
| 	// -s: suppress the prompt on input | ||||
| @@ -160,10 +163,10 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 		} | ||||
|  | ||||
| 		m.tags["filesystem"] = filesystem | ||||
| 		if _, ok := m.lastValues[filesystem]; !ok { | ||||
| 			m.lastValues[filesystem] = GpfsCollectorLastValues{ | ||||
| 				read:  0, | ||||
| 				write: 0, | ||||
| 		if _, ok := m.lastState[filesystem]; !ok { | ||||
| 			m.lastState[filesystem] = GpfsCollectorLastState{ | ||||
| 				bytesRead:    -1, | ||||
| 				bytesWritten: -1, | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| @@ -210,13 +213,12 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 			output <- y | ||||
| 		} | ||||
| 		if m.config.SendBandwidths { | ||||
| 			lastVal := m.lastValues[filesystem] | ||||
| 			diff := bytesRead - lastVal.read | ||||
| 			lastVal.read = bytesRead | ||||
| 			if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": float64(diff) / tdiff.Seconds()}, timestamp); err == nil { | ||||
| 				output <- y | ||||
| 			if lastBytesRead := m.lastState[filesystem].bytesRead; lastBytesRead >= 0 { | ||||
| 				bwRead := float64(bytesRead-lastBytesRead) / timeDiff | ||||
| 				if y, err := lp.New("gpfs_bw_read", m.tags, m.meta, map[string]interface{}{"value": bwRead}, timestamp); err == nil { | ||||
| 					output <- y | ||||
| 				} | ||||
| 			} | ||||
| 			m.lastValues[filesystem] = lastVal | ||||
| 		} | ||||
|  | ||||
| 		// bytes written | ||||
| @@ -231,13 +233,19 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { | ||||
| 			output <- y | ||||
| 		} | ||||
| 		if m.config.SendBandwidths { | ||||
| 			lastVal := m.lastValues[filesystem] | ||||
| 			diff := bytesWritten - lastVal.write | ||||
| 			lastVal.write = bytesWritten | ||||
| 			if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": float64(diff) / tdiff.Seconds()}, timestamp); err == nil { | ||||
| 				output <- y | ||||
| 			if lastBytesWritten := m.lastState[filesystem].bytesRead; lastBytesWritten >= 0 { | ||||
| 				bwWrite := float64(bytesWritten-lastBytesWritten) / timeDiff | ||||
| 				if y, err := lp.New("gpfs_bw_write", m.tags, m.meta, map[string]interface{}{"value": bwWrite}, timestamp); err == nil { | ||||
| 					output <- y | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if m.config.SendBandwidths { | ||||
| 			m.lastState[filesystem] = GpfsCollectorLastState{ | ||||
| 				bytesRead:    bytesRead, | ||||
| 				bytesWritten: bytesWritten, | ||||
| 			} | ||||
| 			m.lastValues[filesystem] = lastVal | ||||
| 		} | ||||
|  | ||||
| 		// number of opens | ||||
|   | ||||
| @@ -161,6 +161,8 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr | ||||
| 	now := time.Now() | ||||
| 	// time difference to last time stamp | ||||
| 	timeDiff := now.Sub(m.lastTimestamp).Seconds() | ||||
| 	// Save current timestamp | ||||
| 	m.lastTimestamp = now | ||||
|  | ||||
| 	for _, info := range m.info { | ||||
| 		for counterName, counterFile := range info.portCounterFiles { | ||||
| @@ -205,9 +207,6 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
|  | ||||
| 	// Save current timestamp | ||||
| 	m.lastTimestamp = now | ||||
| } | ||||
|  | ||||
| func (m *InfinibandCollector) Close() { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user