diff --git a/.github/ci-config.json b/.github/ci-config.json index 35a837e..402388d 100644 --- a/.github/ci-config.json +++ b/.github/ci-config.json @@ -49,4 +49,4 @@ } } } -} +} \ No newline at end of file diff --git a/Makefile b/Makefile index a3be0af..9c275f4 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ GOSRC_RECEIVERS := $(wildcard receivers/*.go) GOSRC_INTERNAL := $(wildcard internal/*/*.go) GOSRC := $(GOSRC_APP) $(GOSRC_COLLECTORS) $(GOSRC_SINKS) $(GOSRC_RECEIVERS) $(GOSRC_INTERNAL) + .PHONY: all all: $(APP) @@ -24,9 +25,9 @@ fmt: go fmt $(GOSRC_COLLECTORS) go fmt $(GOSRC_SINKS) go fmt $(GOSRC_RECEIVERS) - go fmt $(GOSRC_INTERNAL) go fmt $(GOSRC_APP) - find . -name "*.go" -exec go fmt {} \; + @for F in $(GOSRC_INTERNAL); do go fmt $$F; done + # Examine Go source code and reports suspicious constructs .PHONY: vet diff --git a/collectors/cpufreqCpuinfoMetric.go b/collectors/cpufreqCpuinfoMetric.go index 54663de..9c91a50 100644 --- a/collectors/cpufreqCpuinfoMetric.go +++ b/collectors/cpufreqCpuinfoMetric.go @@ -3,14 +3,15 @@ package collectors import ( "bufio" "encoding/json" + "fmt" "log" "os" "strconv" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + ) // @@ -150,6 +151,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error { return nil } + func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CCMetric) { if !m.init { return diff --git a/collectors/cpufreqMetric.go b/collectors/cpufreqMetric.go index f3309ff..5febed9 100644 --- a/collectors/cpufreqMetric.go +++ b/collectors/cpufreqMetric.go @@ -10,7 +10,6 @@ import ( "strconv" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "golang.org/x/sys/unix" ) diff --git a/collectors/cpustatMetric.go b/collectors/cpustatMetric.go index 92dba4d..f517300 100644 --- a/collectors/cpustatMetric.go +++ b/collectors/cpustatMetric.go @@ -7,7 +7,6 @@ import ( "strconv" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) diff --git a/collectors/diskstatMetric.go b/collectors/diskstatMetric.go index b935eb3..50c41cd 100644 --- a/collectors/diskstatMetric.go +++ b/collectors/diskstatMetric.go @@ -2,9 +2,7 @@ package collectors import ( "io/ioutil" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - // "log" "encoding/json" "errors" diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index d8d637c..f1d3d75 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -13,18 +13,19 @@ import ( "strconv" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) type GpfsCollector struct { metricCollector tags map[string]string + config struct { Mmpmon string `json:"mmpmon"` } } + func (m *GpfsCollector) Init(config json.RawMessage) error { var err error m.name = "GpfsCollector" @@ -116,8 +117,10 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to get filesystem name.\n") continue } + m.tags["filesystem"] = filesystem + // return code rc, err := strconv.Atoi(key_value["_rc_"]) if err != nil { @@ -150,6 +153,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { key_value["_br_"], err.Error()) continue } + y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp) if err == nil { output <- y @@ -163,6 +167,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { key_value["_bw_"], err.Error()) continue } + y, err = lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp) if err == nil { output <- y diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go index 56a34af..7b0e9dc 100644 --- a/collectors/infinibandMetric.go +++ b/collectors/infinibandMetric.go @@ -5,9 +5,7 @@ import ( "io/ioutil" "log" "os/exec" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" - // "os" "encoding/json" "errors" @@ -171,6 +169,26 @@ func (m *InfinibandCollector) doPerfQuery(cmd string, dev string, lid string, po } } } + if strings.HasPrefix(line, "PortRcvPkts") || strings.HasPrefix(line, "RcvPkts") { + lv := strings.Fields(line) + v, err := strconv.ParseFloat(lv[1], 64) + if err == nil { + y, err := lp.New("ib_recv_pkts", tags, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + *out = append(*out, y) + } + } + } + if strings.HasPrefix(line, "PortXmitPkts") || strings.HasPrefix(line, "XmtPkts") { + lv := strings.Fields(line) + v, err := strconv.ParseFloat(lv[1], 64) + if err == nil { + y, err := lp.New("ib_xmit_pkts", tags, map[string]interface{}{"value": float64(v)}, time.Now()) + if err == nil { + *out = append(*out, y) + } + } + } } return nil } @@ -221,7 +239,6 @@ func (m *InfinibandCollector) doSysfsRead(dev string, lid string, port string, t } } } - return nil } diff --git a/collectors/ipmiMetric.go b/collectors/ipmiMetric.go index 252055b..f4c5167 100644 --- a/collectors/ipmiMetric.go +++ b/collectors/ipmiMetric.go @@ -9,7 +9,6 @@ import ( "strconv" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go index 56582b0..430a09b 100644 --- a/collectors/likwidMetric.go +++ b/collectors/likwidMetric.go @@ -20,7 +20,6 @@ import ( "strings" "time" "unsafe" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "gopkg.in/Knetic/govaluate.v2" ) diff --git a/collectors/loadavgMetric.go b/collectors/loadavgMetric.go index 78f1a20..11c0e5e 100644 --- a/collectors/loadavgMetric.go +++ b/collectors/loadavgMetric.go @@ -6,7 +6,6 @@ import ( "strconv" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) diff --git a/collectors/lustreMetric.go b/collectors/lustreMetric.go index 2fc2171..3e248fa 100644 --- a/collectors/lustreMetric.go +++ b/collectors/lustreMetric.go @@ -8,7 +8,6 @@ import ( "strconv" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) diff --git a/collectors/memstatMetric.go b/collectors/memstatMetric.go index 98d69ac..c83402c 100644 --- a/collectors/memstatMetric.go +++ b/collectors/memstatMetric.go @@ -9,7 +9,6 @@ import ( "strconv" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) diff --git a/collectors/netstatMetric.go b/collectors/netstatMetric.go index 077eb15..86437ea 100644 --- a/collectors/netstatMetric.go +++ b/collectors/netstatMetric.go @@ -7,7 +7,6 @@ import ( "strconv" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) diff --git a/collectors/nvidiaMetric.go b/collectors/nvidiaMetric.go index bfd7e2f..6f5141a 100644 --- a/collectors/nvidiaMetric.go +++ b/collectors/nvidiaMetric.go @@ -6,7 +6,6 @@ import ( "fmt" "log" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" "github.com/NVIDIA/go-nvml/pkg/nvml" ) diff --git a/collectors/tempMetric.go b/collectors/tempMetric.go index a004c24..b73d582 100644 --- a/collectors/tempMetric.go +++ b/collectors/tempMetric.go @@ -10,7 +10,6 @@ import ( "strconv" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) diff --git a/collectors/topprocsMetric.go b/collectors/topprocsMetric.go index dd6bff3..d2691dc 100644 --- a/collectors/topprocsMetric.go +++ b/collectors/topprocsMetric.go @@ -8,7 +8,6 @@ import ( "os/exec" "strings" "time" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) diff --git a/metric-collector.go b/metric-collector.go index 97e5179..c071933 100644 --- a/metric-collector.go +++ b/metric-collector.go @@ -17,40 +17,12 @@ import ( "sync" "time" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter" mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker" ) -// List of provided collectors. Which collector should be run can be -// configured at 'collectors' list in 'config.json'. -//var Collectors = map[string]collectors.MetricCollector{ -// "likwid": &collectors.LikwidCollector{}, -// "loadavg": &collectors.LoadavgCollector{}, -// "memstat": &collectors.MemstatCollector{}, -// "netstat": &collectors.NetstatCollector{}, -// "ibstat": &collectors.InfinibandCollector{}, -// "lustrestat": &collectors.LustreCollector{}, -// "cpustat": &collectors.CpustatCollector{}, -// "topprocs": &collectors.TopProcsCollector{}, -// "nvidia": &collectors.NvidiaCollector{}, -// "customcmd": &collectors.CustomCmdCollector{}, -// "diskstat": &collectors.DiskstatCollector{}, -// "tempstat": &collectors.TempCollector{}, -// "ipmistat": &collectors.IpmiCollector{}, -//} - -//var Sinks = map[string]sinks.Sink{ -// "influxdb": &sinks.InfluxSink{}, -// "stdout": &sinks.StdoutSink{}, -// "nats": &sinks.NatsSink{}, -// "http": &sinks.HttpSink{}, -//} - -//var Receivers = map[string]receivers.ReceiverFuncs{ -// "nats": &receivers.NatsReceiver{}, -//} - type CentralConfigFile struct { Interval int `json:"interval"` Duration int `json:"duration"` @@ -303,127 +275,6 @@ func main() { if use_recv { rcfg.ReceiveManager.Start() } - // if len(config.Collectors) == 0 { - // var keys []string - // for k := range Collectors { - // keys = append(keys, k) - // } - // log.Print("Configuration value 'collectors' does not contain any collector. Available: ", strings.Join(keys, ", ")) - // return - // } - // for _, name := range config.Collectors { - // if _, found := Collectors[name]; !found { - // log.Print("Invalid collector '", name, "' in configuration") - // return - // } - // } - // if _, found := Sinks[config.Sink.Type]; !found { - // log.Print("Invalid sink type '", config.Sink.Type, "' in configuration") - // return - // } - // // Setup sink - // sink := Sinks[config.Sink.Type] - // err = sink.Init(config.Sink) - // if err != nil { - // log.Print(err) - // return - // } - // sinkChannel := make(chan bool) - // mproxy.Init(sinkChannel, &wg) - // // Setup receiver - // if len(config.Receiver.Type) > 0 && config.Receiver.Type != "none" { - // if _, found := Receivers[config.Receiver.Type]; !found { - // log.Print("Invalid receiver type '", config.Receiver.Type, "' in configuration") - // return - // } else { - // recv = Receivers[config.Receiver.Type] - // err = recv.Init(config.Receiver, sink) - // if err == nil { - // use_recv = true - // } else { - // log.Print(err) - // } - // } - // } - - // // Register interrupt handler - // prepare_shutdown(&wg, &config, sink, recv, clicfg["pidfile"]) - - // // Initialize all collectors - // tmp := make([]string, 0) - // for _, c := range config.Collectors { - // col := Collectors[c] - // conf, found := config.CollectConfigs[c] - // if !found { - // conf = json.RawMessage("") - // } - // err = col.Init([]byte(conf)) - // if err != nil { - // log.Print("SKIP ", col.Name(), " (", err.Error(), ")") - // } else if !col.Initialized() { - // log.Print("SKIP ", col.Name(), " (Not initialized)") - // } else { - // log.Print("Start ", col.Name()) - // tmp = append(tmp, c) - // } - // } - // config.Collectors = tmp - // config.DefTags["hostname"] = host - - // // Setup up ticker loop - // if clicfg["once"] != "true" { - // log.Print("Running loop every ", time.Duration(config.Interval)*time.Second) - // } else { - // log.Print("Running loop only once") - // } - // ticker := time.NewTicker(time.Duration(config.Interval) * time.Second) - // done := make(chan bool) - - // // Storage for all node metrics - // tmpPoints := make([]lp.MutableMetric, 0) - - // // Start receiver - // if use_recv { - // recv.Start() - // } - - // go func() { - // for { - // select { - // case <-done: - // return - // case t := <-ticker.C: - - // // Read all collectors are sort the results in the right - // // storage locations - // for _, c := range config.Collectors { - // col := Collectors[c] - // col.Read(time.Duration(config.Duration), &tmpPoints) - - // for { - // if len(tmpPoints) == 0 { - // break - // } - // p := tmpPoints[0] - // for k, v := range config.DefTags { - // p.AddTag(k, v) - // p.SetTime(t) - // } - // sink.Write(p) - // tmpPoints = tmpPoints[1:] - // } - // } - - // if err := sink.Flush(); err != nil { - // log.Printf("sink error: %s\n", err) - // } - // if clicfg["once"] == "true" { - // shutdown(&wg, config.Collectors, sink, recv, clicfg["pidfile"]) - // return - // } - // } - // } - // }() // Wait until receiving an interrupt rcfg.Sync.Wait()