diff --git a/sinks/prometheusSink.go b/sinks/prometheusSink.go new file mode 100644 index 0000000..72eea92 --- /dev/null +++ b/sinks/prometheusSink.go @@ -0,0 +1,223 @@ +package sinks + +import ( + "errors" + "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "log" + "net/http" + "strings" + "sync" + "time" + "sort" +) + +var ( + socket_labels = []string{"socket"} + cpu_labels = []string{"cpu"} +) + +type PrometheusCollector struct { + sync.Mutex + node map[string]*prometheus.GaugeVec + sockets map[string]*prometheus.GaugeVec + cpus map[string]*prometheus.GaugeVec +} + +type PrometheusSink struct { + Sink + listening bool + col *PrometheusCollector +} + +func NewPrometheusCollector() *PrometheusCollector { + return &PrometheusCollector{ + node: make(map[string]*prometheus.GaugeVec), + sockets: make(map[string]*prometheus.GaugeVec), + cpus: make(map[string]*prometheus.GaugeVec), + } +} + +func (c *PrometheusCollector) check_metrics(measurement string, tags map[string]string, fields map[string]interface{}) error { + labels := make([]string, 0) + for k, _ := range tags { + labels = append(labels, k) + } + sort.Strings(labels) + switch measurement { + case "node": + for k, v := range fields { + switch v.(type) { + case float64: + default: + continue + } + if _, found := c.node[k]; !found { + log.Print("Adding node metric ", k) + c.node[k] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: measurement, + Name: k, + Help: k, + }, + labels, + ) + } + } + case "socket": + for k, v := range fields { + switch v.(type) { + case float64: + default: + continue + } + if _, found := c.sockets[k]; !found { + log.Print("Adding socket metric ", k) + c.sockets[k] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: measurement, + Name: k, + Help: k, + }, + labels, + ) + } + } + case "cpu": + for k, v := range fields { + switch v.(type) { + case float64: + default: + continue + } + if _, found := c.cpus[k]; !found { + log.Print("Adding cpu metric ", k) + c.cpus[k] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: measurement, + Name: k, + Help: k, + }, + labels, + ) + } + } + } + return nil +} + +func (c *PrometheusCollector) update_metrics(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error { + tagkeys := make([]string, 0) + for k, _ := range tags { + tagkeys = append(tagkeys, k) + } + sort.Strings(tagkeys) + labels := make([]string, 0) + for _, k := range tagkeys { + labels = append(labels, tags[k]) + } + switch measurement { + case "node": + for k, v := range fields { + switch v.(type) { + case float64: + default: + continue + } + log.Print("Setting node metric ", k, " (", strings.Join(labels, ","), "): ", v.(float64)) + c.node[k].WithLabelValues(labels...).Set(v.(float64)) + } + case "socket": + for k, v := range fields { + switch v.(type) { + case float64: + default: + continue + } + log.Print("Setting socket metric ", k, " (", strings.Join(labels, ","), "): ", v.(float64)) + c.sockets[k].WithLabelValues(labels...).Set(v.(float64)) + } + case "cpu": + for k, v := range fields { + switch v.(type) { + case float64: + default: + continue + } + log.Print("Setting cpu metric ", k, " (", strings.Join(labels, ","), "): ", v.(float64)) + c.cpus[k].WithLabelValues(labels...).Set(v.(float64)) + } + } + return nil +} + +func (c *PrometheusCollector) Describe(ch chan<- *prometheus.Desc) { + for k, _ := range c.node { + c.node[k].Describe(ch) + } + for k, _ := range c.sockets { + c.sockets[k].Describe(ch) + } + for k, _ := range c.cpus { + c.cpus[k].Describe(ch) + } +} + +func (c *PrometheusCollector) Collect(ch chan<- prometheus.Metric) { + c.Lock() + defer c.Unlock() + + for k, _ := range c.node { + c.node[k].Collect(ch) + } + for k, _ := range c.sockets { + c.sockets[k].Collect(ch) + } + for k, _ := range c.cpus { + c.cpus[k].Collect(ch) + } +} + +func (s *PrometheusSink) Init(config SinkConfig) error { + if len(config.Port) == 0 { + return errors.New("Not all configuration variables set required by PrometheusSink") + } + s.host = config.Host + s.port = config.Port + s.database = config.Database + s.organization = config.Organization + s.user = config.User + s.password = config.Password + s.listening = false + s.col = NewPrometheusCollector() + log.Print("Init Prometheus HTTP") + return nil +} + +func (s *PrometheusSink) Write(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) error { + err := s.col.check_metrics(measurement, tags, fields) + if err != nil { + log.Print(err) + } + err = s.col.update_metrics(measurement, tags, fields, t) + if err != nil { + log.Print(err) + } + if !s.listening { + go func() { + prometheus.MustRegister(s.col) + addr := fmt.Sprintf("%s:%s", s.host, s.port) + err = http.ListenAndServe(addr, promhttp.Handler()) + if err != nil { + log.Fatal(err) + } + }() + s.listening = true + } + return err +} + +func (s *PrometheusSink) Close() { + log.Print("Closing Prometheus HTTP") +}