From 83720aa5be7d691953e5ae9c735d9bf60e5af80e Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Mon, 16 Feb 2026 09:57:24 +0100 Subject: [PATCH] Use cc-lib lp.FromBytes, do not use influxdb client directly --- collectors/customCmdMetric.go | 83 ++++++++++------------------------- 1 file changed, 22 insertions(+), 61 deletions(-) diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index 08b7f4c..a4720eb 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -8,11 +8,9 @@ package collectors import ( - "bytes" "encoding/json" "errors" "fmt" - "io" "os" "os/exec" "slices" @@ -21,9 +19,6 @@ import ( cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" - - receivers "github.com/ClusterCockpit/cc-lib/v2/receivers" - lp2 "github.com/influxdata/line-protocol/v2/lineprotocol" ) const CUSTOMCMDPATH = `/home/unrz139/Work/cc-metric-collector/collectors/custom` @@ -100,92 +95,58 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessa // Execute configured commands for _, cmdFields := range m.cmdFieldsSlice { command := exec.Command(cmdFields[0], cmdFields[1:]...) - stdout, err := command.StdoutPipe() + stdout, err := command.Output() if err != nil { cclog.ComponentError( m.name, - fmt.Sprintf("Read(): Failed to create stdout pipe for command \"%s\": %v", command.String(), err), - ) - continue - } - errBuf := new(bytes.Buffer) - command.Stderr = errBuf - - // Start command - if err := command.Start(); err != nil { - cclog.ComponentError( - m.name, - fmt.Sprintf("Read(): Failed to start command \"%s\": %v", command.String(), err), + fmt.Sprintf("Read(): Failed to read command output for command \"%s\": %v", command.String(), err), ) continue } // Read and decode influxDB line-protocol from command output - d := lp2.NewDecoder(stdout) - for d.Next() { - metric, err := receivers.DecodeInfluxMessage(d) - if err != nil { - cclog.ComponentError( - m.name, - fmt.Sprintf("Read(): Failed to decode influx Message: %v", err), - ) - continue - } + metrics, err := lp.FromBytes(stdout) + if err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to decode influx Message: %v", err), + ) + continue + } + for _, metric := range metrics { if slices.Contains(m.config.ExcludeMetrics, metric.Name()) { continue } output <- metric } - - // Wait for command end - if err := command.Wait(); err != nil { - errMsg, _ := io.ReadAll(errBuf) - cclog.ComponentError( - m.name, - fmt.Sprintf("Read(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err), - ) - cclog.ComponentError( - m.name, - fmt.Sprintf("Read(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg)))) - continue - } } // Read configured files for _, filename := range m.files { - file, err := os.Open(filename) + input, err := os.ReadFile(filename) if err != nil { cclog.ComponentError( m.name, - fmt.Sprintf("Read(): Failed to open file \"%s\": %v\n", filename, err), + fmt.Sprintf("Read(): Failed to read file \"%s\": %v\n", filename, err), ) continue } // Read and decode influxDB line-protocol from file - d := lp2.NewDecoder(file) - for d.Next() { - metric, err := receivers.DecodeInfluxMessage(d) - if err != nil { - cclog.ComponentError( - m.name, - fmt.Sprintf("Read(): Failed to decode influx Message: %v", err), - ) - continue - } + metrics, err := lp.FromBytes(input) + if err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to decode influx Message: %v", err), + ) + continue + } + for _, metric := range metrics { if slices.Contains(m.config.ExcludeMetrics, metric.Name()) { continue } output <- metric } - - if err := file.Close(); err != nil { - cclog.ComponentError( - m.name, - fmt.Sprintf("Read(): Failed to close file \"%s\": %v\n", filename, err), - ) - continue - } } }