Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop

Thomas Roehl 2022-02-24 18:28:52 +01:00
commit 87ecb12c6f
24 changed files with 858 additions and 578 deletions

View File

@@ -5,46 +5,6 @@ on:
     - '**'

 jobs:
-  build-centos8:
-    runs-on: ubuntu-latest
-    steps:
-    - uses: actions/checkout@v2
-      with:
-        submodules: recursive
-    - uses: TomTheBear/rpmbuild@master
-      id: rpm
-      name: Build RPM package on CentOS8
-      with:
-        spec_file: "./scripts/cc-metric-collector.spec"
-    - name: Save RPM as artifact
-      uses: actions/upload-artifact@v1.0.0
-      with:
-        name: cc-metric-collector RPM CentOS8
-        path: ${{ steps.rpm.outputs.rpm_dir_path }}
-    - name: Save SRPM as artifact
-      uses: actions/upload-artifact@v1.0.0
-      with:
-        name: cc-metric-collector SRPM CentOS8
-        path: ${{ steps.rpm.outputs.source_rpm_path }}
-  build-centos-latest:
-    runs-on: ubuntu-latest
-    steps:
-    - uses: actions/checkout@v2
-    - uses: TomTheBear/rpmbuild@centos_latest
-      id: rpm
-      name: Build RPM package on CentOS 'Latest'
-      with:
-        spec_file: "./scripts/cc-metric-collector.spec"
-    - name: Save RPM as artifact
-      uses: actions/upload-artifact@v1.0.0
-      with:
-        name: cc-metric-collector RPM CentOS 'Latest'
-        path: ${{ steps.rpm.outputs.rpm_dir_path }}
-    - name: Save SRPM as artifact
-      uses: actions/upload-artifact@v1.0.0
-      with:
-        name: cc-metric-collector SRPM CentOS 'Latest'
-        path: ${{ steps.rpm.outputs.source_rpm_path }}
   build-alma-8_5:
     runs-on: ubuntu-latest
     steps:
@@ -58,9 +18,41 @@ jobs:
       uses: actions/upload-artifact@v1.0.0
       with:
         name: cc-metric-collector RPM AlmaLinux 8.5
-        path: ${{ steps.rpm.outputs.rpm_dir_path }}
+        path: ${{ steps.rpm.outputs.rpm_path }}
     - name: Save SRPM as artifact
       uses: actions/upload-artifact@v1.0.0
       with:
         name: cc-metric-collector SRPM AlmaLinux 8.5
         path: ${{ steps.rpm.outputs.source_rpm_path }}
+    - name: Release
+      uses: softprops/action-gh-release@v1
+      with:
+        name: cc-metric-collector-${{github.ref_name}}
+        files: |
+          ${{ steps.rpm.outputs.source_rpm_path }}
+          ${{ steps.rpm.outputs.rpm_path }}
+  build-rhel-ubi8:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v2
+    - uses: TomTheBear/rpmbuild@rh-ubi8
+      id: rpm
+      name: Build RPM package on Red Hat Universal Base Image 8
+      with:
+        spec_file: "./scripts/cc-metric-collector.spec"
+    - name: Save RPM as artifact
+      uses: actions/upload-artifact@v1.0.0
+      with:
+        name: cc-metric-collector RPM Red Hat Universal Base Image 8
+        path: ${{ steps.rpm.outputs.rpm_path }}
+    - name: Save SRPM as artifact
+      uses: actions/upload-artifact@v1.0.0
+      with:
+        name: cc-metric-collector SRPM Red Hat Universal Base Image 8
+        path: ${{ steps.rpm.outputs.source_rpm_path }}
+    - name: Release
+      uses: softprops/action-gh-release@v1
+      with:
+        files: |
+          ${{ steps.rpm.outputs.source_rpm_path }}
+          ${{ steps.rpm.outputs.rpm_path }}

View File

@@ -21,13 +21,18 @@ all: $(APP)

 $(APP): $(GOSRC)
 	make -C collectors
+	make -C sinks
 	go get
 	go build -o $(APP) $(GOSRC_APP)

 .PHONY: clean
+.ONESHELL:
 clean:
-	@for COMP in $(COMPONENT_DIRS); do if [ -e $$COMP/Makefile ]; then make -C $$COMP clean; fi; done
+	@for COMP in $(COMPONENT_DIRS)
+	do
+		if [[ -e $$COMP/Makefile ]]; then
+			make -C $$COMP clean
+		fi
+	done
 	rm -f $(APP)

 .PHONY: fmt

View File

@@ -40,15 +40,9 @@ See the component READMEs for their configuration:
 $ git clone git@github.com:ClusterCockpit/cc-metric-collector.git
 $ make (downloads LIKWID, builds it as static library with 'direct' accessmode and copies all required files for the collector)
 $ go get (requires at least golang 1.16)
-$ make tags
-Available tags:
-ganglia
-[...]
-$ make # calls go build (-tags ganglia,...) -o cc-metric-collector
+$ make
 ```
-
-## `ganglia` build tag
-If you want support for the [Ganglia Monitoring System](http://ganglia.info/), you have to add `-tags ganglia` to the build command line. This enables two metric sinks. One is using the command line application `gmetric` (see [`ganglia`](./sinks/gangliaSink.md) sink), the other one interacts directly with `libganglia`, the main Ganglia library that is commonly installed on each compute node (see [`libganglia`](./sinks/libgangliaSink.md) sink). The latter one requires configuration before building, so use `make` instead of `go build` directly.

 # Running
@@ -62,7 +56,41 @@ Usage of metric-collector:
   -once
         Run all collectors only once
 ```
+
+# Scenarios
+The metric collector was designed with flexibility in mind, so it can be used in many scenarios. Here are a few:
+```mermaid
+flowchart TD
+  subgraph a ["Cluster A"]
+  nodeA[NodeA with CC collector]
+  nodeB[NodeB with CC collector]
+  nodeC[NodeC with CC collector]
+  end
+  a --> db[(Database)]
+  db <--> ccweb("Webfrontend")
+```
+
+``` mermaid
+flowchart TD
+  subgraph a [ClusterA]
+  direction LR
+  nodeA[NodeA with CC collector]
+  nodeB[NodeB with CC collector]
+  nodeC[NodeC with CC collector]
+  end
+  subgraph b [ClusterB]
+  direction LR
+  nodeD[NodeD with CC collector]
+  nodeE[NodeE with CC collector]
+  nodeF[NodeF with CC collector]
+  end
+  a --> ccrecv{"CC collector as receiver"}
+  b --> ccrecv
+  ccrecv --> db[("Database1")]
+  ccrecv -.-> db2[("Database2")]
+  db <-.-> ccweb("Webfrontend")
+```

 # Contributing
 The ClusterCockpit ecosystem is designed to be used by different HPC computing centers. Since configurations and setups differ between the centers, the centers likely have to put some work into the cc-metric-collector to gather all desired metrics.

View File

@@ -102,7 +102,7 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
 			part_max_used = perc
 		}
 	}
-	y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": part_max_used}, time.Now())
+	y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
 	if err == nil {
 		y.AddMeta("unit", "percent")
 		output <- y

View File

@@ -4,7 +4,7 @@

 The `likwid` collector is probably the most complicated collector. The LIKWID library is included as static library with *direct* access mode. The *direct* access mode is suitable if the daemon is executed by a root user. The static library does not contain the performance groups, so all information needs to be provided in the configuration.

 The `likwid` configuration consists of two parts, the "eventsets" and "globalmetrics":
-- An event set list itself has two parts, the "events" and a set of derivable "metrics". Each of the "events" is a counter:event pair in LIKWID's syntax. The "metrics" are a list of formulas to derive the metric value from the measurements of the "events". Each metric has a name, the formula, a scope and a publish flag. A counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for the both events configured in the counters `PMC0` and `PMC1`. The scope tells the Collector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). The last one is the publishing flag. It tells the collector whether a metric should be sent to the router.
+- An event set list itself has two parts, the "events" and a set of derivable "metrics". Each of the "events" is a counter:event pair in LIKWID's syntax. The "metrics" are a list of formulas to derive the metric value from the measurements of the "events". Each metric has a name, the formula, a scope and a publish flag. Counter names can be used like variables in the formulas, so `PMC0+PMC1` sums the measurements for both events configured in the counters `PMC0` and `PMC1`. The scope tells the collector whether it is a metric for each hardware thread (`cpu`) or each CPU socket (`socket`). The last one is the publishing flag. It tells the collector whether a metric should be sent to the router.
 - The global metrics are metrics which require data from all event set measurements to be derived. The inputs are the metrics in the event sets. Similar to the metrics in the event sets, the global metrics are defined by a name, a formula, a scope and a publish flag. See event set metrics for details. The only difference is that there is no access to the raw event measurements anymore but only to the metrics. So, the idea is to derive a metric in the "eventsets" section and reuse it in the "globalmetrics" part. If you need a metric only for deriving the global metrics, disable forwarding of the event set metrics. **Be aware** that the combination might be misleading because the "behavior" of a metric changes over time and the multiple measurements might count different computing phases.

 Additional options:
@@ -26,6 +26,42 @@ As a guideline:
 - All `PWRx` counters have scope `socket`, except `"PWR1" : "RAPL_CORE_ENERGY"` has `cpu` scope
 - All `DFCx` counters have scope `socket`
+### Help with the configuration
+
+The configuration for the `likwid` collector is quite complicated. Most users don't use LIKWID with the event:counter notation but rely on the performance groups defined by the LIKWID team for each architecture. In order to help with the `likwid` collector configuration, we included a script `scripts/likwid_perfgroup_to_cc_config.py` that creates the configuration of an `eventset` from a performance group (using a LIKWID installation in `$PATH`):
+```
+$ likwid-perfctr -i
+[...]
+short name: ICX
+[...]
+$ likwid-perfctr -a
+[...]
+MEM_DP
+MEM
+FLOPS_SP
+CLOCK
+[...]
+$ scripts/likwid_perfgroup_to_cc_config.py ICX MEM_DP
+{
+  "events": {
+    "FIXC0": "INSTR_RETIRED_ANY",
+    "..." : "..."
+  },
+  "metrics" : [
+    {
+      "calc": "time",
+      "name": "Runtime (RDTSC) [s]",
+      "publish": true,
+      "scope": "hwthread"
+    },
+    {
+      "..." : "..."
+    }
+  ]
+}
+```
+
+You can copy this JSON and add it to the `eventsets` list. If you specify multiple event sets, you can add globally derived metrics in the extra `global_metrics` section with the metric names as variables.
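To make the two-part structure above concrete, here is a minimal sketch of how a copied event set and a derived global metric fit together. The event and metric names are placeholders, and the top-level key names follow the prose above (`eventsets`, `global_metrics`); the authoritative layout is the example configuration referenced below.

```json
{
  "eventsets": [
    {
      "events": {
        "FIXC0": "INSTR_RETIRED_ANY"
      },
      "metrics": [
        {
          "name": "instructions",
          "calc": "FIXC0",
          "scope": "hwthread",
          "publish": false
        }
      ]
    }
  ],
  "global_metrics": [
    {
      "name": "total_instructions",
      "calc": "instructions",
      "scope": "hwthread",
      "publish": true
    }
  ]
}
```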
 ### Example configuration

View File

@@ -1,35 +1,76 @@
 package collectors

 import (
+	"bufio"
 	"encoding/json"
 	"errors"
 	"fmt"
-	"io/ioutil"
-	"log"
+	"os"
+	"path/filepath"
+	"regexp"
 	"strconv"
 	"strings"
 	"time"

+	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 )

-const MEMSTATFILE = `/proc/meminfo`
+const MEMSTATFILE = "/proc/meminfo"
+const NUMA_MEMSTAT_BASE = "/sys/devices/system/node"

 type MemstatCollectorConfig struct {
 	ExcludeMetrics []string `json:"exclude_metrics"`
+	NodeStats      bool     `json:"node_stats,omitempty"`
+	NumaStats      bool     `json:"numa_stats,omitempty"`
+}
+
+type MemstatCollectorNode struct {
+	file string
+	tags map[string]string
 }

 type MemstatCollector struct {
 	metricCollector
 	stats     map[string]int64
 	tags      map[string]string
 	matches   map[string]string
 	config    MemstatCollectorConfig
+	nodefiles map[int]MemstatCollectorNode
+}
+
+func getStats(filename string) map[string]float64 {
+	stats := make(map[string]float64)
+	file, err := os.Open(filename)
+	if err != nil {
+		cclog.Error(err.Error())
+	}
+	defer file.Close()
+
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		line := scanner.Text()
+		linefields := strings.Fields(line)
+		if len(linefields) == 3 {
+			v, err := strconv.ParseFloat(linefields[1], 64)
+			if err == nil {
+				stats[strings.Trim(linefields[0], ":")] = v
+			}
+		} else if len(linefields) == 5 {
+			v, err := strconv.ParseFloat(linefields[3], 64)
+			if err == nil {
+				stats[strings.Trim(linefields[0], ":")] = v
+			}
+		}
+	}
+	return stats
 }

 func (m *MemstatCollector) Init(config json.RawMessage) error {
 	var err error
 	m.name = "MemstatCollector"
+	m.config.NodeStats = true
+	m.config.NumaStats = false
 	if len(config) > 0 {
 		err = json.Unmarshal(config, &m.config)
 		if err != nil {
@@ -40,7 +81,8 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
 	m.stats = make(map[string]int64)
 	m.matches = make(map[string]string)
 	m.tags = map[string]string{"type": "node"}
-	matches := map[string]string{`MemTotal`: "mem_total",
+	matches := map[string]string{
+		"MemTotal":     "mem_total",
 		"SwapTotal":    "swap_total",
 		"SReclaimable": "mem_sreclaimable",
 		"Slab":         "mem_slab",
@@ -48,7 +90,9 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
 		"Buffers":      "mem_buffers",
 		"Cached":       "mem_cached",
 		"MemAvailable": "mem_available",
-		"SwapFree":     "swap_free"}
+		"SwapFree":     "swap_free",
+		"MemShared":    "mem_shared",
+	}
 	for k, v := range matches {
 		_, skip := stringArrayContains(m.config.ExcludeMetrics, k)
 		if !skip {
@@ -56,13 +100,44 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
 		}
 	}
 	if len(m.matches) == 0 {
-		return errors.New("No metrics to collect")
+		return errors.New("no metrics to collect")
 	}
 	m.setup()
-	_, err = ioutil.ReadFile(string(MEMSTATFILE))
-	if err == nil {
-		m.init = true
+
+	if m.config.NodeStats {
+		if stats := getStats(MEMSTATFILE); len(stats) == 0 {
+			return fmt.Errorf("cannot read data from file %s", MEMSTATFILE)
+		}
 	}
+
+	if m.config.NumaStats {
+		globPattern := filepath.Join(NUMA_MEMSTAT_BASE, "node[0-9]*", "meminfo")
+		regex := regexp.MustCompile(filepath.Join(NUMA_MEMSTAT_BASE, "node(\\d+)", "meminfo"))
+		files, err := filepath.Glob(globPattern)
+		if err == nil {
+			m.nodefiles = make(map[int]MemstatCollectorNode)
+			for _, f := range files {
+				if stats := getStats(f); len(stats) == 0 {
+					return fmt.Errorf("cannot read data from file %s", f)
+				}
+				rematch := regex.FindStringSubmatch(f)
+				if len(rematch) == 2 {
+					id, err := strconv.Atoi(rematch[1])
+					if err == nil {
+						f := MemstatCollectorNode{
+							file: f,
+							tags: map[string]string{
+								"type":    "memoryDomain",
+								"type-id": fmt.Sprintf("%d", id),
+							},
+						}
+						m.nodefiles[id] = f
+					}
+				}
+			}
+		}
+	}
+
+	m.init = true
 	return err
 }
@@ -71,56 +146,41 @@ func (m *MemstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
 		return
 	}

-	buffer, err := ioutil.ReadFile(string(MEMSTATFILE))
-	if err != nil {
-		log.Print(err)
-		return
-	}
-
-	ll := strings.Split(string(buffer), "\n")
-	for _, line := range ll {
-		ls := strings.Split(line, `:`)
-		if len(ls) > 1 {
-			lv := strings.Fields(ls[1])
-			m.stats[ls[0]], err = strconv.ParseInt(lv[0], 0, 64)
-		}
-	}
-
-	if _, exists := m.stats[`MemTotal`]; !exists {
-		err = errors.New("Parse error")
-		log.Print(err)
-		return
-	}
-
-	for match, name := range m.matches {
-		if _, exists := m.stats[match]; !exists {
-			err = fmt.Errorf("Parse error for %s : %s", match, name)
-			log.Print(err)
-			continue
-		}
-		y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": int(float64(m.stats[match]) * 1.0e-3)}, time.Now())
-		if err == nil {
-			output <- y
-		}
-	}
-
-	if _, free := m.stats[`MemFree`]; free {
-		if _, buffers := m.stats[`Buffers`]; buffers {
-			if _, cached := m.stats[`Cached`]; cached {
-				memUsed := m.stats[`MemTotal`] - (m.stats[`MemFree`] + m.stats[`Buffers`] + m.stats[`Cached`])
-				_, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used")
-				y, err := lp.New("mem_used", m.tags, m.meta, map[string]interface{}{"value": int(float64(memUsed) * 1.0e-3)}, time.Now())
-				if err == nil && !skip {
-					output <- y
+	sendStats := func(stats map[string]float64, tags map[string]string) {
+		for match, name := range m.matches {
+			var value float64 = 0
+			if v, ok := stats[match]; ok {
+				value = v
+			}
+			y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value}, time.Now())
+			if err == nil {
+				output <- y
+			}
+		}
+		if _, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_used"); !skip {
+			if freeVal, free := stats["MemFree"]; free {
+				if bufVal, buffers := stats["Buffers"]; buffers {
+					if cacheVal, cached := stats["Cached"]; cached {
+						memUsed := stats["MemTotal"] - (freeVal + bufVal + cacheVal)
+						y, err := lp.New("mem_used", tags, m.meta, map[string]interface{}{"value": memUsed}, time.Now())
+						if err == nil {
+							output <- y
+						}
+					}
 				}
 			}
 		}
 	}
-	if _, found := m.stats[`MemShared`]; found {
-		_, skip := stringArrayContains(m.config.ExcludeMetrics, "mem_shared")
-		y, err := lp.New("mem_shared", m.tags, m.meta, map[string]interface{}{"value": int(float64(m.stats[`MemShared`]) * 1.0e-3)}, time.Now())
-		if err == nil && !skip {
-			output <- y
+
+	if m.config.NodeStats {
+		nodestats := getStats(MEMSTATFILE)
+		sendStats(nodestats, m.tags)
+	}
+
+	if m.config.NumaStats {
+		for _, nodeConf := range m.nodefiles {
+			stats := getStats(nodeConf.file)
+			sendStats(stats, nodeConf.tags)
 		}
 	}
 }
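For orientation, the two new switches surface as JSON options through the struct tags above. Below is a minimal sketch of a `memstat` entry; the surrounding `"memstat": { ... }` layout is assumed to follow the usual collectors config file, and the values are only illustrative.

```json
{
  "memstat": {
    "node_stats": false,
    "numa_stats": true,
    "exclude_metrics": ["mem_buffers"]
  }
}
```

With `numa_stats` enabled, `Init()` above globs `/sys/devices/system/node/node<id>/meminfo` and tags each metric with `type=memoryDomain` and `type-id=<id>`.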

View File

@@ -8,6 +8,7 @@ import (
 	"os/exec"
 	"strings"
 	"time"
+
 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 )

View File

@@ -7,17 +7,13 @@ This folder contains the ReceiveManager and receiver implementations for the cc-metric-collector.

 The configuration file for the receivers is a list of configurations. The `type` field in each specifies which receiver to initialize.

 ```json
-[
-  {
-    "type": "nats",
-    "address": "nats://my-url",
-    "port" : "4222",
-    "database": "testcluster"
+{
+  "myreceivername" : {
+    <receiver-specific configuration>
   }
-]
+}
 ```

 ## Type `nats`

 ```json
@@ -25,20 +21,20 @@ The configuration file for the receivers is a list of configurations. The `type`
   "type": "nats",
   "address": "<nats-URI or hostname>",
   "port" : "<portnumber>",
-  "database": "<subscribe topic>"
+  "subject": "<subscribe topic>"
 }
 ```

 The `nats` receiver subscribes to the topic `database` and listens on `address` and `port` for metrics in the InfluxDB line protocol.

 # Contributing own receivers
-A receiver contains three functions and is derived from the type `Receiver` (in `metricReceiver.go`):
+A receiver contains a few functions and is derived from the type `Receiver` (in `metricReceiver.go`):

-* `Init(config ReceiverConfig) error`
 * `Start() error`
 * `Close()`
 * `Name() string`
-* `SetSink(sink chan ccMetric.CCMetric)`
+* `SetSink(sink chan lp.CCMetric)`
+* `New<Typename>(name string, config json.RawMessage)`

 The data structures should be set up in `Init()` like opening a file or server connection. The `Start()` function should either start a go routine or issue some other asynchronous mechanism for receiving metrics. The `Close()` function should tear down anything created in `Init()`.

-Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to Receiver interface`). Add a new entry with a descriptive name and the new receiver.
+Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to NewReceiver function`). Add a new entry with a descriptive name and the new receiver.
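To illustrate the new factory pattern (a `New<Typename>` function instead of a separately called `Init()`), here is a minimal sketch of a custom receiver. `SampleReceiver` and its config type are hypothetical, and it is assumed that the embedded `receiver` base type supplies `Name()` and `SetSink()`, as `NatsReceiver` relies on in the diffs below.

```go
package receivers

import (
	"encoding/json"
	"fmt"

	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
)

// Hypothetical receiver-specific configuration; only the "type" key is prescribed.
type SampleReceiverConfig struct {
	Type string `json:"type"`
}

type SampleReceiver struct {
	receiver // assumed to provide the name/sink fields plus Name() and SetSink()
	config   SampleReceiverConfig
}

// Start launches the asynchronous part, e.g. a goroutine that parses
// incoming data and forwards metrics to r.sink.
func (r *SampleReceiver) Start() {
	cclog.ComponentDebug(r.name, "START")
}

// Close tears down everything the factory function set up.
func (r *SampleReceiver) Close() {
	cclog.ComponentDebug(r.name, "CLOSE")
}

// NewSampleReceiver replaces the old Init(): it parses the raw JSON config
// and returns a ready-to-start receiver, or an error.
func NewSampleReceiver(name string, config json.RawMessage) (Receiver, error) {
	r := new(SampleReceiver)
	r.name = fmt.Sprintf("SampleReceiver(%s)", name)
	if len(config) > 0 {
		if err := json.Unmarshal(config, &r.config); err != nil {
			cclog.ComponentError(r.name, "Error reading config:", err.Error())
			return nil, err
		}
	}
	return r, nil
}
```

Such a receiver would then be registered as `"sample": NewSampleReceiver` in the `AvailableReceivers` map.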

View File

@@ -1,9 +1,6 @@
 package receivers

 import (
-	// "time"
-	"encoding/json"
-
 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 )
@@ -20,13 +17,11 @@ type ReceiverConfig struct {
 }

 type receiver struct {
-	typename string
-	name     string
-	sink     chan lp.CCMetric
+	name string
+	sink chan lp.CCMetric
 }

 type Receiver interface {
-	Init(name string, config json.RawMessage) error
 	Start()
 	Close()
 	Name() string

View File

@@ -32,39 +32,6 @@ var DefaultTime = func() time.Time {
 	return time.Unix(42, 0)
 }

-func (r *NatsReceiver) Init(name string, config json.RawMessage) error {
-	r.typename = "NatsReceiver"
-	r.name = name
-	r.config.Addr = nats.DefaultURL
-	r.config.Port = "4222"
-	if len(config) > 0 {
-		err := json.Unmarshal(config, &r.config)
-		if err != nil {
-			cclog.ComponentError(r.name, "Error reading config:", err.Error())
-			return err
-		}
-	}
-	if len(r.config.Addr) == 0 ||
-		len(r.config.Port) == 0 ||
-		len(r.config.Subject) == 0 {
-		return errors.New("not all configuration variables set required by NatsReceiver")
-	}
-	r.meta = map[string]string{"source": r.name}
-	uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port)
-	cclog.ComponentDebug(r.name, "INIT", uri, "Subject", r.config.Subject)
-	nc, err := nats.Connect(uri)
-	if err == nil {
-		r.nc = nc
-	} else {
-		r.nc = nil
-		return err
-	}
-	r.handler = influx.NewMetricHandler()
-	r.parser = influx.NewParser(r.handler)
-	r.parser.SetTimeFunc(DefaultTime)
-	return err
-}
-
 func (r *NatsReceiver) Start() {
 	cclog.ComponentDebug(r.name, "START")
 	r.nc.Subscribe(r.config.Subject, r._NatsReceive)
@@ -91,3 +58,35 @@ func (r *NatsReceiver) Close() {
 		r.nc.Close()
 	}
 }
+
+func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
+	r := new(NatsReceiver)
+	r.name = fmt.Sprintf("NatsReceiver(%s)", name)
+	r.config.Addr = nats.DefaultURL
+	r.config.Port = "4222"
+	if len(config) > 0 {
+		err := json.Unmarshal(config, &r.config)
+		if err != nil {
+			cclog.ComponentError(r.name, "Error reading config:", err.Error())
+			return nil, err
+		}
+	}
+	if len(r.config.Addr) == 0 ||
+		len(r.config.Port) == 0 ||
+		len(r.config.Subject) == 0 {
+		return nil, errors.New("not all configuration variables set required by NatsReceiver")
+	}
+	r.meta = map[string]string{"source": r.name}
+	uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port)
+	cclog.ComponentDebug(r.name, "NewNatsReceiver", uri, "Subject", r.config.Subject)
+	if nc, err := nats.Connect(uri); err == nil {
+		r.nc = nc
+	} else {
+		r.nc = nil
+		return nil, err
+	}
+	r.handler = influx.NewMetricHandler()
+	r.parser = influx.NewParser(r.handler)
+	r.parser.SetTimeFunc(DefaultTime)
+	return r, nil
+}

View File

@@ -9,8 +9,8 @@ import (
 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 )

-var AvailableReceivers = map[string]Receiver{
-	"nats": &NatsReceiver{},
+var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){
+	"nats": NewNatsReceiver,
 }

 type receiveManager struct {
@@ -30,11 +30,13 @@ type ReceiveManager interface {
 }

 func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) error {
+	// Initialize struct fields
 	rm.inputs = make([]Receiver, 0)
 	rm.output = nil
 	rm.done = make(chan bool)
 	rm.wg = wg
 	rm.config = make([]json.RawMessage, 0)
+
 	configFile, err := os.Open(receiverConfigFile)
 	if err != nil {
 		cclog.ComponentError("ReceiveManager", err.Error())
@@ -51,6 +53,7 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) error {
 	for name, raw := range rawConfigs {
 		rm.AddInput(name, raw)
 	}
+
 	return nil
 }
@@ -75,10 +78,9 @@ func (rm *receiveManager) AddInput(name string, rawConfig json.RawMessage) error {
 		cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "unknown receiver:", err.Error())
 		return err
 	}
-	r := AvailableReceivers[config.Type]
-	err = r.Init(name, rawConfig)
+	r, err := AvailableReceivers[config.Type](name, rawConfig)
 	if err != nil {
-		cclog.ComponentError("ReceiveManager", "SKIP", r.Name(), "initialization failed:", err.Error())
+		cclog.ComponentError("ReceiveManager", "SKIP", name, "initialization failed:", err.Error())
 		return err
 	}
 	rm.inputs = append(rm.inputs, r)
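Putting the two changes together: with the name-keyed configuration format from the receivers README and the factory map above, a receivers config file like the following sketch (the name `myreceiver` and the values are illustrative; the fields follow the `nats` receiver documentation) leads `AddInput` to call `NewNatsReceiver("myreceiver", <raw config>)`:

```json
{
  "myreceiver": {
    "type": "nats",
    "address": "nats://my-url",
    "port": "4222",
    "subject": "testcluster"
  }
}
```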

View File

@@ -39,7 +39,7 @@ def group_to_json(groupfile):
             llist = re.split("\s+", line)
             calc = llist[-1]
             metric = " ".join(llist[:-1])
-            scope = "hwthread"
+            scope = "cpu"
             if "BOX" in calc:
                 scope = "socket"
             if "PWR" in calc:

View File

@@ -1,14 +0,0 @@
-all: libganglia.so
-
-libganglia.so:
-	@find /usr ! -readable -prune -o -type d ! -executable -prune -o -name "$@*" -print0 | \
-	 xargs --null --no-run-if-empty --replace \
-	 ln --symbolic --verbose --force '{}' "$@"
-	@if [[ ! -e "$@" ]]; then touch "$@"; fi
-
-clean:
-	rm -f libganglia.so
-
-.PHONY: clean

View File

@@ -6,6 +6,7 @@ This folder contains the SinkManager and sink implementations for the cc-metric-collector.
 - [`stdout`](./stdoutSink.md): Print all metrics to `stdout`, `stderr` or a file
 - [`http`](./httpSink.md): Send metrics to an HTTP server as POST requests
 - [`influxdb`](./influxSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database
+- [`influxasync`](./influxAsyncSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database with non-blocking write API
 - [`nats`](./natsSink.md): Publish metrics to the [NATS](https://nats.io/) network overlay system
 - [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) using the `gmetric` CLI tool
 - [`libganglia`](./libgangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) directly using `libganglia.so`
@@ -34,11 +35,12 @@ The configuration file for the sinks is a list of configurations. The `type` field in each specifies which sink to initialize.

 # Contributing own sinks
-A sink contains four functions and is derived from the type `sink`:
-* `Init(config json.RawMessage) error`
+A sink contains five functions and is derived from the type `sink`:
+* `Init(name string, config json.RawMessage) error`
 * `Write(point CCMetric) error`
 * `Flush() error`
 * `Close()`
+* `New<Typename>(name string, config json.RawMessage) (Sink, error)` (calls the `Init()` function)

 The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`.
@@ -65,8 +67,8 @@ type SampleSink struct {
 }

 // Initialize the sink by giving it a name and reading in the config JSON
-func (s *SampleSink) Init(config json.RawMessage) error {
-	s.name = "SampleSink" // Always specify a name here
+func (s *SampleSink) Init(name string, config json.RawMessage) error {
+	s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here
 	// Read in the config JSON
 	if len(config) > 0 {
 		err := json.Unmarshal(config, &s.config)
@@ -91,4 +93,13 @@ func (s *SampleSink) Flush() error {

 // Close sink: close network connection, close files, close libraries, ...
 func (s *SampleSink) Close() {}
+
+// New function to create a new instance of the sink
+func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
+	s := new(SampleSink)
+	err := s.Init(name, config)
+	return s, err
+}
 ```
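By analogy with the `AvailableReceivers` change elsewhere in this commit, the new `NewSampleSink` factory is presumably what gets registered in the sink manager's lookup table. A sketch (the `AvailableSinks` map is not part of this diff, so its exact name and shape are assumptions):

```go
// Hypothetical registration, mirroring AvailableReceivers in receiveManager.go.
var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, error){
	"sample": NewSampleSink,
}
```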

View File

@@ -1,6 +1,7 @@
 package sinks

 import (
+	"fmt"
 	"strings"

 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
@@ -23,11 +24,8 @@ func GangliaMetricName(point lp.CCMetric) string {
 	return name
 }

-func GangliaMetricRename(point lp.CCMetric) string {
-	name := point.Name()
-	if name == "mem_total" || name == "swap_total" {
-		return name
-	} else if name == "net_bytes_in" {
+func GangliaMetricRename(name string) string {
+	if name == "net_bytes_in" {
 		return "bytes_in"
 	} else if name == "net_bytes_out" {
 		return "bytes_out"
@@ -48,3 +46,213 @@ func GangliaSlopeType(point lp.CCMetric) uint {
 	}
 	return 3
 }
+
+const DEFAULT_GANGLIA_METRIC_TMAX = 300
+const DEFAULT_GANGLIA_METRIC_SLOPE = "both"
+
+type GangliaMetric struct {
+	Name  string
+	Type  string
+	Slope string
+	Tmax  int
+	Unit  string
+}
+
+type GangliaMetricGroup struct {
+	Name    string
+	Metrics []GangliaMetric
+}
+
+var CommonGangliaMetrics = []GangliaMetricGroup{
+	{
+		Name: "memory",
+		Metrics: []GangliaMetric{
+			{"mem_total", "float", "zero", 1200, "KB"},
+			{"swap_total", "float", "zero", 1200, "KB"},
+			{"mem_free", "float", "both", 180, "KB"},
+			{"mem_shared", "float", "both", 180, "KB"},
+			{"mem_buffers", "float", "both", 180, "KB"},
+			{"mem_cached", "float", "both", 180, "KB"},
+			{"swap_free", "float", "both", 180, "KB"},
+			{"mem_sreclaimable", "float", "both", 180, "KB"},
+			{"mem_slab", "float", "both", 180, "KB"},
+		},
+	},
+	{
+		Name: "cpu",
+		Metrics: []GangliaMetric{
+			{"cpu_num", "uint32", "zero", 1200, "CPUs"},
+			{"cpu_speed", "uint32", "zero", 1200, "MHz"},
+			{"cpu_user", "float", "both", 90, "%"},
+			{"cpu_nice", "float", "both", 90, "%"},
+			{"cpu_system", "float", "both", 90, "%"},
+			{"cpu_idle", "float", "both", 3800, "%"},
+			{"cpu_aidle", "float", "both", 90, "%"},
+			{"cpu_wio", "float", "both", 90, "%"},
+			{"cpu_intr", "float", "both", 90, "%"},
+			{"cpu_sintr", "float", "both", 90, "%"},
+			{"cpu_steal", "float", "both", 90, "%"},
+			{"cpu_guest", "float", "both", 90, "%"},
+			{"cpu_gnice", "float", "both", 90, "%"},
+		},
+	},
+	{
+		Name: "load",
+		Metrics: []GangliaMetric{
+			{"load_one", "float", "both", 70, ""},
+			{"load_five", "float", "both", 325, ""},
+			{"load_fifteen", "float", "both", 950, ""},
+		},
+	},
+	{
+		Name: "disk",
+		Metrics: []GangliaMetric{
+			{"disk_total", "double", "both", 1200, "GB"},
+			{"disk_free", "double", "both", 180, "GB"},
+			{"part_max_used", "float", "both", 180, "%"},
+		},
+	},
+	{
+		Name: "network",
+		Metrics: []GangliaMetric{
+			{"bytes_out", "float", "both", 300, "bytes/sec"},
+			{"bytes_in", "float", "both", 300, "bytes/sec"},
+			{"pkts_in", "float", "both", 300, "packets/sec"},
+			{"pkts_out", "float", "both", 300, "packets/sec"},
+		},
+	},
+	{
+		Name: "process",
+		Metrics: []GangliaMetric{
+			{"proc_run", "uint32", "both", 950, ""},
+			{"proc_total", "uint32", "both", 950, ""},
+		},
+	},
+	{
+		Name: "system",
+		Metrics: []GangliaMetric{
+			{"boottime", "uint32", "zero", 1200, "s"},
+			{"sys_clock", "uint32", "zero", 1200, "s"},
+			{"machine_type", "string", "zero", 1200, ""},
+			{"os_name", "string", "zero", 1200, ""},
+			{"os_release", "string", "zero", 1200, ""},
+			{"mtu", "uint32", "both", 1200, ""},
+		},
+	},
+}
+
+type GangliaMetricConfig struct {
+	Type  string
+	Slope string
+	Tmax  int
+	Unit  string
+	Group string
+	Value string
+}
+
+func GetCommonGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
+	mname := GangliaMetricRename(point.Name())
+	for _, group := range CommonGangliaMetrics {
+		for _, metric := range group.Metrics {
+			if metric.Name == mname {
+				valueStr := ""
+				value, ok := point.GetField("value")
+				if ok {
+					switch real := value.(type) {
+					case float64:
+						valueStr = fmt.Sprintf("%f", real)
+					case float32:
+						valueStr = fmt.Sprintf("%f", real)
+					case int64:
+						valueStr = fmt.Sprintf("%d", real)
+					case int32:
+						valueStr = fmt.Sprintf("%d", real)
+					case int:
+						valueStr = fmt.Sprintf("%d", real)
+					case uint64:
+						valueStr = fmt.Sprintf("%d", real)
+					case uint32:
+						valueStr = fmt.Sprintf("%d", real)
+					case uint:
+						valueStr = fmt.Sprintf("%d", real)
+					case string:
+						valueStr = real
+					default:
+					}
+				}
+				return GangliaMetricConfig{
+					Group: group.Name,
+					Type:  metric.Type,
+					Slope: metric.Slope,
+					Tmax:  metric.Tmax,
+					Unit:  metric.Unit,
+					Value: valueStr,
+				}
+			}
+		}
+	}
+	return GangliaMetricConfig{
+		Group: "",
+		Type:  "",
+		Slope: "",
+		Tmax:  0,
+		Unit:  "",
+		Value: "",
+	}
+}
+
+func GetGangliaConfig(point lp.CCMetric) GangliaMetricConfig {
+	group := ""
+	if g, ok := point.GetMeta("group"); ok {
+		group = g
+	}
+	unit := ""
+	if u, ok := point.GetMeta("unit"); ok {
+		unit = u
+	}
+	valueType := "double"
+	valueStr := ""
+	value, ok := point.GetField("value")
+	if ok {
+		switch real := value.(type) {
+		case float64:
+			valueStr = fmt.Sprintf("%f", real)
+			valueType = "double"
+		case float32:
+			valueStr = fmt.Sprintf("%f", real)
+			valueType = "float"
+		case int64:
+			valueStr = fmt.Sprintf("%d", real)
+			valueType = "int32"
+		case int32:
+			valueStr = fmt.Sprintf("%d", real)
+			valueType = "int32"
+		case int:
+			valueStr = fmt.Sprintf("%d", real)
+			valueType = "int32"
+		case uint64:
+			valueStr = fmt.Sprintf("%d", real)
+			valueType = "uint32"
+		case uint32:
+			valueStr = fmt.Sprintf("%d", real)
+			valueType = "uint32"
+		case uint:
+			valueStr = fmt.Sprintf("%d", real)
+			valueType = "uint32"
+		case string:
+			valueStr = real
+			valueType = "string"
+		default:
+			valueType = "invalid"
+		}
+	}
+	return GangliaMetricConfig{
+		Group: group,
+		Type:  valueType,
+		Slope: DEFAULT_GANGLIA_METRIC_SLOPE,
+		Tmax:  DEFAULT_GANGLIA_METRIC_TMAX,
+		Unit:  unit,
+		Value: valueStr,
+	}
+}

View File

@@ -24,6 +24,7 @@ type GangliaSinkConfig struct {
 	AddTagsAsDesc bool   `json:"add_tags_as_desc,omitempty"`
 	ClusterName   string `json:"cluster_name,omitempty"`
 	AddTypeToName bool   `json:"add_type_to_name,omitempty"`
+	AddUnits      bool   `json:"add_units,omitempty"`
 }

 type GangliaSink struct {
@@ -33,16 +34,73 @@ type GangliaSink struct {
 	config GangliaSinkConfig
 }

-func (s *GangliaSink) Init(config json.RawMessage) error {
+func (s *GangliaSink) Write(point lp.CCMetric) error {
 	var err error = nil
-	s.name = "GangliaSink"
+	//var tagsstr []string
+	var argstr []string
+
+	// Get metric name
+	metricname := GangliaMetricRename(point.Name())
+
+	// Get metric config (type, value, ... in suitable format)
+	conf := GetCommonGangliaConfig(point)
+	if len(conf.Type) == 0 {
+		conf = GetGangliaConfig(point)
+	}
+	if len(conf.Type) == 0 {
+		return fmt.Errorf("metric %s has no 'value' field", metricname)
+	}
+
+	if s.config.AddGangliaGroup {
+		argstr = append(argstr, fmt.Sprintf("--group=%s", conf.Group))
+	}
+	if s.config.AddUnits && len(conf.Unit) > 0 {
+		argstr = append(argstr, fmt.Sprintf("--units=%s", conf.Unit))
+	}
+
+	if len(s.config.ClusterName) > 0 {
+		argstr = append(argstr, fmt.Sprintf("--cluster=%s", s.config.ClusterName))
+	}
+	// if s.config.AddTagsAsDesc && len(tagsstr) > 0 {
+	// 	argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
+	// }
+	if len(s.gmetric_config) > 0 {
+		argstr = append(argstr, fmt.Sprintf("--conf=%s", s.gmetric_config))
+	}
+	if s.config.AddTypeToName {
+		argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point)))
+	} else {
+		argstr = append(argstr, fmt.Sprintf("--name=%s", metricname))
+	}
+	argstr = append(argstr, fmt.Sprintf("--slope=%s", conf.Slope))
+	argstr = append(argstr, fmt.Sprintf("--value=%s", conf.Value))
+	argstr = append(argstr, fmt.Sprintf("--type=%s", conf.Type))
+	argstr = append(argstr, fmt.Sprintf("--tmax=%d", conf.Tmax))
+
+	cclog.ComponentDebug(s.name, s.gmetric_path, strings.Join(argstr, " "))
+	command := exec.Command(s.gmetric_path, argstr...)
+	command.Wait()
+	_, err = command.Output()
+	return err
+}
+
+func (s *GangliaSink) Flush() error {
+	return nil
+}
+
+func (s *GangliaSink) Close() {
+}
+
+func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
+	s := new(GangliaSink)
+	s.name = fmt.Sprintf("GangliaSink(%s)", name)
 	s.config.AddTagsAsDesc = false
 	s.config.AddGangliaGroup = false
 	if len(config) > 0 {
 		err := json.Unmarshal(config, &s.config)
 		if err != nil {
 			cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
-			return err
+			return nil, err
 		}
 	}
 	s.gmetric_path = ""
@@ -60,98 +118,10 @@ func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
 		}
 	}
 	if len(s.gmetric_path) == 0 {
-		err = errors.New("cannot find executable 'gmetric'")
+		return nil, errors.New("cannot find executable 'gmetric'")
 	}
 	if len(s.config.GmetricConfig) > 0 {
 		s.gmetric_config = s.config.GmetricConfig
 	}
-	return err
+	return s, nil
 }
-
-func (s *GangliaSink) Write(point lp.CCMetric) error {
-	var err error = nil
-	var tagsstr []string
-	var argstr []string
-	if s.config.AddGangliaGroup {
-		if point.HasTag("group") {
-			g, _ := point.GetTag("group")
-			argstr = append(argstr, fmt.Sprintf("--group=%s", g))
-		} else if point.HasMeta("group") {
-			g, _ := point.GetMeta("group")
-			argstr = append(argstr, fmt.Sprintf("--group=%s", g))
-		}
-	}
-
-	for key, value := range point.Tags() {
-		switch key {
-		case "unit":
-			argstr = append(argstr, fmt.Sprintf("--units=%s", value))
-		default:
-			tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
-		}
-	}
-
-	if s.config.MetaAsTags {
-		for key, value := range point.Meta() {
-			switch key {
-			case "unit":
-				argstr = append(argstr, fmt.Sprintf("--units=%s", value))
-			default:
-				tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", key, value))
-			}
-		}
-	}
-
-	if len(s.config.ClusterName) > 0 {
-		argstr = append(argstr, fmt.Sprintf("--cluster=%s", s.config.ClusterName))
-	}
-	if s.config.AddTagsAsDesc && len(tagsstr) > 0 {
-		argstr = append(argstr, fmt.Sprintf("--desc=%q", strings.Join(tagsstr, ",")))
-	}
-	if len(s.gmetric_config) > 0 {
-		argstr = append(argstr, fmt.Sprintf("--conf=%s", s.gmetric_config))
-	}
-
-	name := GangliaMetricRename(point)
-	if s.config.AddTypeToName {
-		argstr = append(argstr, fmt.Sprintf("--name=%s", GangliaMetricName(point)))
-	} else {
-		argstr = append(argstr, fmt.Sprintf("--name=%s", name))
-	}
-	slope := GangliaSlopeType(point)
-	slopeStr := "both"
-	if slope == 0 {
-		slopeStr = "zero"
-	}
-	argstr = append(argstr, fmt.Sprintf("--slope=%s", slopeStr))
-
-	for k, v := range point.Fields() {
-		if k == "value" {
-			switch value := v.(type) {
-			case float64:
-				argstr = append(argstr,
-					fmt.Sprintf("--value=%v", value), "--type=double")
-			case float32:
-				argstr = append(argstr,
-					fmt.Sprintf("--value=%v", value), "--type=float")
-			case int:
-				argstr = append(argstr,
-					fmt.Sprintf("--value=%d", value), "--type=int32")
-			case int64:
-				argstr = append(argstr,
-					fmt.Sprintf("--value=%d", value), "--type=int32")
-			case string:
-				argstr = append(argstr,
-					fmt.Sprintf("--value=%q", value), "--type=string")
-			}
-		}
-	}
-
-	command := exec.Command(s.gmetric_path, argstr...)
-	command.Wait()
-	_, err = command.Output()
-	return err
-}
-
-func (s *GangliaSink) Flush() error {
-	return nil
-}
-
-func (s *GangliaSink) Close() {
-}

View File

@@ -38,57 +38,6 @@ type HttpSink struct {
 	flushDelay time.Duration
 }

-func (s *HttpSink) Init(config json.RawMessage) error {
-	// Set default values
-	s.name = "HttpSink"
-	s.config.MaxIdleConns = 10
-	s.config.IdleConnTimeout = "5s"
-	s.config.Timeout = "5s"
-	s.config.FlushDelay = "1s"
-
-	// Read config
-	if len(config) > 0 {
-		err := json.Unmarshal(config, &s.config)
-		if err != nil {
-			return err
-		}
-	}
-	if len(s.config.URL) == 0 {
-		return errors.New("`url` config option is required for HTTP sink")
-	}
-	if s.config.MaxIdleConns > 0 {
-		s.maxIdleConns = s.config.MaxIdleConns
-	}
-	if len(s.config.IdleConnTimeout) > 0 {
-		t, err := time.ParseDuration(s.config.IdleConnTimeout)
-		if err == nil {
-			s.idleConnTimeout = t
-		}
-	}
-	if len(s.config.Timeout) > 0 {
-		t, err := time.ParseDuration(s.config.Timeout)
-		if err == nil {
-			s.timeout = t
-		}
-	}
-	if len(s.config.FlushDelay) > 0 {
-		t, err := time.ParseDuration(s.config.FlushDelay)
-		if err == nil {
-			s.flushDelay = t
-		}
-	}
-	tr := &http.Transport{
-		MaxIdleConns:    s.maxIdleConns,
-		IdleConnTimeout: s.idleConnTimeout,
-	}
-	s.client = &http.Client{Transport: tr, Timeout: s.timeout}
-	s.buffer = &bytes.Buffer{}
-	s.encoder = influx.NewEncoder(s.buffer)
-	s.encoder.SetPrecision(time.Second)
-	return nil
-}
-
 func (s *HttpSink) Write(m lp.CCMetric) error {
 	if s.buffer.Len() == 0 && s.flushDelay != 0 {
 		// This is the first write since the last flush, start the flushTimer!
@@ -169,3 +118,54 @@ func (s *HttpSink) Close() {
 	}
 	s.client.CloseIdleConnections()
 }
+
+func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
+	s := new(HttpSink)
+	// Set default values
+	s.name = fmt.Sprintf("HttpSink(%s)", name)
+	s.config.MaxIdleConns = 10
+	s.config.IdleConnTimeout = "5s"
+	s.config.Timeout = "5s"
+	s.config.FlushDelay = "1s"
+
+	// Read config
+	if len(config) > 0 {
+		err := json.Unmarshal(config, &s.config)
+		if err != nil {
+			return nil, err
+		}
+	}
+	if len(s.config.URL) == 0 {
+		return nil, errors.New("`url` config option is required for HTTP sink")
+	}
+	if s.config.MaxIdleConns > 0 {
+		s.maxIdleConns = s.config.MaxIdleConns
+	}
+	if len(s.config.IdleConnTimeout) > 0 {
+		t, err := time.ParseDuration(s.config.IdleConnTimeout)
+		if err == nil {
+			s.idleConnTimeout = t
+		}
+	}
+	if len(s.config.Timeout) > 0 {
+		t, err := time.ParseDuration(s.config.Timeout)
+		if err == nil {
+			s.timeout = t
+		}
+	}
+	if len(s.config.FlushDelay) > 0 {
+		t, err := time.ParseDuration(s.config.FlushDelay)
+		if err == nil {
+			s.flushDelay = t
+		}
+	}
+	tr := &http.Transport{
+		MaxIdleConns:    s.maxIdleConns,
+		IdleConnTimeout: s.idleConnTimeout,
+	}
+	s.client = &http.Client{Transport: tr, Timeout: s.timeout}
+	s.buffer = &bytes.Buffer{}
+	s.encoder = influx.NewEncoder(s.buffer)
+	s.encoder.SetPrecision(time.Second)
+	return s, nil
+}

View File

@@ -30,11 +30,10 @@ type InfluxAsyncSinkConfig struct {

 type InfluxAsyncSink struct {
 	sink
 	client   influxdb2.Client
 	writeApi influxdb2Api.WriteAPI
-	retPolicy string
 	errors   <-chan error
 	config   InfluxAsyncSinkConfig
 }

 func (s *InfluxAsyncSink) connect() error {
@@ -68,39 +67,6 @@ func (s *InfluxAsyncSink) connect() error {
 	return nil
 }

-func (s *InfluxAsyncSink) Init(config json.RawMessage) error {
-	s.name = "InfluxSink"
-	// Set default for maximum number of points sent to server in single request.
-	s.config.BatchSize = 100
-	if len(config) > 0 {
-		err := json.Unmarshal(config, &s.config)
-		if err != nil {
-			return err
-		}
-	}
-	if len(s.config.Host) == 0 ||
-		len(s.config.Port) == 0 ||
-		len(s.config.Database) == 0 ||
-		len(s.config.Organization) == 0 ||
-		len(s.config.Password) == 0 {
-		return errors.New("not all configuration variables set required by InfluxAsyncSink")
-	}
-
-	// Connect to InfluxDB server
-	err := s.connect()
-
-	// Start background: Read from error channel
-	s.errors = s.writeApi.Errors()
-	go func() {
-		for err := range s.errors {
-			cclog.ComponentError(s.name, err.Error())
-		}
-	}()
-	return err
-}
-
 func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
 	s.writeApi.WritePoint(
 		m.ToPoint(s.config.MetaAsTags),
@@ -118,3 +84,40 @@ func (s *InfluxAsyncSink) Close() {
 	s.writeApi.Flush()
 	s.client.Close()
 }
+
+func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
+	s := new(InfluxAsyncSink)
+	s.name = fmt.Sprintf("InfluxSink(%s)", name)
+
+	// Set default for maximum number of points sent to server in single request.
+	s.config.BatchSize = 100
+
+	if len(config) > 0 {
+		err := json.Unmarshal(config, &s.config)
+		if err != nil {
+			return nil, err
+		}
+	}
+	if len(s.config.Host) == 0 ||
+		len(s.config.Port) == 0 ||
+		len(s.config.Database) == 0 ||
+		len(s.config.Organization) == 0 ||
+		len(s.config.Password) == 0 {
+		return nil, errors.New("not all configuration variables set required by InfluxAsyncSink")
+	}
+
+	// Connect to InfluxDB server
+	if err := s.connect(); err != nil {
+		return nil, fmt.Errorf("Unable to connect: %v", err)
+	}
+
+	// Start background: Read from error channel
+	s.errors = s.writeApi.Errors()
+	go func() {
+		for err := range s.errors {
+			cclog.ComponentError(s.name, err.Error())
+		}
+	}()
+	return s, nil
+}

View File

@@ -57,26 +57,6 @@ func (s *InfluxSink) connect() error {
 	return nil
 }

-func (s *InfluxSink) Init(config json.RawMessage) error {
-	s.name = "InfluxSink"
-	if len(config) > 0 {
-		err := json.Unmarshal(config, &s.config)
-		if err != nil {
-			return err
-		}
-	}
-	if len(s.config.Host) == 0 ||
-		len(s.config.Port) == 0 ||
-		len(s.config.Database) == 0 ||
-		len(s.config.Organization) == 0 ||
-		len(s.config.Password) == 0 {
-		return errors.New("not all configuration variables set required by InfluxSink")
-	}
-
-	// Connect to InfluxDB server
-	return s.connect()
-}
-
 func (s *InfluxSink) Write(m lp.CCMetric) error {
 	err :=
 		s.writeApi.WritePoint(
@@ -94,3 +74,27 @@ func (s *InfluxSink) Close() {
 	cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
 	s.client.Close()
 }
+
+func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
+	s := new(InfluxSink)
+	s.name = fmt.Sprintf("InfluxSink(%s)", name)
+	if len(config) > 0 {
+		err := json.Unmarshal(config, &s.config)
+		if err != nil {
+			return nil, err
+		}
+	}
+	if len(s.config.Host) == 0 ||
+		len(s.config.Port) == 0 ||
+		len(s.config.Database) == 0 ||
+		len(s.config.Organization) == 0 ||
+		len(s.config.Password) == 0 {
+		return nil, errors.New("not all configuration variables set required by InfluxSink")
+	}
+
+	// Connect to InfluxDB server
+	if err := s.connect(); err != nil {
+		return nil, fmt.Errorf("Unable to connect: %v", err)
+	}
+	return s, nil
+}

View File

@@ -2,7 +2,7 @@ package sinks

 /*
 #cgo CFLAGS: -DGM_PROTOCOL_GUARD
-#cgo LDFLAGS: -L. -lganglia -Wl,--unresolved-symbols=ignore-in-object-files
+#cgo LDFLAGS: -L. -Wl,--unresolved-symbols=ignore-in-object-files
 #include <stdlib.h>

 // This is a copy&paste snippet of ganglia.h (BSD-3 license)
@@ -71,6 +71,7 @@ import (
 	"fmt"
 	"unsafe"

+	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 	"github.com/NVIDIA/go-nvml/pkg/dl"
 )
@@ -81,21 +82,21 @@ const (
 	GMOND_CONFIG_FILE = `/etc/ganglia/gmond.conf`
 )

-type LibgangliaSinkSpecialMetric struct {
-	MetricName string `json:"metric_name,omitempty"`
-	NewName    string `json:"new_name,omitempty"`
-	Slope      string `json:"slope,omitempty"`
-}
+// type LibgangliaSinkSpecialMetric struct {
+// 	MetricName string `json:"metric_name,omitempty"`
+// 	NewName    string `json:"new_name,omitempty"`
+// 	Slope      string `json:"slope,omitempty"`
+// }

 type LibgangliaSinkConfig struct {
 	defaultSinkConfig
 	GangliaLib      string `json:"libganglia_path,omitempty"`
 	GmondConfig     string `json:"gmond_config,omitempty"`
 	AddGangliaGroup bool   `json:"add_ganglia_group,omitempty"`
 	AddTypeToName   bool   `json:"add_type_to_name,omitempty"`
 	AddUnits        bool   `json:"add_units,omitempty"`
 	ClusterName     string `json:"cluster_name,omitempty"`
-	SpecialMetrics  map[string]LibgangliaSinkSpecialMetric `json:"rename_metrics,omitempty"` // Map to rename metric name from key to value
+	//SpecialMetrics map[string]LibgangliaSinkSpecialMetric `json:"rename_metrics,omitempty"` // Map to rename metric name from key to value
 	//AddTagsAsDesc   bool   `json:"add_tags_as_desc,omitempty"`
 }
@@ -108,61 +109,6 @@ type LibgangliaSink struct {
 	cstrCache map[string]*C.char
 }

-func (s *LibgangliaSink) Init(config json.RawMessage) error {
-	var err error = nil
-	s.name = "LibgangliaSink"
-	//s.config.AddTagsAsDesc = false
-	s.config.AddGangliaGroup = false
-	s.config.AddTypeToName = false
-	s.config.AddUnits = true
-	s.config.GmondConfig = string(GMOND_CONFIG_FILE)
-	s.config.GangliaLib = string(GANGLIA_LIB_NAME)
-	if len(config) > 0 {
-		err = json.Unmarshal(config, &s.config)
-		if err != nil {
-			fmt.Println(s.name, "Error reading config for", s.name, ":", err.Error())
-			return err
-		}
-	}
-	lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
-	if lib == nil {
-		return fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
-	}
-
-	// Set up cache for the C strings
-	s.cstrCache = make(map[string]*C.char)
-	// s.cstrCache["globals"] = C.CString("globals")
-	// s.cstrCache["override_hostname"] = C.CString("override_hostname")
-	// s.cstrCache["override_ip"] = C.CString("override_ip")
-
-	// Add some constant strings
-	s.cstrCache["GROUP"] = C.CString("GROUP")
-	s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
-	s.cstrCache[""] = C.CString("")
-
-	// Add cluster name for lookup in Write()
-	if len(s.config.ClusterName) > 0 {
-		s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
-	}
-	// Add supported types for later lookup in Write()
-	s.cstrCache["double"] = C.CString("double")
-	s.cstrCache["int32"] = C.CString("int32")
-	s.cstrCache["string"] = C.CString("string")
-
-	// Create Ganglia pool
-	s.global_context = C.Ganglia_pool_create(nil)
-	// Load Ganglia configuration
-	s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
-	s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
-	//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
-	//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
-	//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
-
-	s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
-	return nil
-}
-
 func (s *LibgangliaSink) Write(point lp.CCMetric) error {
 	var err error = nil
 	var c_name *C.char
@@ -179,72 +125,48 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
 	}

 	// Get metric name
-	metricname := GangliaMetricRename(point)
-	if s.config.AddTypeToName {
-		c_name = lookup(GangliaMetricName(point))
-	} else {
-		c_name = lookup(metricname)
-	}
+	metricname := GangliaMetricRename(point.Name())

-	// Get the value C string and lookup the type string in the cache
-	value, ok := point.GetField("value")
-	if !ok {
+	conf := GetCommonGangliaConfig(point)
+	if len(conf.Type) == 0 {
+		conf = GetGangliaConfig(point)
+	}
+	if len(conf.Type) == 0 {
 		return fmt.Errorf("metric %s has no 'value' field", metricname)
 	}
-	switch real := value.(type) {
-	case float64:
-		c_value = C.CString(fmt.Sprintf("%f", real))
-		c_type = lookup("double")
-	case float32:
-		c_value = C.CString(fmt.Sprintf("%f", real))
-		c_type = lookup("float")
-	case int64:
-		c_value = C.CString(fmt.Sprintf("%d", real))
-		c_type = lookup("int32")
-	case int32:
-		c_value = C.CString(fmt.Sprintf("%d", real))
-		c_type = lookup("int32")
-	case int:
-		c_value = C.CString(fmt.Sprintf("%d", real))
-		c_type = lookup("int32")
-	case string:
-		c_value = C.CString(real)
-		c_type = lookup("string")
-	default:
-		return fmt.Errorf("metric %s has invalid 'value' type for %s", point.Name(), s.name)
+
+	if s.config.AddTypeToName {
+		metricname = GangliaMetricName(point)
 	}
+
+	c_value = C.CString(conf.Value)
+	c_type = lookup(conf.Type)
+	c_name = lookup(metricname)

 	// Add unit
+	unit := ""
 	if s.config.AddUnits {
-		if tagunit, tagok := point.GetTag("unit"); tagok {
-			c_unit = lookup(tagunit)
-		} else if metaunit, metaok := point.GetMeta("unit"); metaok {
-			c_unit = lookup(metaunit)
-		} else {
-			c_unit = lookup("")
-		}
-	} else {
-		c_unit = lookup("")
+		unit = conf.Unit
 	}
+	c_unit = lookup(unit)

 	// Determine the slope of the metric. Ganglia's own collector mostly use
 	// 'both' but the mem and swap total uses 'zero'.
-	slope := GangliaSlopeType(point)
 	slope_type := C.GANGLIA_SLOPE_BOTH
-	switch slope {
-	case 0:
+	switch conf.Slope {
+	case "zero":
 		slope_type = C.GANGLIA_SLOPE_ZERO
+	case "both":
+		slope_type = C.GANGLIA_SLOPE_BOTH
 	}

 	// Create a new Ganglia metric
 	gmetric := C.Ganglia_metric_create(s.global_context)
 	// Set name, value, type and unit in the Ganglia metric
-	// Since we don't have this information from the collectors,
-	// we assume that the metric value can go up and down (slope),
-	// and there is no maximum for 'dmax' and 'tmax'.
-	// Ganglia's collectors set 'tmax' but not 'dmax'
+	// The default slope_type is both directions, so up and down. Some metrics want 'zero' slope, probably constant.
+	// The 'tmax' value is by default 300.
 	rval := C.int(0)
-	rval = C.Ganglia_metric_set(gmetric, c_name, c_value, c_type, c_unit, C.uint(slope_type), 0, 0)
+	rval = C.Ganglia_metric_set(gmetric, c_name, c_value, c_type, c_unit, C.uint(slope_type), C.uint(conf.Tmax), 0)
 	switch rval {
 	case 1:
 		C.free(unsafe.Pointer(c_value))
@ -254,10 +176,10 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
return errors.New("one of your parameters has an invalid character '\"'") return errors.New("one of your parameters has an invalid character '\"'")
case 3: case 3:
C.free(unsafe.Pointer(c_value)) C.free(unsafe.Pointer(c_value))
return fmt.Errorf("the type parameter \"%s\" is not a valid type", C.GoString(c_type)) return fmt.Errorf("the type parameter \"%s\" is not a valid type", conf.Type)
case 4: case 4:
C.free(unsafe.Pointer(c_value)) C.free(unsafe.Pointer(c_value))
return fmt.Errorf("the value parameter \"%s\" does not represent a number", C.GoString(c_value)) return fmt.Errorf("the value parameter \"%s\" does not represent a number", conf.Value)
default: default:
} }
@ -266,8 +188,8 @@ func (s *LibgangliaSink) Write(point lp.CCMetric) error {
C.Ganglia_metadata_add(gmetric, lookup("CLUSTER"), lookup(s.config.ClusterName)) C.Ganglia_metadata_add(gmetric, lookup("CLUSTER"), lookup(s.config.ClusterName))
} }
// Set the group metadata in the Ganglia metric if configured // Set the group metadata in the Ganglia metric if configured
if group, ok := point.GetMeta("group"); ok && s.config.AddGangliaGroup { if s.config.AddGangliaGroup {
c_group := lookup(group) c_group := lookup(conf.Group)
C.Ganglia_metadata_add(gmetric, lookup("GROUP"), c_group) C.Ganglia_metadata_add(gmetric, lookup("GROUP"), c_group)
} }
@ -302,3 +224,63 @@ func (s *LibgangliaSink) Close() {
C.free(unsafe.Pointer(cstr)) C.free(unsafe.Pointer(cstr))
} }
} }
func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
s := new(LibgangliaSink)
var err error = nil
s.name = fmt.Sprintf("LibgangliaSink(%s)", name)
//s.config.AddTagsAsDesc = false
s.config.AddGangliaGroup = false
s.config.AddTypeToName = false
s.config.AddUnits = true
s.config.GmondConfig = string(GMOND_CONFIG_FILE)
s.config.GangliaLib = string(GANGLIA_LIB_NAME)
if len(config) > 0 {
err = json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config:", err.Error())
return nil, err
}
}
lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
if lib == nil {
return nil, fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
}
err = lib.Open()
if err != nil {
return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
}
// Set up cache for the C strings
s.cstrCache = make(map[string]*C.char)
// s.cstrCache["globals"] = C.CString("globals")
// s.cstrCache["override_hostname"] = C.CString("override_hostname")
// s.cstrCache["override_ip"] = C.CString("override_ip")
// Add some constant strings
s.cstrCache["GROUP"] = C.CString("GROUP")
s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
s.cstrCache[""] = C.CString("")
// Add cluster name for lookup in Write()
if len(s.config.ClusterName) > 0 {
s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
}
// Add supported types for later lookup in Write()
s.cstrCache["double"] = C.CString("double")
s.cstrCache["int32"] = C.CString("int32")
s.cstrCache["string"] = C.CString("string")
// Create Ganglia pool
s.global_context = C.Ganglia_pool_create(nil)
// Load Ganglia configuration
s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
return s, nil
}
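With Init() folded into the constructor above, a sink is either fully initialized or has already failed by the time NewLibgangliaSink returns. A minimal usage sketch, assuming it sits in the sinks package with encoding/json and fmt imported; the JSON keys are hypothetical stand-ins for the struct tags of the sink's config type, which this diff does not show:

    func ExampleNewLibgangliaSink() {
        // "cluster_name" and "add_units" are assumed key names for the
        // ClusterName and AddUnits fields used above.
        cfg := json.RawMessage(`{"cluster_name": "testcluster", "add_units": true}`)
        s, err := NewLibgangliaSink("demo", cfg)
        if err != nil {
            // config errors and libganglia dlopen failures now surface here
            fmt.Println("construction failed:", err)
            return
        }
        defer s.Close() // frees the cached C strings and the Ganglia pool
    }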

View File

@@ -1,8 +1,6 @@
 package sinks

 import (
-    "encoding/json"
-
     lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 )
@@ -17,7 +15,6 @@ type sink struct {
 }

 type Sink interface {
-    Init(config json.RawMessage) error
     Write(point lp.CCMetric) error
     Flush() error
     Close()
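With Init removed, the Sink interface is satisfied by the three methods above plus a constructor of the shape func(name string, config json.RawMessage) (Sink, error). A sketch of the smallest conforming sink, assuming the interface also carries Name() string (the sink manager calls s.Name() further down, but that method lies outside this hunk):

    // nullSink discards every metric; it exists only to illustrate the contract.
    type nullSink struct {
        name string
    }

    func (s *nullSink) Write(point lp.CCMetric) error { return nil } // drop the metric
    func (s *nullSink) Flush() error                  { return nil } // nothing is buffered
    func (s *nullSink) Close()                        {}             // nothing to release
    func (s *nullSink) Name() string                  { return s.name }

    // NewNullSink matches the constructor signature the sink manager expects.
    func NewNullSink(name string, config json.RawMessage) (Sink, error) {
        return &nullSink{name: fmt.Sprintf("NullSink(%s)", name)}, nil
    }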

View File

@@ -53,30 +53,6 @@ func (s *NatsSink) connect() error {
     return nil
 }

-func (s *NatsSink) Init(config json.RawMessage) error {
-    s.name = "NatsSink"
-    if len(config) > 0 {
-        err := json.Unmarshal(config, &s.config)
-        if err != nil {
-            cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
-            return err
-        }
-    }
-    if len(s.config.Host) == 0 ||
-        len(s.config.Port) == 0 ||
-        len(s.config.Database) == 0 {
-        return errors.New("not all configuration variables set required by NatsSink")
-    }
-    // Setup Influx line protocol
-    s.buffer = &bytes.Buffer{}
-    s.buffer.Grow(1025)
-    s.encoder = influx.NewEncoder(s.buffer)
-    s.encoder.SetPrecision(time.Second)
-    s.encoder.SetMaxLineBytes(1024)
-    // Setup infos for connection
-    return s.connect()
-}
-
 func (s *NatsSink) Write(m lp.CCMetric) error {
     if s.client != nil {
         _, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags))
@@ -105,3 +81,31 @@ func (s *NatsSink) Close() {
         s.client.Close()
     }
 }
+
+func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
+    s := new(NatsSink)
+    s.name = fmt.Sprintf("NatsSink(%s)", name)
+    if len(config) > 0 {
+        err := json.Unmarshal(config, &s.config)
+        if err != nil {
+            cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
+            return nil, err
+        }
+    }
+    if len(s.config.Host) == 0 ||
+        len(s.config.Port) == 0 ||
+        len(s.config.Database) == 0 {
+        return nil, errors.New("not all configuration variables set required by NatsSink")
+    }
+    // Setup Influx line protocol
+    s.buffer = &bytes.Buffer{}
+    s.buffer.Grow(1025)
+    s.encoder = influx.NewEncoder(s.buffer)
+    s.encoder.SetPrecision(time.Second)
+    s.encoder.SetMaxLineBytes(1024)
+    // Setup infos for connection
+    if err := s.connect(); err != nil {
+        return nil, fmt.Errorf("Unable to connect: %v", err)
+    }
+    return s, nil
+}
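Since connect() now runs inside the constructor, an unreachable NATS server or an incomplete configuration is reported before the sink is ever handed to the manager. A sketch, with assumed JSON keys mirroring the Host, Port and Database fields checked above:

    func ExampleNewNatsSink() {
        // "host", "port", "database" are assumed key names for the checked fields.
        cfg := json.RawMessage(`{"host": "localhost", "port": "4222", "database": "ccmetrics"}`)
        s, err := NewNatsSink("nats1", cfg)
        if err != nil {
            fmt.Println("construction failed:", err) // missing keys or failed connect()
            return
        }
        defer s.Close()
    }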

View File

@@ -13,14 +13,14 @@ import (
 const SINK_MAX_FORWARD = 50

 // Map of all available sinks
-var AvailableSinks = map[string]Sink{
-    "influxdb":    new(InfluxSink),
-    "stdout":      new(StdoutSink),
-    "nats":        new(NatsSink),
-    "http":        new(HttpSink),
-    "ganglia":     new(GangliaSink),
-    "influxasync": new(InfluxAsyncSink),
-    "libganglia":  new(LibgangliaSink),
+var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, error){
+    "ganglia":     NewGangliaSink,
+    "libganglia":  NewLibgangliaSink,
+    "stdout":      NewStdoutSink,
+    "nats":        NewNatsSink,
+    "influxdb":    NewInfluxSink,
+    "influxasync": NewInfluxAsyncSink,
+    "http":        NewHttpSink,
 }

 // Metric collector manager data structure
@@ -149,8 +149,7 @@ func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error {
         cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type)
         return err
     }
-    s := AvailableSinks[sinkConfig.Type]
-    err = s.Init(rawConfig)
+    s, err := AvailableSinks[sinkConfig.Type](name, rawConfig)
     if err != nil {
         cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error())
         return err
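The map now stores constructors instead of zero-value instances, so AddOutput collapses to a single lookup-and-call, and a half-initialized sink can no longer be registered by accident. Extending the collector with a new sink type then becomes a one-line registration; a sketch using the hypothetical NewNullSink from the interface example earlier:

    // AddOutput will invoke the constructor with the instance name and
    // the raw JSON config block for that sink.
    AvailableSinks["null"] = NewNullSink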

View File

@@ -19,34 +19,6 @@ type StdoutSink struct {
     }
 }

-func (s *StdoutSink) Init(config json.RawMessage) error {
-    s.name = "StdoutSink"
-    if len(config) > 0 {
-        err := json.Unmarshal(config, &s.config)
-        if err != nil {
-            return err
-        }
-    }
-    s.output = os.Stdout
-    if len(s.config.Output) > 0 {
-        switch strings.ToLower(s.config.Output) {
-        case "stdout":
-            s.output = os.Stdout
-        case "stderr":
-            s.output = os.Stderr
-        default:
-            f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
-            if err != nil {
-                return err
-            }
-            s.output = f
-        }
-    }
-    s.meta_as_tags = s.config.MetaAsTags
-    return nil
-}
-
 func (s *StdoutSink) Write(m lp.CCMetric) error {
     fmt.Fprint(
         s.output,
@@ -65,3 +37,33 @@ func (s *StdoutSink) Close() {
         s.output.Close()
     }
 }
+
+func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
+    s := new(StdoutSink)
+    s.name = fmt.Sprintf("StdoutSink(%s)", name)
+    if len(config) > 0 {
+        err := json.Unmarshal(config, &s.config)
+        if err != nil {
+            return nil, err
+        }
+    }
+    s.output = os.Stdout
+    if len(s.config.Output) > 0 {
+        switch strings.ToLower(s.config.Output) {
+        case "stdout":
+            s.output = os.Stdout
+        case "stderr":
+            s.output = os.Stderr
+        default:
+            f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
+            if err != nil {
+                return nil, err
+            }
+            s.output = f
+        }
+    }
+    s.meta_as_tags = s.config.MetaAsTags
+    return s, nil
+}
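As with the other sinks, file handling moved into the constructor, so an unwritable output path fails at startup rather than at the first Write. A sketch, assuming "output" is the JSON key for the Output field used above:

    func ExampleNewStdoutSink() {
        s, err := NewStdoutSink("file", json.RawMessage(`{"output": "/tmp/metrics.log"}`))
        if err != nil {
            fmt.Println("construction failed:", err) // e.g. the path is not writable
            return
        }
        defer s.Close() // releases the file handle opened by the constructor
    }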