Merge branch 'develop' into channels

This commit is contained in:
Thomas Gruber 2022-01-25 13:26:39 +01:00 committed by GitHub
commit 16273236c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 34 additions and 171 deletions

View File

@ -49,4 +49,4 @@
}
}
}
}
}

View File

@ -6,6 +6,7 @@ GOSRC_RECEIVERS := $(wildcard receivers/*.go)
GOSRC_INTERNAL := $(wildcard internal/*/*.go)
GOSRC := $(GOSRC_APP) $(GOSRC_COLLECTORS) $(GOSRC_SINKS) $(GOSRC_RECEIVERS) $(GOSRC_INTERNAL)
.PHONY: all
all: $(APP)
@ -24,9 +25,9 @@ fmt:
go fmt $(GOSRC_COLLECTORS)
go fmt $(GOSRC_SINKS)
go fmt $(GOSRC_RECEIVERS)
go fmt $(GOSRC_INTERNAL)
go fmt $(GOSRC_APP)
find . -name "*.go" -exec go fmt {} \;
@for F in $(GOSRC_INTERNAL); do go fmt $$F; done
# Examine Go source code and reports suspicious constructs
.PHONY: vet

View File

@ -3,14 +3,15 @@ package collectors
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
//
@ -150,6 +151,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
return nil
}
func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if !m.init {
return

View File

@ -10,7 +10,6 @@ import (
"strconv"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"golang.org/x/sys/unix"
)

View File

@ -7,7 +7,6 @@ import (
"strconv"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)

View File

@ -2,9 +2,7 @@ package collectors
import (
"io/ioutil"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
// "log"
"encoding/json"
"errors"

View File

@ -13,18 +13,19 @@ import (
"strconv"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
type GpfsCollector struct {
metricCollector
tags map[string]string
config struct {
Mmpmon string `json:"mmpmon"`
}
}
func (m *GpfsCollector) Init(config json.RawMessage) error {
var err error
m.name = "GpfsCollector"
@ -116,8 +117,10 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
fmt.Fprintf(os.Stderr, "GpfsCollector.Read(): Failed to get filesystem name.\n")
continue
}
m.tags["filesystem"] = filesystem
// return code
rc, err := strconv.Atoi(key_value["_rc_"])
if err != nil {
@ -150,6 +153,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
key_value["_br_"], err.Error())
continue
}
y, err := lp.New("gpfs_bytes_read", m.tags, m.meta, map[string]interface{}{"value": bytesRead}, timestamp)
if err == nil {
output <- y
@ -163,6 +167,7 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
key_value["_bw_"], err.Error())
continue
}
y, err = lp.New("gpfs_bytes_written", m.tags, m.meta, map[string]interface{}{"value": bytesWritten}, timestamp)
if err == nil {
output <- y

View File

@ -5,9 +5,7 @@ import (
"io/ioutil"
"log"
"os/exec"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
// "os"
"encoding/json"
"errors"
@ -171,6 +169,26 @@ func (m *InfinibandCollector) doPerfQuery(cmd string, dev string, lid string, po
}
}
}
if strings.HasPrefix(line, "PortRcvPkts") || strings.HasPrefix(line, "RcvPkts") {
lv := strings.Fields(line)
v, err := strconv.ParseFloat(lv[1], 64)
if err == nil {
y, err := lp.New("ib_recv_pkts", tags, map[string]interface{}{"value": float64(v)}, time.Now())
if err == nil {
*out = append(*out, y)
}
}
}
if strings.HasPrefix(line, "PortXmitPkts") || strings.HasPrefix(line, "XmtPkts") {
lv := strings.Fields(line)
v, err := strconv.ParseFloat(lv[1], 64)
if err == nil {
y, err := lp.New("ib_xmit_pkts", tags, map[string]interface{}{"value": float64(v)}, time.Now())
if err == nil {
*out = append(*out, y)
}
}
}
}
return nil
}
@ -221,7 +239,6 @@ func (m *InfinibandCollector) doSysfsRead(dev string, lid string, port string, t
}
}
}
return nil
}

View File

@ -9,7 +9,6 @@ import (
"strconv"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)

View File

@ -20,7 +20,6 @@ import (
"strings"
"time"
"unsafe"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"gopkg.in/Knetic/govaluate.v2"
)

View File

@ -6,7 +6,6 @@ import (
"strconv"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)

View File

@ -8,7 +8,6 @@ import (
"strconv"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)

View File

@ -9,7 +9,6 @@ import (
"strconv"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)

View File

@ -7,7 +7,6 @@ import (
"strconv"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)

View File

@ -6,7 +6,6 @@ import (
"fmt"
"log"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"github.com/NVIDIA/go-nvml/pkg/nvml"
)

View File

@ -10,7 +10,6 @@ import (
"strconv"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)

View File

@ -8,7 +8,6 @@ import (
"os/exec"
"strings"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)

View File

@ -17,40 +17,12 @@ import (
"sync"
"time"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
)
// List of provided collectors. Which collector should be run can be
// configured at 'collectors' list in 'config.json'.
//var Collectors = map[string]collectors.MetricCollector{
// "likwid": &collectors.LikwidCollector{},
// "loadavg": &collectors.LoadavgCollector{},
// "memstat": &collectors.MemstatCollector{},
// "netstat": &collectors.NetstatCollector{},
// "ibstat": &collectors.InfinibandCollector{},
// "lustrestat": &collectors.LustreCollector{},
// "cpustat": &collectors.CpustatCollector{},
// "topprocs": &collectors.TopProcsCollector{},
// "nvidia": &collectors.NvidiaCollector{},
// "customcmd": &collectors.CustomCmdCollector{},
// "diskstat": &collectors.DiskstatCollector{},
// "tempstat": &collectors.TempCollector{},
// "ipmistat": &collectors.IpmiCollector{},
//}
//var Sinks = map[string]sinks.Sink{
// "influxdb": &sinks.InfluxSink{},
// "stdout": &sinks.StdoutSink{},
// "nats": &sinks.NatsSink{},
// "http": &sinks.HttpSink{},
//}
//var Receivers = map[string]receivers.ReceiverFuncs{
// "nats": &receivers.NatsReceiver{},
//}
type CentralConfigFile struct {
Interval int `json:"interval"`
Duration int `json:"duration"`
@ -303,127 +275,6 @@ func main() {
if use_recv {
rcfg.ReceiveManager.Start()
}
// if len(config.Collectors) == 0 {
// var keys []string
// for k := range Collectors {
// keys = append(keys, k)
// }
// log.Print("Configuration value 'collectors' does not contain any collector. Available: ", strings.Join(keys, ", "))
// return
// }
// for _, name := range config.Collectors {
// if _, found := Collectors[name]; !found {
// log.Print("Invalid collector '", name, "' in configuration")
// return
// }
// }
// if _, found := Sinks[config.Sink.Type]; !found {
// log.Print("Invalid sink type '", config.Sink.Type, "' in configuration")
// return
// }
// // Setup sink
// sink := Sinks[config.Sink.Type]
// err = sink.Init(config.Sink)
// if err != nil {
// log.Print(err)
// return
// }
// sinkChannel := make(chan bool)
// mproxy.Init(sinkChannel, &wg)
// // Setup receiver
// if len(config.Receiver.Type) > 0 && config.Receiver.Type != "none" {
// if _, found := Receivers[config.Receiver.Type]; !found {
// log.Print("Invalid receiver type '", config.Receiver.Type, "' in configuration")
// return
// } else {
// recv = Receivers[config.Receiver.Type]
// err = recv.Init(config.Receiver, sink)
// if err == nil {
// use_recv = true
// } else {
// log.Print(err)
// }
// }
// }
// // Register interrupt handler
// prepare_shutdown(&wg, &config, sink, recv, clicfg["pidfile"])
// // Initialize all collectors
// tmp := make([]string, 0)
// for _, c := range config.Collectors {
// col := Collectors[c]
// conf, found := config.CollectConfigs[c]
// if !found {
// conf = json.RawMessage("")
// }
// err = col.Init([]byte(conf))
// if err != nil {
// log.Print("SKIP ", col.Name(), " (", err.Error(), ")")
// } else if !col.Initialized() {
// log.Print("SKIP ", col.Name(), " (Not initialized)")
// } else {
// log.Print("Start ", col.Name())
// tmp = append(tmp, c)
// }
// }
// config.Collectors = tmp
// config.DefTags["hostname"] = host
// // Setup up ticker loop
// if clicfg["once"] != "true" {
// log.Print("Running loop every ", time.Duration(config.Interval)*time.Second)
// } else {
// log.Print("Running loop only once")
// }
// ticker := time.NewTicker(time.Duration(config.Interval) * time.Second)
// done := make(chan bool)
// // Storage for all node metrics
// tmpPoints := make([]lp.MutableMetric, 0)
// // Start receiver
// if use_recv {
// recv.Start()
// }
// go func() {
// for {
// select {
// case <-done:
// return
// case t := <-ticker.C:
// // Read all collectors are sort the results in the right
// // storage locations
// for _, c := range config.Collectors {
// col := Collectors[c]
// col.Read(time.Duration(config.Duration), &tmpPoints)
// for {
// if len(tmpPoints) == 0 {
// break
// }
// p := tmpPoints[0]
// for k, v := range config.DefTags {
// p.AddTag(k, v)
// p.SetTime(t)
// }
// sink.Write(p)
// tmpPoints = tmpPoints[1:]
// }
// }
// if err := sink.Flush(); err != nil {
// log.Printf("sink error: %s\n", err)
// }
// if clicfg["once"] == "true" {
// shutdown(&wg, config.Collectors, sink, recv, clicfg["pidfile"])
// return
// }
// }
// }
// }()
// Wait until receiving an interrupt
rcfg.Sync.Wait()