diff --git a/collectors/customCmdMetric.go b/collectors/customCmdMetric.go index 595680c..08b7f4c 100644 --- a/collectors/customCmdMetric.go +++ b/collectors/customCmdMetric.go @@ -8,10 +8,11 @@ package collectors import ( + "bytes" "encoding/json" "errors" "fmt" - "log" + "io" "os" "os/exec" "slices" @@ -20,7 +21,9 @@ import ( cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" lp "github.com/ClusterCockpit/cc-lib/v2/ccMessage" - influx "github.com/influxdata/line-protocol" + + receivers "github.com/ClusterCockpit/cc-lib/v2/receivers" + lp2 "github.com/influxdata/line-protocol/v2/lineprotocol" ) const CUSTOMCMDPATH = `/home/unrz139/Work/cc-metric-collector/collectors/custom` @@ -33,115 +36,155 @@ type CustomCmdCollectorConfig struct { type CustomCmdCollector struct { metricCollector - handler *influx.MetricHandler - parser *influx.Parser - config CustomCmdCollectorConfig - commands []string - files []string + config CustomCmdCollectorConfig + cmdFieldsSlice [][]string + files []string } func (m *CustomCmdCollector) Init(config json.RawMessage) error { - var err error m.name = "CustomCmdCollector" m.parallel = true m.meta = map[string]string{ "source": m.name, "group": "Custom", } + + // Read configuration if len(config) > 0 { - err = json.Unmarshal(config, &m.config) - if err != nil { + if err := json.Unmarshal(config, &m.config); err != nil { return fmt.Errorf("%s Init(): json.Unmarshal() call failed: %w", m.name, err) } } + + // Setup if err := m.setup(); err != nil { return fmt.Errorf("%s Init(): setup() call failed: %w", m.name, err) } + + // Check if command can be executed for _, c := range m.config.Commands { - cmdfields := strings.Fields(c) - command := exec.Command(cmdfields[0], cmdfields[1:]...) - _, err = command.Output() - if err == nil { - m.commands = append(m.commands, c) - } else { + cmdFields := strings.Fields(c) + command := exec.Command(cmdFields[0], cmdFields[1:]...) + if _, err := command.Output(); err != nil { cclog.ComponentWarn( m.name, - fmt.Sprintf("%s Init(): Execution of command \"%s\" failed: %v", m.name, command.String(), err), - ) + fmt.Sprintf("%s Init(): Execution of command \"%s\" failed: %v", m.name, command.String(), err)) continue } + m.cmdFieldsSlice = append(m.cmdFieldsSlice, cmdFields) } - for _, f := range m.config.Files { - _, err = os.ReadFile(f) - if err == nil { - m.files = append(m.files, f) - } else { + + // Check if file can be read + for _, fileName := range m.config.Files { + if _, err := os.ReadFile(fileName); err != nil { cclog.ComponentWarn( m.name, - fmt.Sprintf("%s Init(): Reading of file \"%s\" failed: %v", m.name, f, err), - ) + fmt.Sprintf("%s Init(): Reading of file \"%s\" failed: %v", m.name, fileName, err)) continue } + m.files = append(m.files, fileName) } - if len(m.files) == 0 && len(m.commands) == 0 { + + if len(m.files) == 0 && len(m.cmdFieldsSlice) == 0 { return errors.New("no metrics to collect") } - m.handler = influx.NewMetricHandler() - m.parser = influx.NewParser(m.handler) - m.parser.SetTimeFunc(DefaultTime) m.init = true return nil } -var DefaultTime = func() time.Time { - return time.Unix(42, 0) -} - func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessage) { if !m.init { return } - for _, cmd := range m.commands { - cmdfields := strings.Fields(cmd) - command := exec.Command(cmdfields[0], cmdfields[1:]...) - if err := command.Wait(); err != nil { - log.Print(err) - continue - } - stdout, err := command.Output() + + // Execute configured commands + for _, cmdFields := range m.cmdFieldsSlice { + command := exec.Command(cmdFields[0], cmdFields[1:]...) + stdout, err := command.StdoutPipe() if err != nil { - log.Print(err) + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to create stdout pipe for command \"%s\": %v", command.String(), err), + ) continue } - cmdmetrics, err := m.parser.Parse(stdout) - if err != nil { - log.Print(err) + errBuf := new(bytes.Buffer) + command.Stderr = errBuf + + // Start command + if err := command.Start(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to start command \"%s\": %v", command.String(), err), + ) continue } - for _, c := range cmdmetrics { - if slices.Contains(m.config.ExcludeMetrics, c.Name()) { + + // Read and decode influxDB line-protocol from command output + d := lp2.NewDecoder(stdout) + for d.Next() { + metric, err := receivers.DecodeInfluxMessage(d) + if err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to decode influx Message: %v", err), + ) continue } + if slices.Contains(m.config.ExcludeMetrics, metric.Name()) { + continue + } + output <- metric + } - output <- lp.FromInfluxMetric(c) + // Wait for command end + if err := command.Wait(); err != nil { + errMsg, _ := io.ReadAll(errBuf) + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err), + ) + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg)))) + continue } } - for _, file := range m.files { - buffer, err := os.ReadFile(file) + + // Read configured files + for _, filename := range m.files { + file, err := os.Open(filename) if err != nil { - log.Print(err) - return - } - fmetrics, err := m.parser.Parse(buffer) - if err != nil { - log.Print(err) + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to open file \"%s\": %v\n", filename, err), + ) continue } - for _, f := range fmetrics { - if slices.Contains(m.config.ExcludeMetrics, f.Name()) { + + // Read and decode influxDB line-protocol from file + d := lp2.NewDecoder(file) + for d.Next() { + metric, err := receivers.DecodeInfluxMessage(d) + if err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to decode influx Message: %v", err), + ) continue } - output <- lp.FromInfluxMetric(f) + if slices.Contains(m.config.ExcludeMetrics, metric.Name()) { + continue + } + output <- metric + } + + if err := file.Close(); err != nil { + cclog.ComponentError( + m.name, + fmt.Sprintf("Read(): Failed to close file \"%s\": %v\n", filename, err), + ) + continue } } }