From e28c1fb30bff621c0d8118bbe3f75efa3561b80e Mon Sep 17 00:00:00 2001 From: Thomas Gruber Date: Wed, 16 Feb 2022 18:33:46 +0100 Subject: [PATCH] Ganglia sink using `libganglia.so` directly (#35) * Add sink directly using libganglia.so * Remove unneeded confuse header * add submodule init to build action * add submodule init to runonce action * add installation og ganglia to runonce * add installation of ganglia to runonce * add installation of ganglia to runonce * libconfuse not required * Remove ganglia submodule * Remove ganglia.h * Add Makefile to help creating the libganglia.so link * Fix cgo header * Rename new Ganglia sink to 'libgangliaSink' * Add documentation for libgangliaSink * Extend make buildsystem with find&symlink helper for libgangliaSink * Add metric renaming function * Add build tag 'ganglia' and create corresponding files --- .github/workflows/rpmbuild.yml | 2 + .github/workflows/runonce.yml | 12 +- .gitmodules | 4 + Makefile | 12 +- sinks/Makefile | 11 ++ sinks/README.md | 3 +- sinks/gangliaSink.go | 2 + sinks/gangliaSink_disabled.go | 31 ++++ sinks/libgangliaSink.go | 295 +++++++++++++++++++++++++++++++ sinks/libgangliaSink.md | 41 +++++ sinks/libgangliaSink_disabled.go | 29 +++ sinks/sinkManager.go | 1 + 12 files changed, 440 insertions(+), 3 deletions(-) create mode 100644 .gitmodules create mode 100644 sinks/Makefile create mode 100644 sinks/gangliaSink_disabled.go create mode 100644 sinks/libgangliaSink.go create mode 100644 sinks/libgangliaSink.md create mode 100644 sinks/libgangliaSink_disabled.go diff --git a/.github/workflows/rpmbuild.yml b/.github/workflows/rpmbuild.yml index 8d16e37..d9220a7 100644 --- a/.github/workflows/rpmbuild.yml +++ b/.github/workflows/rpmbuild.yml @@ -9,6 +9,8 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 + with: + submodules: recursive - uses: TomTheBear/rpmbuild@master id: rpm name: Build RPM package on CentOS8 diff --git a/.github/workflows/runonce.yml b/.github/workflows/runonce.yml index 2a2cc8a..da5b86c 100644 --- a/.github/workflows/runonce.yml +++ b/.github/workflows/runonce.yml @@ -6,6 +6,8 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 + with: + submodules: recursive # See: https://github.com/marketplace/actions/setup-go-environment - name: Setup Golang @@ -13,6 +15,9 @@ jobs: with: go-version: '^1.17.6' + - name: Setup Ganglia + run: sudo apt install ganglia-monitor libganglia1 + - name: Build MetricCollector run: make @@ -22,6 +27,8 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 + with: + submodules: recursive # See: https://github.com/marketplace/actions/setup-go-environment - name: Setup Golang @@ -29,8 +36,11 @@ jobs: with: go-version: '^1.16.7' # The version AlmaLinux 8.5 uses + - name: Setup Ganglia + run: sudo apt install ganglia-monitor libganglia1 + - name: Build MetricCollector run: make - - name: Run MetricCollector + - name: Run MetricCollectorlibganglia1 run: ./cc-metric-collector --once --config .github/ci-config.json diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..ef3fc5c --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule ".github/actions/rpmbuild-centos8-golang"] + path = .github/actions/rpmbuild-centos8-golang + url = https://github.com/naveenrajm7/rpmbuild.git + branch = centos8 diff --git a/Makefile b/Makefile index c9805eb..33fd515 100644 --- a/Makefile +++ b/Makefile @@ -5,6 +5,15 @@ GOSRC_SINKS := $(wildcard sinks/*.go) GOSRC_RECEIVERS := $(wildcard receivers/*.go) GOSRC_INTERNAL := $(wildcard internal/*/*.go) GOSRC := $(GOSRC_APP) $(GOSRC_COLLECTORS) $(GOSRC_SINKS) $(GOSRC_RECEIVERS) $(GOSRC_INTERNAL) +COMPONENT_DIRS := collectors \ + sinks \ + receivers \ + internal/metricRouter \ + internal/ccMetric \ + internal/metricAggregator \ + internal/ccLogger \ + internal/ccTopology \ + internal/multiChanTicker .PHONY: all @@ -12,12 +21,13 @@ all: $(APP) $(APP): $(GOSRC) make -C collectors + make -C sinks go get go build -o $(APP) $(GOSRC_APP) .PHONY: clean clean: - make -C collectors clean + @for COMP in $(COMPONENT_DIRS); do if [ -e $$COMP/Makefile ]; then make -C $$COMP clean; fi; done rm -f $(APP) .PHONY: fmt diff --git a/sinks/Makefile b/sinks/Makefile new file mode 100644 index 0000000..bc0c09d --- /dev/null +++ b/sinks/Makefile @@ -0,0 +1,11 @@ + +all: libganglia.so + +libganglia.so: + @find /usr -name "libganglia.so*" -exec ln -s {} libganglia.so \; + + +clean: + rm -f libganglia.so + +.PHONY: clean diff --git a/sinks/README.md b/sinks/README.md index 1690df9..8ff3743 100644 --- a/sinks/README.md +++ b/sinks/README.md @@ -7,7 +7,8 @@ This folder contains the SinkManager and sink implementations for the cc-metric- - [`http`](./httpSink.md): Send metrics to an HTTP server as POST requests - [`influxdb`](./influxSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database - [`nats`](./natsSink.md): Publish metrics to the [NATS](https://nats.io/) network overlay system -- [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) +- [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) using the `gmetric` CLI tool +- [`libganglia`](./libgangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) directly using `libganglia.so` # Configuration diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index c53b11a..4a57d20 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -1,3 +1,5 @@ +//go:build ganglia + package sinks import ( diff --git a/sinks/gangliaSink_disabled.go b/sinks/gangliaSink_disabled.go new file mode 100644 index 0000000..84156d1 --- /dev/null +++ b/sinks/gangliaSink_disabled.go @@ -0,0 +1,31 @@ +//go:build !ganglia + +package sinks + +import ( + "encoding/json" + "errors" + + // "time" + + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" +) + +type GangliaSink struct { + sink +} + +func (s *GangliaSink) Init(config json.RawMessage) error { + return errors.New("sink 'ganglia' not implemented, rebuild with tag 'ganglia'") +} + +func (s *GangliaSink) Write(point lp.CCMetric) error { + return errors.New("sink 'ganglia' not implemented, rebuild with tag 'ganglia'") +} + +func (s *GangliaSink) Flush() error { + return errors.New("sink 'ganglia' not implemented, rebuild with tag 'ganglia'") +} + +func (s *GangliaSink) Close() { +} diff --git a/sinks/libgangliaSink.go b/sinks/libgangliaSink.go new file mode 100644 index 0000000..7e03494 --- /dev/null +++ b/sinks/libgangliaSink.go @@ -0,0 +1,295 @@ +//go:build ganglia + +package sinks + +/* +#cgo CFLAGS: -DGM_PROTOCOL_GUARD +#cgo LDFLAGS: -L. -lganglia +#include + +// This is a copy&paste snippet of ganglia.h (BSD-3 license) +// See https://github.com/ganglia/monitor-core +// for further information + +enum ganglia_slope { + GANGLIA_SLOPE_ZERO = 0, + GANGLIA_SLOPE_POSITIVE, + GANGLIA_SLOPE_NEGATIVE, + GANGLIA_SLOPE_BOTH, + GANGLIA_SLOPE_UNSPECIFIED, + GANGLIA_SLOPE_DERIVATIVE, + GANGLIA_SLOPE_LAST_LEGAL_VALUE=GANGLIA_SLOPE_DERIVATIVE +}; +typedef enum ganglia_slope ganglia_slope_t; + +typedef struct Ganglia_pool* Ganglia_pool; +typedef struct Ganglia_gmond_config* Ganglia_gmond_config; +typedef struct Ganglia_udp_send_channels* Ganglia_udp_send_channels; + +struct Ganglia_metric { + Ganglia_pool pool; + struct Ganglia_metadata_message *msg; + char *value; + void *extra; +}; +typedef struct Ganglia_metric * Ganglia_metric; + +#ifdef __cplusplus +extern "C" { +#endif + +Ganglia_gmond_config Ganglia_gmond_config_create(char *path, int fallback_to_default); +//void Ganglia_gmond_config_destroy(Ganglia_gmond_config config); + +Ganglia_udp_send_channels Ganglia_udp_send_channels_create(Ganglia_pool p, Ganglia_gmond_config config); +void Ganglia_udp_send_channels_destroy(Ganglia_udp_send_channels channels); + +int Ganglia_udp_send_message(Ganglia_udp_send_channels channels, char *buf, int len ); + +Ganglia_metric Ganglia_metric_create( Ganglia_pool parent_pool ); +int Ganglia_metric_set( Ganglia_metric gmetric, char *name, char *value, char *type, char *units, unsigned int slope, unsigned int tmax, unsigned int dmax); +int Ganglia_metric_send( Ganglia_metric gmetric, Ganglia_udp_send_channels send_channels ); +//int Ganglia_metadata_send( Ganglia_metric gmetric, Ganglia_udp_send_channels send_channels ); +//int Ganglia_metadata_send_real( Ganglia_metric gmetric, Ganglia_udp_send_channels send_channels, char *override_string ); +void Ganglia_metadata_add( Ganglia_metric gmetric, char *name, char *value ); +//int Ganglia_value_send( Ganglia_metric gmetric, Ganglia_udp_send_channels send_channels ); +void Ganglia_metric_destroy( Ganglia_metric gmetric ); + +Ganglia_pool Ganglia_pool_create( Ganglia_pool parent ); +void Ganglia_pool_destroy( Ganglia_pool pool ); + +//ganglia_slope_t cstr_to_slope(const char* str); +//const char* slope_to_cstr(unsigned int slope); + +#ifdef __cplusplus +} +#endif +*/ +import "C" + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + "unsafe" + + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" +) + +const GMOND_CONFIG_FILE = `/var/ganglia/gmond.conf` + +type LibgangliaSinkConfig struct { + defaultSinkConfig + GmondConfig string `json:"gmond_config,omitempty"` + AddGangliaGroup bool `json:"add_ganglia_group,omitempty"` + //AddTagsAsDesc bool `json:"add_tags_as_desc,omitempty"` + AddTypeToName bool `json:"add_type_to_name,omitempty"` + AddUnits bool `json:"add_units,omitempty"` + ClusterName string `json:"cluster_name,omitempty"` +} + +type LibgangliaSink struct { + sink + config LibgangliaSinkConfig + global_context C.Ganglia_pool + gmond_config C.Ganglia_gmond_config + send_channels C.Ganglia_udp_send_channels + cstrCache map[string]*C.char +} + +func gangliaMetricName(point lp.CCMetric) string { + name := point.Name() + metricType, typeOK := point.GetTag("type") + metricTid, tidOk := point.GetTag("type-id") + gangliaType := metricType + metricTid + if strings.Contains(name, metricType) && tidOk { + name = strings.Replace(name, metricType, gangliaType, -1) + } else if typeOK && tidOk { + name = metricType + metricTid + "_" + name + } else if point.HasTag("device") { + device, _ := point.GetTag("device") + name = name + "_" + device + } + + return name +} + +func (s *LibgangliaSink) Init(config json.RawMessage) error { + var err error = nil + s.name = "LibgangliaSink" + //s.config.AddTagsAsDesc = false + s.config.AddGangliaGroup = false + s.config.AddTypeToName = false + s.config.AddUnits = true + s.config.GmondConfig = string(GMOND_CONFIG_FILE) + if len(config) > 0 { + err = json.Unmarshal(config, &s.config) + if err != nil { + fmt.Println(s.name, "Error reading config for", s.name, ":", err.Error()) + return err + } + } + + // Set up cache for the C strings + s.cstrCache = make(map[string]*C.char) + // s.cstrCache["globals"] = C.CString("globals") + + // s.cstrCache["override_hostname"] = C.CString("override_hostname") + // s.cstrCache["override_ip"] = C.CString("override_ip") + + // Add some constant strings + s.cstrCache["GROUP"] = C.CString("GROUP") + s.cstrCache["CLUSTER"] = C.CString("CLUSTER") + s.cstrCache[""] = C.CString("") + + // Add cluster name for lookup in Write() + if len(s.config.ClusterName) > 0 { + s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName) + } + // Add supported types for later lookup in Write() + s.cstrCache["double"] = C.CString("double") + s.cstrCache["int32"] = C.CString("int32") + s.cstrCache["string"] = C.CString("string") + + // Create Ganglia pool + s.global_context = C.Ganglia_pool_create(nil) + // Load Ganglia configuration + s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig) + s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0) + //globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"]) + //override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"]) + //override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"]) + + s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config) + return nil +} + +func (s *LibgangliaSink) Write(point lp.CCMetric) error { + var err error = nil + var c_name *C.char + var c_value *C.char + var c_type *C.char + var c_unit *C.char + + // helper function for looking up C strings in the cache + lookup := func(key string) *C.char { + if _, exist := s.cstrCache[key]; !exist { + s.cstrCache[key] = C.CString(key) + } + return s.cstrCache[key] + } + + // Get metric name + if s.config.AddTypeToName { + c_name = lookup(gangliaMetricName(point)) + } else { + c_name = lookup(point.Name()) + } + + // Get the value C string and lookup the type string in the cache + value, ok := point.GetField("value") + if !ok { + return fmt.Errorf("metric %s has no 'value' field", point.Name()) + } + switch real := value.(type) { + case float64: + c_value = C.CString(fmt.Sprintf("%f", real)) + c_type = lookup("double") + case float32: + c_value = C.CString(fmt.Sprintf("%f", real)) + c_type = lookup("float") + case int64: + c_value = C.CString(fmt.Sprintf("%d", real)) + c_type = lookup("int32") + case int32: + c_value = C.CString(fmt.Sprintf("%d", real)) + c_type = lookup("int32") + case int: + c_value = C.CString(fmt.Sprintf("%d", real)) + c_type = lookup("int32") + case string: + c_value = C.CString(real) + c_type = lookup("string") + default: + return fmt.Errorf("metric %s has invalid 'value' type for %s", point.Name(), s.name) + } + + // Add unit + if s.config.AddUnits { + if tagunit, tagok := point.GetTag("unit"); tagok { + c_unit = lookup(tagunit) + } else if metaunit, metaok := point.GetMeta("unit"); metaok { + c_unit = lookup(metaunit) + } else { + c_unit = lookup("") + } + } else { + c_unit = lookup("") + } + + // Create a new Ganglia metric + gmetric := C.Ganglia_metric_create(s.global_context) + rval := C.int(0) + // Set name, value, type and unit in the Ganglia metric + // Since we don't have this information from the collectors, + // we assume that the metric value can go up and down (slope), + // and their is no maximum for 'dmax' and 'tmax' + rval = C.Ganglia_metric_set(gmetric, c_name, c_value, c_type, c_unit, C.GANGLIA_SLOPE_BOTH, 0, 0) + switch rval { + case 1: + C.free(unsafe.Pointer(c_value)) + return errors.New("invalid parameters") + case 2: + C.free(unsafe.Pointer(c_value)) + return errors.New("one of your parameters has an invalid character '\"'") + case 3: + C.free(unsafe.Pointer(c_value)) + return fmt.Errorf("the type parameter \"%s\" is not a valid type", C.GoString(c_type)) + case 4: + C.free(unsafe.Pointer(c_value)) + return fmt.Errorf("the value parameter \"%s\" does not represent a number", C.GoString(c_value)) + default: + } + + // Set the cluster name, otherwise it takes it from the configuration file + if len(s.config.ClusterName) > 0 { + C.Ganglia_metadata_add(gmetric, lookup("CLUSTER"), lookup(s.config.ClusterName)) + } + // Set the group metadata in the Ganglia metric if configured + if group, ok := point.GetMeta("group"); ok && s.config.AddGangliaGroup { + c_group := lookup(group) + C.Ganglia_metadata_add(gmetric, lookup("GROUP"), c_group) + } + + // Now we send the metric + // gmetric does provide some more options like description and other options + // but they are not provided by the collectors + rval = C.Ganglia_metric_send(gmetric, s.send_channels) + if rval != 0 { + err = fmt.Errorf("there was an error sending metric %s to %d of the send channels ", point.Name(), rval) + // fall throuph to use Ganglia_metric_destroy from common cleanup + } + // Cleanup Ganglia metric + C.Ganglia_metric_destroy(gmetric) + // Free the value C string, the only one not stored in the cache + C.free(unsafe.Pointer(c_value)) + return err +} + +func (s *LibgangliaSink) Flush() error { + return nil +} + +func (s *LibgangliaSink) Close() { + // Destroy Ganglia configration struct + // (not done by gmetric, I thought I am more clever but no...) + //C.Ganglia_gmond_config_destroy(s.gmond_config) + // Destroy Ganglia pool + C.Ganglia_pool_destroy(s.global_context) + + // Cleanup C string cache + for _, cstr := range s.cstrCache { + C.free(unsafe.Pointer(cstr)) + } +} diff --git a/sinks/libgangliaSink.md b/sinks/libgangliaSink.md new file mode 100644 index 0000000..a0dede7 --- /dev/null +++ b/sinks/libgangliaSink.md @@ -0,0 +1,41 @@ +## `libganglia` sink + +The `libganglia` sink interacts directly with the library of the [Ganglia Monitoring System](http://ganglia.info/) to submit the metrics. Consequently, it needs to be installed on all nodes. But this is commonly the case if you want to use Ganglia, because it requires at least a node daemon (`gmond` or `ganglia-monitor`) to work. + +The `libganglia` sink has probably less overhead compared to the `ganglia` sink because it does not require any process generation but initializes the environment and UDP connections only once. + + +### Configuration structure + +```json +{ + "": { + "type": "libganglia", + "gmetric_config" : "/path/to/gmetric/config", + "cluster_name": "MyCluster", + "add_ganglia_group" : true, + "add_type_to_name": true, + "add_units" : true + } +} +``` + +- `type`: makes the sink an `libganglia` sink +- `meta_as_tags`: print all meta information as tags in the output (optional) +- `gmond_config`: Path to the Ganglia configuration file `gmond.conf` (default: `/etc/ganglia/gmond.conf`) +- `cluster_name`: Set a cluster name for the metric. If not set, it is taken from `gmond_config` +- `add_ganglia_group`: Add a Ganglia metric group based on meta information. Some old versions of `gmetric` do not support the `--group` option +- `add_type_to_name`: Ganglia commonly uses only node-level metrics but with cc-metric-collector, there are metrics for cpus, memory domains, CPU sockets and the whole node. In order to get eeng, this option prefixes the metric name with `_` or `device_` depending on the metric tags and meta information. For metrics of the whole node `type=node`, no prefix is added +- `add_units`: Add metric value unit if there is a `unit` entry in the metric tags or meta information + +### Ganglia Installation + +My development system is Ubuntu 20.04. To install the required libraries with `apt`: + +``` +$ sudo apt install libganglia1 +``` + +The `libganglia.so` gets installed in `/usr/lib`. The Ganglia headers `libganglia1-dev` are **not** required. + +I added a `Makefile` in the `sinks` subfolder that searches for the library in `/usr` and creates a symlink (`sinks/libganglia.so`) for running/building the cc-metric-collector. So just type `make` before running/building in the main folder or the `sinks` subfolder. \ No newline at end of file diff --git a/sinks/libgangliaSink_disabled.go b/sinks/libgangliaSink_disabled.go new file mode 100644 index 0000000..87ee75c --- /dev/null +++ b/sinks/libgangliaSink_disabled.go @@ -0,0 +1,29 @@ +//go:build !ganglia + +package sinks + +import ( + "encoding/json" + "errors" + + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" +) + +type LibgangliaSink struct { + sink +} + +func (s *LibgangliaSink) Init(config json.RawMessage) error { + return errors.New("sink 'libganglia' not implemented, rebuild with tag 'ganglia'") +} + +func (s *LibgangliaSink) Write(point lp.CCMetric) error { + return errors.New("sink 'libganglia' not implemented, rebuild with tag 'ganglia'") +} + +func (s *LibgangliaSink) Flush() error { + return errors.New("sink 'ganglia' not implemented, rebuild with tag 'ganglia'") +} + +func (s *LibgangliaSink) Close() { +} diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index bd243f4..487e7ca 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -20,6 +20,7 @@ var AvailableSinks = map[string]Sink{ "http": new(HttpSink), "ganglia": new(GangliaSink), "influxasync": new(InfluxAsyncSink), + "libganglia": new(LibgangliaSink), } // Metric collector manager data structure