Add sink for InfluxDB (with the original InfluxDB client)

This commit is contained in:
Thomas Roehl
2021-03-26 16:48:09 +01:00
parent e92f54b411
commit f822f00cdc
6 changed files with 153 additions and 27 deletions

View File

@@ -4,7 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/ClusterCockpit/cc-metric-collector/collectors"
protocol "github.com/influxdata/line-protocol"
"github.com/ClusterCockpit/cc-metric-collector/sinks"
"log"
"os"
"os/signal"
@@ -24,8 +24,9 @@ var Collectors = map[string]collectors.MetricGetter{
"lustrestat": &collectors.LustreCollector{},
}
// Pointer to the InfluxDB line protocol encoder
var serializer *protocol.Encoder
var Sinks = map[string]sinks.SinkFuncs{
"influxdb": &sinks.InfluxSink{},
}
// Structure of the configuration file
type GlobalConfig struct {
@@ -34,6 +35,8 @@ type GlobalConfig struct {
Password string `json:"password"`
Host string `json:"host"`
Port string `json:"port"`
Database string `json:"database"`
Type string `json:"type"`
} `json:"sink"`
Interval int `json:"interval"`
Duration int `json:"duration"`
@@ -54,7 +57,7 @@ func LoadConfiguration(file string, config *GlobalConfig) error {
// Register an interrupt handler for Ctrl+C and similar. At signal,
// all collectors are closed
func shutdown(wg *sync.WaitGroup, config *GlobalConfig) {
func shutdown(wg *sync.WaitGroup, config *GlobalConfig, sink sinks.SinkFuncs) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)
@@ -67,22 +70,11 @@ func shutdown(wg *sync.WaitGroup, config *GlobalConfig) {
col.Close()
}
time.Sleep(1 * time.Second)
sink.Close()
wg.Done()
}(wg)
}
// Create a new measurement in InfluxDB line protocol
func setupProtocol(scope string, tags map[string]string, fields map[string]interface{}, t time.Time) {
cur, err := protocol.New(scope, tags, fields, t)
if err != nil {
log.Print(err)
}
_, err = serializer.Encode(cur)
if err != nil {
log.Print(err)
}
}
func main() {
var config GlobalConfig
var wg sync.WaitGroup
@@ -117,13 +109,19 @@ func main() {
return
}
}
// Register interrupt handler
shutdown(&wg, &config)
if _, found := Sinks[config.Sink.Type]; !found {
log.Print("Invalid sink type '", config.Sink.Type, "' in configuration")
return
}
// Setup sink
sink := Sinks[config.Sink.Type]
err = sink.Init(config.Sink.Host, config.Sink.Port, config.Sink.User, config.Sink.Password, config.Sink.Database)
if err != nil {
return
}
// Setup InfluxDB line protocol encoder
serializer = protocol.NewEncoder(os.Stdout)
serializer.SetPrecision(time.Second)
serializer.SetMaxLineBytes(1024)
// Register interrupt handler
shutdown(&wg, &config, sink)
// Initialize all collectors
for _, c := range config.Collectors {
@@ -186,20 +184,21 @@ func main() {
}
}
}
// Send out node metrics
setupProtocol("node", map[string]string{"host": host}, nodeFields, t)
sink.Write("node", map[string]string{"host": host}, nodeFields, t)
// Send out socket metrics (if any)
if scount > 0 {
for sid, socket := range socketsFields {
setupProtocol("socket", map[string]string{"socket": fmt.Sprintf("%d", sid), "host": host}, socket, t)
sink.Write("socket", map[string]string{"socket": fmt.Sprintf("%d", sid), "host": host}, socket, t)
}
}
// Send out CPU metrics (if any)
if ccount > 0 {
for cid, cpu := range cpuFields {
setupProtocol("cpu", map[string]string{"cpu": fmt.Sprintf("%d", cid), "host": host}, cpu, t)
sink.Write("cpu", map[string]string{"cpu": fmt.Sprintf("%d", cid), "host": host}, cpu, t)
}
}
}