cc-metric-collector/sinks/gangliaSink.go

170 lines
4.4 KiB
Go
Raw Normal View History

package sinks
import (
"encoding/json"
"errors"
"fmt"
"strings"
2022-01-30 15:25:57 +01:00
// "time"
"os/exec"
2022-01-30 15:25:57 +01:00
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
2022-01-30 15:25:57 +01:00
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)
const GMETRIC_EXEC = `gmetric`
2022-02-08 18:04:08 +01:00
const GMETRIC_CONFIG = `/etc/ganglia/gmond.conf`
type GangliaSinkConfig struct {
defaultSinkConfig
2022-02-08 18:04:08 +01:00
GmetricPath string `json:"gmetric_path,omitempty"`
GmetricConfig string `json:"gmetric_config,omitempty"`
AddGangliaGroup bool `json:"add_ganglia_group,omitempty"`
AddTagsAsDesc bool `json:"add_tags_as_desc,omitempty"`
ClusterName string `json:"cluster_name,omitempty"`
AddTypeToName bool `json:"add_type_to_name,omitempty"`
}
type GangliaSink struct {
sink
2022-02-08 18:04:08 +01:00
gmetric_path string
gmetric_config string
config GangliaSinkConfig
}
2022-01-30 15:25:57 +01:00
func (s *GangliaSink) Write(point lp.CCMetric) error {
var err error = nil
var tagsstr []string
var argstr []string
if s.config.AddGangliaGroup {
if point.HasTag("group") {
g, _ := point.GetTag("group")
argstr = append(argstr, fmt.Sprintf("--group=%s", g))
} else if point.HasMeta("group") {
g, _ := point.GetMeta("group")
argstr = append(argstr, fmt.Sprintf("--group=%s", g))
}
}
for key, value := range point.Tags() {
switch key {
2022-01-30 15:25:57 +01:00
case "unit":
argstr = append(argstr, fmt.Sprintf("--units=%s", value))
2022-01-30 15:25:57 +01:00
default:
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
2022-01-30 15:25:57 +01:00
}
}
if s.config.MetaAsTags {
for key, value := range point.Meta() {
switch key {
case "unit":
argstr = append(argstr, fmt.Sprintf("--units=%s", value))
default:
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
}
}
}
if len(s.config.ClusterName) > 0 {
argstr = append(argstr, fmt.Sprintf("--cluster=%s", s.config.ClusterName))
}
if s.config.AddTagsAsDesc && len(tagsstr) > 0 {
2022-01-30 15:25:57 +01:00
argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
}
2022-02-08 18:04:08 +01:00
if len(s.gmetric_config) > 0 {
argstr = append(argstr, fmt.Sprintf("--conf=%s", s.gmetric_config))
}
name := GangliaMetricRename(point)
if s.config.AddTypeToName {
argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point)))
} else {
argstr = append(argstr, fmt.Sprintf("--name=%s", name))
}
slope := GangliaSlopeType(point)
slopeStr := "both"
if slope == 0 {
slopeStr = "zero"
}
argstr = append(argstr, fmt.Sprintf("--slope=%s", slopeStr))
for k, v := range point.Fields() {
if k == "value" {
switch value := v.(type) {
2022-01-30 15:25:57 +01:00
case float64:
2022-02-11 15:34:12 +01:00
argstr = append(argstr,
fmt.Sprintf("--value=%v", value), "--type=double")
2022-01-30 15:25:57 +01:00
case float32:
2022-02-11 15:34:12 +01:00
argstr = append(argstr,
fmt.Sprintf("--value=%v", value), "--type=float")
2022-01-30 15:25:57 +01:00
case int:
2022-02-11 15:34:12 +01:00
argstr = append(argstr,
fmt.Sprintf("--value=%d", value), "--type=int32")
case int32:
argstr = append(argstr,
fmt.Sprintf("--value=%d", value), "--type=int32")
2022-01-30 15:25:57 +01:00
case int64:
2022-02-11 15:34:12 +01:00
argstr = append(argstr,
fmt.Sprintf("--value=%d", value), "--type=int32")
case uint:
argstr = append(argstr,
fmt.Sprintf("--value=%d", value), "--type=uint32")
case uint32:
argstr = append(argstr,
fmt.Sprintf("--value=%d", value), "--type=uint32")
case uint64:
argstr = append(argstr,
fmt.Sprintf("--value=%d", value), "--type=uint32")
2022-01-30 15:25:57 +01:00
case string:
2022-02-11 15:34:12 +01:00
argstr = append(argstr,
fmt.Sprintf("--value=%q", value), "--type=string")
2022-01-30 15:25:57 +01:00
}
}
}
2022-02-08 18:04:08 +01:00
command := exec.Command(s.gmetric_path, argstr...)
command.Wait()
_, err = command.Output()
return err
}
func (s *GangliaSink) Flush() error {
return nil
}
func (s *GangliaSink) Close() {
}
func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
s := new(GangliaSink)
s.name = fmt.Sprintf("GangliaSink(%s)", name)
s.config.AddTagsAsDesc = false
s.config.AddGangliaGroup = false
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return nil, err
}
}
s.gmetric_path = ""
s.gmetric_config = ""
if len(s.config.GmetricPath) > 0 {
p, err := exec.LookPath(s.config.GmetricPath)
if err == nil {
s.gmetric_path = p
}
}
if len(s.gmetric_path) == 0 {
p, err := exec.LookPath(string(GMETRIC_EXEC))
if err == nil {
s.gmetric_path = p
}
}
if len(s.gmetric_path) == 0 {
return nil, errors.New("cannot find executable 'gmetric'")
}
if len(s.config.GmetricConfig) > 0 {
s.gmetric_config = s.config.GmetricConfig
}
return s, nil
}