Formatting

This commit is contained in:
Thomas Roehl 2021-12-21 14:04:31 +01:00
parent 988cea381e
commit a6feb16ec1
30 changed files with 775 additions and 793 deletions

View File

@ -16,5 +16,6 @@ fmt:
go fmt sinks/*.go
go fmt receivers/*.go
go fmt metric-collector.go
find . -name "*.go" -exec go fmt {} \;
.PHONY: clean

View File

@ -1,16 +1,15 @@
package collectors
import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"sync"
"time"
"log"
"os"
"encoding/json"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
"encoding/json"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
"log"
"os"
"sync"
"time"
)
var AvailableCollectors = map[string]MetricCollector{
"likwid": &LikwidCollector{},
"loadavg": &LoadavgCollector{},
@ -27,15 +26,14 @@ var AvailableCollectors = map[string]MetricCollector{
"ipmistat": &IpmiCollector{},
}
type collectorManager struct {
collectors []MetricCollector
output chan lp.CCMetric
done chan bool
ticker mct.MultiChanTicker
duration time.Duration
wg *sync.WaitGroup
config map[string]json.RawMessage
collectors []MetricCollector
output chan lp.CCMetric
done chan bool
ticker mct.MultiChanTicker
duration time.Duration
wg *sync.WaitGroup
config map[string]json.RawMessage
}
type CollectorManager interface {
@ -45,96 +43,95 @@ type CollectorManager interface {
Close()
}
func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error {
cm.collectors = make([]MetricCollector, 0)
cm.output = nil
cm.done = make(chan bool)
cm.wg = wg
cm.ticker = ticker
cm.duration = duration
configFile, err := os.Open(collectConfigFile)
if err != nil {
log.Print(err.Error())
return err
}
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
cm.collectors = make([]MetricCollector, 0)
cm.output = nil
cm.done = make(chan bool)
cm.wg = wg
cm.ticker = ticker
cm.duration = duration
configFile, err := os.Open(collectConfigFile)
if err != nil {
log.Print(err.Error())
return err
}
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
err = jsonParser.Decode(&cm.config)
if err != nil {
log.Print(err.Error())
return err
}
for k, cfg := range cm.config {
log.Print(k, " ", cfg)
if _, found := AvailableCollectors[k]; !found {
log.Print("[CollectorManager] SKIP unknown collector ", k)
continue
}
c := AvailableCollectors[k]
log.Print(err.Error())
return err
}
for k, cfg := range cm.config {
log.Print(k, " ", cfg)
if _, found := AvailableCollectors[k]; !found {
log.Print("[CollectorManager] SKIP unknown collector ", k)
continue
}
c := AvailableCollectors[k]
err = c.Init(cfg)
if err != nil {
log.Print("[CollectorManager] Collector ", k, "initialization failed: ", err.Error())
continue
}
cm.collectors = append(cm.collectors, c)
}
return nil
err = c.Init(cfg)
if err != nil {
log.Print("[CollectorManager] Collector ", k, "initialization failed: ", err.Error())
continue
}
cm.collectors = append(cm.collectors, c)
}
return nil
}
func (cm *collectorManager) Start() {
cm.wg.Add(1)
tick := make(chan time.Time)
cm.ticker.AddChannel(tick)
go func() {
for {
CollectorManagerLoop:
select {
case <- cm.done:
for _, c := range cm.collectors {
c.Close()
}
cm.wg.Done()
log.Print("[CollectorManager] DONE\n")
break CollectorManagerLoop
case t := <- tick:
for _, c := range cm.collectors {
CollectorManagerInputLoop:
select {
case <- cm.done:
for _, c := range cm.collectors {
c.Close()
}
cm.wg.Done()
log.Print("[CollectorManager] DONE\n")
break CollectorManagerInputLoop
default:
log.Print("[CollectorManager] ", c.Name(), " ", t)
c.Read(cm.duration, cm.output)
}
}
}
}
log.Print("[CollectorManager] EXIT\n")
}()
log.Print("[CollectorManager] STARTED\n")
cm.wg.Add(1)
tick := make(chan time.Time)
cm.ticker.AddChannel(tick)
go func() {
for {
CollectorManagerLoop:
select {
case <-cm.done:
for _, c := range cm.collectors {
c.Close()
}
cm.wg.Done()
log.Print("[CollectorManager] DONE\n")
break CollectorManagerLoop
case t := <-tick:
for _, c := range cm.collectors {
CollectorManagerInputLoop:
select {
case <-cm.done:
for _, c := range cm.collectors {
c.Close()
}
cm.wg.Done()
log.Print("[CollectorManager] DONE\n")
break CollectorManagerInputLoop
default:
log.Print("[CollectorManager] ", c.Name(), " ", t)
c.Read(cm.duration, cm.output)
}
}
}
}
log.Print("[CollectorManager] EXIT\n")
}()
log.Print("[CollectorManager] STARTED\n")
}
func (cm *collectorManager) AddOutput(output chan lp.CCMetric) {
cm.output = output
cm.output = output
}
func (cm *collectorManager) Close() {
cm.done <- true
log.Print("[CollectorManager] CLOSE")
cm.done <- true
log.Print("[CollectorManager] CLOSE")
}
func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) {
cm := &collectorManager{}
err := cm.Init(ticker, duration, wg, collectConfigFile)
if err != nil {
return nil, err
}
return cm, err
cm := &collectorManager{}
err := cm.Init(ticker, duration, wg, collectConfigFile)
if err != nil {
return nil, err
}
return cm, err
}

View File

@ -24,7 +24,7 @@ type CpustatCollector struct {
func (m *CpustatCollector) Init(config json.RawMessage) error {
m.name = "CpustatCollector"
m.setup()
m.meta = map[string]string{"source" : m.name, "group" : "CPU"}
m.meta = map[string]string{"source": m.name, "group": "CPU"}
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {

View File

@ -32,7 +32,7 @@ type CustomCmdCollector struct {
func (m *CustomCmdCollector) Init(config json.RawMessage) error {
var err error
m.name = "CustomCmdCollector"
m.meta = map[string]string{"source" : m.name, "group" : "Custom"}
m.meta = map[string]string{"source": m.name, "group": "Custom"}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {

View File

@ -27,7 +27,7 @@ type DiskstatCollector struct {
func (m *DiskstatCollector) Init(config json.RawMessage) error {
var err error
m.name = "DiskstatCollector"
m.meta = map[string]string{"source" : m.name, "group" : "Disk"}
m.meta = map[string]string{"source": m.name, "group": "Disk"}
m.setup()
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)

View File

@ -54,7 +54,7 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
m.name = "InfinibandCollector"
m.use_perfquery = false
m.setup()
m.meta = map[string]string{"source" : m.name, "group" : "Network"}
m.meta = map[string]string{"source": m.name, "group": "Network"}
m.tags = map[string]string{"type": "node"}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)

View File

@ -31,7 +31,7 @@ type IpmiCollector struct {
func (m *IpmiCollector) Init(config json.RawMessage) error {
m.name = "IpmiCollector"
m.setup()
m.meta = map[string]string{"source" : m.name, "group" : "IPMI"}
m.meta = map[string]string{"source": m.name, "group": "IPMI"}
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
@ -85,7 +85,7 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) {
y, err := lp.New(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now())
if err == nil {
y.AddMeta("unit", unit)
y.AddMeta("unit", unit)
output <- y
}
}
@ -112,9 +112,9 @@ func (m *IpmiCollector) readIpmiSensors(cmd string, output chan lp.CCMetric) {
name := strings.ToLower(strings.Replace(lv[1], " ", "_", -1))
y, err := lp.New(name, map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": v}, time.Now())
if err == nil {
if len(lv) > 4 {
y.AddMeta("unit", lv[4])
}
if len(lv) > 4 {
y.AddMeta("unit", lv[4])
}
output <- y
}
}

View File

@ -27,21 +27,21 @@ import (
type MetricScope int
const (
METRIC_SCOPE_HWTHREAD = iota
METRIC_SCOPE_SOCKET
METRIC_SCOPE_NUMA
METRIC_SCOPE_NODE
METRIC_SCOPE_HWTHREAD = iota
METRIC_SCOPE_SOCKET
METRIC_SCOPE_NUMA
METRIC_SCOPE_NODE
)
func (ms MetricScope) String() string {
return []string{"Head", "Shoulder", "Knee", "Toe"}[ms]
return []string{"Head", "Shoulder", "Knee", "Toe"}[ms]
}
type LikwidCollectorMetricConfig struct {
Name string `json:"name"`
Calc string `json:"calc"`
Scope MetricScope `json:"socket_scope"`
Publish bool `json:"publish"`
Name string `json:"name"`
Calc string `json:"calc"`
Scope MetricScope `json:"socket_scope"`
Publish bool `json:"publish"`
}
type LikwidCollectorEventsetConfig struct {
@ -127,13 +127,13 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
}
}
m.setup()
m.meta = map[string]string{"source" : m.name, "group" : "PerfCounter"}
m.meta = map[string]string{"source": m.name, "group": "PerfCounter"}
cpulist := CpuList()
m.cpulist = make([]C.int, len(cpulist))
slist := getSocketCpus()
m.sock2tid = make(map[int]int)
// m.numa2tid = make(map[int]int)
// m.numa2tid = make(map[int]int)
for i, c := range cpulist {
m.cpulist[i] = C.int(c)
if sid, found := slist[m.cpulist[i]]; found {
@ -264,7 +264,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric)
for sid, tid := range m.sock2tid {
y, err := lp.New(metric.Name,
map[string]string{"type": "socket",
"type-id": fmt.Sprintf("%d", int(sid))},
"type-id": fmt.Sprintf("%d", int(sid))},
m.meta,
map[string]interface{}{"value": m.mresults[i][tid][metric.Name]},
time.Now())
@ -276,7 +276,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric)
for tid, cpu := range m.cpulist {
y, err := lp.New(metric.Name,
map[string]string{"type": "cpu",
"type-id": fmt.Sprintf("%d", int(cpu))},
"type-id": fmt.Sprintf("%d", int(cpu))},
m.meta,
map[string]interface{}{"value": m.mresults[i][tid][metric.Name]},
time.Now())
@ -295,7 +295,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric)
for sid, tid := range m.sock2tid {
y, err := lp.New(metric.Name,
map[string]string{"type": "socket",
"type-id": fmt.Sprintf("%d", int(sid))},
"type-id": fmt.Sprintf("%d", int(sid))},
m.meta,
map[string]interface{}{"value": m.gmresults[tid][metric.Name]},
time.Now())
@ -307,7 +307,7 @@ func (m *LikwidCollector) Read(interval time.Duration, output chan lp.CCMetric)
for tid, cpu := range m.cpulist {
y, err := lp.New(metric.Name,
map[string]string{"type": "cpu",
"type-id": fmt.Sprintf("%d", int(cpu))},
"type-id": fmt.Sprintf("%d", int(cpu))},
m.meta,
map[string]interface{}{"value": m.gmresults[tid][metric.Name]},
time.Now())

View File

@ -32,7 +32,7 @@ func (m *LoadavgCollector) Init(config json.RawMessage) error {
return err
}
}
m.meta = map[string]string{"source" : m.name, "group" : "LOAD"}
m.meta = map[string]string{"source": m.name, "group": "LOAD"}
m.tags = map[string]string{"type": "node"}
m.load_matches = []string{"load_one", "load_five", "load_fifteen"}
m.proc_matches = []string{"proc_run", "proc_total"}

View File

@ -37,7 +37,7 @@ func (m *LustreCollector) Init(config json.RawMessage) error {
}
m.setup()
m.tags = map[string]string{"type": "node"}
m.meta = map[string]string{"source" : m.name, "group" : "Lustre"}
m.meta = map[string]string{"source": m.name, "group": "Lustre"}
m.matches = map[string]map[string]int{"read_bytes": {"read_bytes": 6, "read_requests": 1},
"write_bytes": {"write_bytes": 6, "write_requests": 1},
"open": {"open": 1},
@ -90,9 +90,9 @@ func (m *LustreCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y, err := lp.New(name, m.tags, m.meta, map[string]interface{}{"value": x}, time.Now())
if err == nil {
if strings.Contains(name, "byte") {
y.AddMeta("unit", "Byte")
}
if strings.Contains(name, "byte") {
y.AddMeta("unit", "Byte")
}
output <- y
}
}

View File

@ -35,7 +35,7 @@ func (m *MemstatCollector) Init(config json.RawMessage) error {
return err
}
}
m.meta = map[string]string{"source" : m.name, "group" : "Memory", "unit": "kByte"}
m.meta = map[string]string{"source": m.name, "group": "Memory", "unit": "kByte"}
m.stats = make(map[string]int64)
m.matches = make(map[string]string)
m.tags = map[string]string{"type": "node"}

View File

@ -1,6 +1,7 @@
package collectors
import (
"encoding/json"
"errors"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influx "github.com/influxdata/line-protocol"
@ -9,7 +10,6 @@ import (
"strconv"
"strings"
"time"
"encoding/json"
)
type MetricCollector interface {
@ -21,10 +21,10 @@ type MetricCollector interface {
}
type metricCollector struct {
output chan lp.CCMetric
name string
init bool
meta map[string]string
output chan lp.CCMetric
name string
init bool
meta map[string]string
}
func (c *metricCollector) Name() string {

View File

@ -25,7 +25,7 @@ type NetstatCollector struct {
func (m *NetstatCollector) Init(config json.RawMessage) error {
m.name = "NetstatCollector"
m.setup()
m.meta = map[string]string{"source" : m.name, "group" : "Memory"}
m.meta = map[string]string{"source": m.name, "group": "Memory"}
m.matches = map[int]string{
1: "bytes_in",
9: "bytes_out",
@ -75,12 +75,12 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMetric)
if err == nil {
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": int(float64(v) * 1.0e-3)}, time.Now())
if err == nil {
switch {
case strings.Contains(name, "byte"):
y.AddMeta("unit", "Byte")
case strings.Contains(name, "pkt"):
y.AddMeta("unit", "Packets")
}
switch {
case strings.Contains(name, "byte"):
y.AddMeta("unit", "Byte")
case strings.Contains(name, "pkt"):
y.AddMeta("unit", "Packets")
}
output <- y
}
}

View File

@ -4,8 +4,8 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/NVIDIA/go-nvml/pkg/nvml"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"github.com/NVIDIA/go-nvml/pkg/nvml"
"log"
"time"
)
@ -32,7 +32,7 @@ func (m *NvidiaCollector) Init(config json.RawMessage) error {
var err error
m.name = "NvidiaCollector"
m.setup()
m.meta = map[string]string{"source" : m.name, "group" : "Nvidia"}
m.meta = map[string]string{"source": m.name, "group": "Nvidia"}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {
@ -91,14 +91,14 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
_, skip = stringArrayContains(m.config.ExcludeMetrics, "mem_total")
y, err := lp.New("mem_total", tags, m.meta, map[string]interface{}{"value": t}, time.Now())
if err == nil && !skip {
y.AddMeta("unit", "MByte")
y.AddMeta("unit", "MByte")
output <- y
}
f := float64(meminfo.Used) / (1024 * 1024)
_, skip = stringArrayContains(m.config.ExcludeMetrics, "fb_memory")
y, err = lp.New("fb_memory", tags, m.meta, map[string]interface{}{"value": f}, time.Now())
if err == nil && !skip {
y.AddMeta("unit", "MByte")
y.AddMeta("unit", "MByte")
output <- y
}
}
@ -108,7 +108,7 @@ func (m *NvidiaCollector) Read(interval time.Duration, output chan lp.CCMetric)
_, skip = stringArrayContains(m.config.ExcludeMetrics, "temp")
y, err := lp.New("temp", tags, m.meta, map[string]interface{}{"value": float64(temp)}, time.Now())
if err == nil && !skip {
y.AddMeta("unit", "degC")
y.AddMeta("unit", "degC")
output <- y
}
}

View File

@ -5,12 +5,12 @@ import (
"fmt"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"io/ioutil"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"log"
)
const HWMON_PATH = `/sys/class/hwmon`
@ -28,7 +28,7 @@ type TempCollector struct {
func (m *TempCollector) Init(config json.RawMessage) error {
m.name = "TempCollector"
m.setup()
m.meta = map[string]string{"source" : m.name, "group" : "IPMI", "unit": "degC"}
m.meta = map[string]string{"source": m.name, "group": "IPMI", "unit": "degC"}
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
@ -90,10 +90,10 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
break
}
}
mname := strings.Replace(name, " ", "_", -1)
if !strings.Contains(mname, "temp") {
mname = fmt.Sprintf("temp_%s", mname)
}
mname := strings.Replace(name, " ", "_", -1)
if !strings.Contains(mname, "temp") {
mname = fmt.Sprintf("temp_%s", mname)
}
buffer, err := ioutil.ReadFile(string(file))
if err != nil {
continue
@ -102,7 +102,7 @@ func (m *TempCollector) Read(interval time.Duration, output chan lp.CCMetric) {
if err == nil {
y, err := lp.New(strings.ToLower(mname), tags, m.meta, map[string]interface{}{"value": int(float64(x) / 1000)}, time.Now())
if err == nil {
log.Print("[", m.name, "] ",y)
log.Print("[", m.name, "] ", y)
output <- y
}
}

View File

@ -28,7 +28,7 @@ func (m *TopProcsCollector) Init(config json.RawMessage) error {
var err error
m.name = "TopProcsCollector"
m.tags = map[string]string{"type": "node"}
m.meta = map[string]string{"source" : m.name, "group" : "TopProcs"}
m.meta = map[string]string{"source": m.name, "group": "TopProcs"}
if len(config) > 0 {
err = json.Unmarshal(config, &m.config)
if err != nil {

1
go.sum
View File

@ -1,7 +1,6 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/NVIDIA/go-nvml v0.11.1-0 h1:XHSz3zZKC4NCP2ja1rI7++DXFhA+uDhdYa3MykCTGHY=
github.com/NVIDIA/go-nvml v0.11.1-0/go.mod h1:hy7HYeQy335x6nEss0Ne3PYqleRa6Ct+VKD9RQ4nyFs=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=

View File

@ -1,10 +1,10 @@
package ccmetric
import (
lp "github.com/influxdata/line-protocol" // MIT license
"time"
"sort"
"fmt"
"fmt"
lp "github.com/influxdata/line-protocol" // MIT license
"sort"
"time"
)
// Most functions are derived from github.com/influxdata/line-protocol/metric.go
@ -12,18 +12,18 @@ import (
// type.
type ccMetric struct {
name string
name string
tags []*lp.Tag
fields []*lp.Field
tm time.Time
meta []*lp.Tag
meta []*lp.Tag
}
type CCMetric interface {
lp.MutableMetric
AddMeta(key, value string)
MetaList() []*lp.Tag
RemoveTag(key string)
lp.MutableMetric
AddMeta(key, value string)
MetaList() []*lp.Tag
RemoveTag(key string)
}
func (m *ccMetric) Meta() map[string]string {
@ -187,9 +187,6 @@ func (m *ccMetric) AddField(key string, value interface{}) {
m.fields = append(m.fields, &lp.Field{Key: key, Value: convertField(value)})
}
func New(
name string,
tags map[string]string,
@ -202,7 +199,7 @@ func New(
tags: nil,
fields: nil,
tm: tm,
meta: nil,
meta: nil,
}
if len(tags) > 0 {

View File

@ -1,36 +1,36 @@
package metricRouter
import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"sync"
"log"
"encoding/json"
"os"
"time"
"gopkg.in/Knetic/govaluate.v2"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
"encoding/json"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
"gopkg.in/Knetic/govaluate.v2"
"log"
"os"
"sync"
"time"
)
type metricRounterTagConfig struct {
Key string `json:"key"`
Value string `json:"value"`
Condition string `json:"if"`
Key string `json:"key"`
Value string `json:"value"`
Condition string `json:"if"`
}
type metricRouterConfig struct {
AddTags []metricRounterTagConfig `json:"add_tags"`
DelTags []metricRounterTagConfig `json:"delete_tags"`
IntervalStamp bool `json:"interval_timestamp"`
AddTags []metricRounterTagConfig `json:"add_tags"`
DelTags []metricRounterTagConfig `json:"delete_tags"`
IntervalStamp bool `json:"interval_timestamp"`
}
type metricRouter struct {
inputs []chan lp.CCMetric
outputs []chan lp.CCMetric
done chan bool
wg *sync.WaitGroup
timestamp time.Time
ticker mct.MultiChanTicker
config metricRouterConfig
inputs []chan lp.CCMetric
outputs []chan lp.CCMetric
done chan bool
wg *sync.WaitGroup
timestamp time.Time
ticker mct.MultiChanTicker
config metricRouterConfig
}
type MetricRouter interface {
@ -41,62 +41,61 @@ type MetricRouter interface {
Close()
}
func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error {
r.inputs = make([]chan lp.CCMetric, 0)
r.outputs = make([]chan lp.CCMetric, 0)
r.done = make(chan bool)
r.wg = wg
r.ticker = ticker
configFile, err := os.Open(routerConfigFile)
if err != nil {
log.Print(err.Error())
return err
}
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
r.inputs = make([]chan lp.CCMetric, 0)
r.outputs = make([]chan lp.CCMetric, 0)
r.done = make(chan bool)
r.wg = wg
r.ticker = ticker
configFile, err := os.Open(routerConfigFile)
if err != nil {
log.Print(err.Error())
return err
}
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
err = jsonParser.Decode(&r.config)
if err != nil {
log.Print(err.Error())
return err
}
return nil
log.Print(err.Error())
return err
}
return nil
}
func (r *metricRouter) StartTimer() {
m := make(chan time.Time)
r.ticker.AddChannel(m)
go func() {
for {
select {
case t := <- m:
r.timestamp = t
}
}
}()
m := make(chan time.Time)
r.ticker.AddChannel(m)
go func() {
for {
select {
case t := <-m:
r.timestamp = t
}
}
}()
}
func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error){
expression, err := govaluate.NewEvaluableExpression(Cond)
if err != nil {
func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) {
expression, err := govaluate.NewEvaluableExpression(Cond)
if err != nil {
log.Print(Cond, " = ", err.Error())
return false, err
}
params := make(map[string]interface{})
params["name"] = point.Name()
for _,t := range point.TagList() {
params[t.Key] = t.Value
for _, t := range point.TagList() {
params[t.Key] = t.Value
}
for _,m := range point.MetaList() {
params[m.Key] = m.Value
for _, m := range point.MetaList() {
params[m.Key] = m.Value
}
for _,f := range point.FieldList() {
params[f.Key] = f.Value
for _, f := range point.FieldList() {
params[f.Key] = f.Value
}
params["timestamp"] = point.Time()
result, err := expression.Evaluate(params)
if err != nil {
result, err := expression.Evaluate(params)
if err != nil {
log.Print(Cond, " = ", err.Error())
return false, err
}
@ -104,106 +103,106 @@ func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, erro
}
func (r *metricRouter) DoAddTags(point lp.CCMetric) {
for _, m := range r.config.AddTags {
var res bool
var err error
for _, m := range r.config.AddTags {
var res bool
var err error
if m.Condition == "*" {
res = true
err = nil
} else {
res, err = r.EvalCondition(m.Condition, point)
if err != nil {
log.Print(err.Error())
res = false
}
}
if res == true {
point.AddTag(m.Key, m.Value)
}
}
if m.Condition == "*" {
res = true
err = nil
} else {
res, err = r.EvalCondition(m.Condition, point)
if err != nil {
log.Print(err.Error())
res = false
}
}
if res == true {
point.AddTag(m.Key, m.Value)
}
}
}
func (r *metricRouter) DoDelTags(point lp.CCMetric) {
for _, m := range r.config.DelTags {
var res bool
var err error
if m.Condition == "*" {
res = true
err = nil
} else {
res, err = r.EvalCondition(m.Condition, point)
if err != nil {
log.Print(err.Error())
res = false
}
}
if res == true {
point.RemoveTag(m.Key)
}
}
for _, m := range r.config.DelTags {
var res bool
var err error
if m.Condition == "*" {
res = true
err = nil
} else {
res, err = r.EvalCondition(m.Condition, point)
if err != nil {
log.Print(err.Error())
res = false
}
}
if res == true {
point.RemoveTag(m.Key)
}
}
}
func (r *metricRouter) Start() {
r.wg.Add(1)
r.timestamp = time.Now()
if r.config.IntervalStamp == true {
r.StartTimer()
}
go func() {
for {
RouterLoop:
select {
case <- r.done:
log.Print("[MetricRouter] DONE\n")
r.wg.Done()
break RouterLoop
default:
for _, c := range r.inputs {
RouterInputLoop:
select {
case <- r.done:
log.Print("[MetricRouter] DONE\n")
r.wg.Done()
break RouterInputLoop
case p := <- c:
log.Print("[MetricRouter] FORWARD ",p)
r.DoAddTags(p)
r.DoDelTags(p)
if r.config.IntervalStamp == true {
p.SetTime(r.timestamp)
}
for _, o := range r.outputs {
o <- p
}
default:
}
}
}
}
log.Print("[MetricRouter] EXIT\n")
}()
log.Print("[MetricRouter] STARTED\n")
r.wg.Add(1)
r.timestamp = time.Now()
if r.config.IntervalStamp == true {
r.StartTimer()
}
go func() {
for {
RouterLoop:
select {
case <-r.done:
log.Print("[MetricRouter] DONE\n")
r.wg.Done()
break RouterLoop
default:
for _, c := range r.inputs {
RouterInputLoop:
select {
case <-r.done:
log.Print("[MetricRouter] DONE\n")
r.wg.Done()
break RouterInputLoop
case p := <-c:
log.Print("[MetricRouter] FORWARD ", p)
r.DoAddTags(p)
r.DoDelTags(p)
if r.config.IntervalStamp == true {
p.SetTime(r.timestamp)
}
for _, o := range r.outputs {
o <- p
}
default:
}
}
}
}
log.Print("[MetricRouter] EXIT\n")
}()
log.Print("[MetricRouter] STARTED\n")
}
func (r *metricRouter) AddInput(input chan lp.CCMetric) {
r.inputs = append(r.inputs, input)
r.inputs = append(r.inputs, input)
}
func (r *metricRouter) AddOutput(output chan lp.CCMetric) {
r.outputs = append(r.outputs, output)
r.outputs = append(r.outputs, output)
}
func (r *metricRouter) Close() {
r.done <- true
log.Print("[MetricRouter] CLOSE\n")
r.done <- true
log.Print("[MetricRouter] CLOSE\n")
}
func New(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) (MetricRouter, error) {
r := &metricRouter{}
err := r.Init(ticker, wg, routerConfigFile)
if err != nil {
return nil, err
}
return r, err
r := &metricRouter{}
err := r.Init(ticker, wg, routerConfigFile)
if err != nil {
return nil, err
}
return r, err
}

View File

@ -1,39 +1,39 @@
package multiChanTicker
import (
"time"
"time"
)
type multiChanTicker struct {
ticker *time.Ticker
channels []chan time.Time
ticker *time.Ticker
channels []chan time.Time
}
type MultiChanTicker interface {
Init(duration time.Duration)
AddChannel(chan time.Time)
Init(duration time.Duration)
AddChannel(chan time.Time)
}
func (t *multiChanTicker) Init(duration time.Duration) {
t.ticker = time.NewTicker(duration)
go func() {
for {
select {
case ts := <-t.ticker.C:
for _, c := range t.channels {
c <- ts
}
}
}
}()
t.ticker = time.NewTicker(duration)
go func() {
for {
select {
case ts := <-t.ticker.C:
for _, c := range t.channels {
c <- ts
}
}
}
}()
}
func (t *multiChanTicker) AddChannel(channel chan time.Time) {
t.channels = append(t.channels, channel)
t.channels = append(t.channels, channel)
}
func NewTicker(duration time.Duration) MultiChanTicker {
t := &multiChanTicker{}
t.Init(duration)
return t
t := &multiChanTicker{}
t.Init(duration)
return t
}

View File

@ -10,12 +10,12 @@ import (
"log"
"os"
"os/signal"
// "strings"
// "strings"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
"sync"
"time"
mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
)
// List of provided collectors. Which collector should be run can be
@ -48,17 +48,17 @@ import (
//}
type CentralConfigFile struct {
Interval int `json:"interval"`
Duration int `json:"duration"`
Pidfile string `json:"pidfile", omitempty`
CollectorConfigFile string `json:"collectors"`
RouterConfigFile string `json:"router"`
SinkConfigFile string `json:"sinks"`
ReceiverConfigFile string `json:"receivers", omitempty`
Interval int `json:"interval"`
Duration int `json:"duration"`
Pidfile string `json:"pidfile", omitempty`
CollectorConfigFile string `json:"collectors"`
RouterConfigFile string `json:"router"`
SinkConfigFile string `json:"sinks"`
ReceiverConfigFile string `json:"receivers", omitempty`
}
func LoadCentralConfiguration(file string, config *CentralConfigFile) error {
configFile, err := os.Open(file)
configFile, err := os.Open(file)
defer configFile.Close()
if err != nil {
fmt.Println(err.Error())
@ -70,29 +70,29 @@ func LoadCentralConfiguration(file string, config *CentralConfigFile) error {
}
type RuntimeConfig struct {
Hostname string
Interval time.Duration
Duration time.Duration
CliArgs map[string]string
ConfigFile CentralConfigFile
Hostname string
Interval time.Duration
Duration time.Duration
CliArgs map[string]string
ConfigFile CentralConfigFile
Router mr.MetricRouter
CollectManager collectors.CollectorManager
SinkManager sinks.SinkManager
ReceiveManager receivers.ReceiveManager
Ticker mct.MultiChanTicker
Router mr.MetricRouter
CollectManager collectors.CollectorManager
SinkManager sinks.SinkManager
ReceiveManager receivers.ReceiveManager
Ticker mct.MultiChanTicker
Channels []chan lp.CCMetric
Sync sync.WaitGroup
Channels []chan lp.CCMetric
Sync sync.WaitGroup
}
func prepare_runcfg() RuntimeConfig {
r := RuntimeConfig{}
r.Router = nil
r.CollectManager = nil
r.SinkManager = nil
r.ReceiveManager = nil
return r
r := RuntimeConfig{}
r.Router = nil
r.CollectManager = nil
r.SinkManager = nil
r.ReceiveManager = nil
return r
}
//// Structure of the configuration file
@ -177,26 +177,26 @@ func ReadCli() map[string]string {
func shutdown(config *RuntimeConfig) {
log.Print("Shutdown...")
if config.CollectManager != nil {
log.Print("Shutdown CollectManager...")
config.CollectManager.Close()
}
if config.ReceiveManager != nil {
log.Print("Shutdown ReceiveManager...")
config.ReceiveManager.Close()
}
if config.Router != nil {
log.Print("Shutdown Router...")
config.Router.Close()
}
log.Print("Shutdown CollectManager...")
config.CollectManager.Close()
}
if config.ReceiveManager != nil {
log.Print("Shutdown ReceiveManager...")
config.ReceiveManager.Close()
}
if config.Router != nil {
log.Print("Shutdown Router...")
config.Router.Close()
}
if config.SinkManager != nil {
log.Print("Shutdown SinkManager...")
config.SinkManager.Close()
}
log.Print("Shutdown SinkManager...")
config.SinkManager.Close()
}
// pidfile := config.ConfigFile.Pidfile
// RemovePidfile(pidfile)
// pidfile = config.CliArgs["pidfile"]
// RemovePidfile(pidfile)
// pidfile := config.ConfigFile.Pidfile
// RemovePidfile(pidfile)
// pidfile = config.CliArgs["pidfile"]
// RemovePidfile(pidfile)
config.Sync.Done()
}
@ -214,7 +214,7 @@ func prepare_shutdown(config *RuntimeConfig) {
}
func main() {
var err error
var err error
use_recv := false
rcfg := prepare_runcfg()
@ -231,194 +231,193 @@ func main() {
log.Print("Configuration value 'interval' must be greater than zero")
return
}
rcfg.Interval = time.Duration(rcfg.ConfigFile.Interval)*time.Second
rcfg.Interval = time.Duration(rcfg.ConfigFile.Interval) * time.Second
if rcfg.ConfigFile.Duration <= 0 || time.Duration(rcfg.ConfigFile.Duration)*time.Second <= 0 {
log.Print("Configuration value 'duration' must be greater than zero")
return
}
rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration)*time.Second
rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration) * time.Second
rcfg.Hostname, err = os.Hostname()
if err != nil {
log.Print(err.Error())
return
}
// err = CreatePidfile(rcfg.CliArgs["pidfile"])
// err = SetLogging(rcfg.CliArgs["logfile"])
// if err != nil {
// log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname)
// return
// }
// err = CreatePidfile(rcfg.CliArgs["pidfile"])
// err = SetLogging(rcfg.CliArgs["logfile"])
// if err != nil {
// log.Print("Error setting up logging system to ", rcfg.CliArgs["logfile"], " on ", rcfg.Hostname)
// return
// }
rcfg.Ticker = mct.NewTicker(rcfg.Interval)
if len(rcfg.ConfigFile.RouterConfigFile) > 0 {
rcfg.Router, err = mr.New(rcfg.Ticker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile)
if err != nil {
log.Print(err.Error())
return
}
}
if len(rcfg.ConfigFile.SinkConfigFile) > 0 {
rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile)
if err != nil {
log.Print(err.Error())
return
}
RouterToSinksChannel := make(chan lp.CCMetric)
rcfg.SinkManager.AddInput(RouterToSinksChannel)
rcfg.Router.AddOutput(RouterToSinksChannel)
}
if len(rcfg.ConfigFile.CollectorConfigFile) > 0 {
rcfg.CollectManager, err = collectors.New(rcfg.Ticker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile)
if err != nil {
log.Print(err.Error())
return
}
CollectToRouterChannel := make(chan lp.CCMetric)
rcfg.CollectManager.AddOutput(CollectToRouterChannel)
rcfg.Router.AddInput(CollectToRouterChannel)
}
if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 {
rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile)
if err != nil {
log.Print(err.Error())
return
}
ReceiveToRouterChannel := make(chan lp.CCMetric)
rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel)
rcfg.Router.AddInput(ReceiveToRouterChannel)
use_recv = true
}
prepare_shutdown(&rcfg)
if len(rcfg.ConfigFile.RouterConfigFile) > 0 {
rcfg.Router, err = mr.New(rcfg.Ticker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile)
if err != nil {
log.Print(err.Error())
return
}
}
if len(rcfg.ConfigFile.SinkConfigFile) > 0 {
rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile)
if err != nil {
log.Print(err.Error())
return
}
RouterToSinksChannel := make(chan lp.CCMetric)
rcfg.SinkManager.AddInput(RouterToSinksChannel)
rcfg.Router.AddOutput(RouterToSinksChannel)
}
if len(rcfg.ConfigFile.CollectorConfigFile) > 0 {
rcfg.CollectManager, err = collectors.New(rcfg.Ticker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile)
if err != nil {
log.Print(err.Error())
return
}
CollectToRouterChannel := make(chan lp.CCMetric)
rcfg.CollectManager.AddOutput(CollectToRouterChannel)
rcfg.Router.AddInput(CollectToRouterChannel)
}
if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 {
rcfg.ReceiveManager, err = receivers.New(&rcfg.Sync, rcfg.ConfigFile.ReceiverConfigFile)
if err != nil {
log.Print(err.Error())
return
}
ReceiveToRouterChannel := make(chan lp.CCMetric)
rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel)
rcfg.Router.AddInput(ReceiveToRouterChannel)
use_recv = true
}
prepare_shutdown(&rcfg)
rcfg.Sync.Add(1)
rcfg.Router.Start()
rcfg.SinkManager.Start()
rcfg.CollectManager.Start()
if use_recv {
rcfg.ReceiveManager.Start()
}
// if len(config.Collectors) == 0 {
// var keys []string
// for k := range Collectors {
// keys = append(keys, k)
// }
// log.Print("Configuration value 'collectors' does not contain any collector. Available: ", strings.Join(keys, ", "))
// return
// }
// for _, name := range config.Collectors {
// if _, found := Collectors[name]; !found {
// log.Print("Invalid collector '", name, "' in configuration")
// return
// }
// }
// if _, found := Sinks[config.Sink.Type]; !found {
// log.Print("Invalid sink type '", config.Sink.Type, "' in configuration")
// return
// }
// // Setup sink
// sink := Sinks[config.Sink.Type]
// err = sink.Init(config.Sink)
// if err != nil {
// log.Print(err)
// return
// }
// sinkChannel := make(chan bool)
// mproxy.Init(sinkChannel, &wg)
// // Setup receiver
// if len(config.Receiver.Type) > 0 && config.Receiver.Type != "none" {
// if _, found := Receivers[config.Receiver.Type]; !found {
// log.Print("Invalid receiver type '", config.Receiver.Type, "' in configuration")
// return
// } else {
// recv = Receivers[config.Receiver.Type]
// err = recv.Init(config.Receiver, sink)
// if err == nil {
// use_recv = true
// } else {
// log.Print(err)
// }
// }
// }
rcfg.ReceiveManager.Start()
}
// if len(config.Collectors) == 0 {
// var keys []string
// for k := range Collectors {
// keys = append(keys, k)
// }
// log.Print("Configuration value 'collectors' does not contain any collector. Available: ", strings.Join(keys, ", "))
// return
// }
// for _, name := range config.Collectors {
// if _, found := Collectors[name]; !found {
// log.Print("Invalid collector '", name, "' in configuration")
// return
// }
// }
// if _, found := Sinks[config.Sink.Type]; !found {
// log.Print("Invalid sink type '", config.Sink.Type, "' in configuration")
// return
// }
// // Setup sink
// sink := Sinks[config.Sink.Type]
// err = sink.Init(config.Sink)
// if err != nil {
// log.Print(err)
// return
// }
// sinkChannel := make(chan bool)
// mproxy.Init(sinkChannel, &wg)
// // Setup receiver
// if len(config.Receiver.Type) > 0 && config.Receiver.Type != "none" {
// if _, found := Receivers[config.Receiver.Type]; !found {
// log.Print("Invalid receiver type '", config.Receiver.Type, "' in configuration")
// return
// } else {
// recv = Receivers[config.Receiver.Type]
// err = recv.Init(config.Receiver, sink)
// if err == nil {
// use_recv = true
// } else {
// log.Print(err)
// }
// }
// }
// // Register interrupt handler
// prepare_shutdown(&wg, &config, sink, recv, clicfg["pidfile"])
// // Register interrupt handler
// prepare_shutdown(&wg, &config, sink, recv, clicfg["pidfile"])
// // Initialize all collectors
// tmp := make([]string, 0)
// for _, c := range config.Collectors {
// col := Collectors[c]
// conf, found := config.CollectConfigs[c]
// if !found {
// conf = json.RawMessage("")
// }
// err = col.Init([]byte(conf))
// if err != nil {
// log.Print("SKIP ", col.Name(), " (", err.Error(), ")")
// } else if !col.Initialized() {
// log.Print("SKIP ", col.Name(), " (Not initialized)")
// } else {
// log.Print("Start ", col.Name())
// tmp = append(tmp, c)
// }
// }
// config.Collectors = tmp
// config.DefTags["hostname"] = host
// // Initialize all collectors
// tmp := make([]string, 0)
// for _, c := range config.Collectors {
// col := Collectors[c]
// conf, found := config.CollectConfigs[c]
// if !found {
// conf = json.RawMessage("")
// }
// err = col.Init([]byte(conf))
// if err != nil {
// log.Print("SKIP ", col.Name(), " (", err.Error(), ")")
// } else if !col.Initialized() {
// log.Print("SKIP ", col.Name(), " (Not initialized)")
// } else {
// log.Print("Start ", col.Name())
// tmp = append(tmp, c)
// }
// }
// config.Collectors = tmp
// config.DefTags["hostname"] = host
// // Setup up ticker loop
// if clicfg["once"] != "true" {
// log.Print("Running loop every ", time.Duration(config.Interval)*time.Second)
// } else {
// log.Print("Running loop only once")
// }
// ticker := time.NewTicker(time.Duration(config.Interval) * time.Second)
// done := make(chan bool)
// // Setup up ticker loop
// if clicfg["once"] != "true" {
// log.Print("Running loop every ", time.Duration(config.Interval)*time.Second)
// } else {
// log.Print("Running loop only once")
// }
// ticker := time.NewTicker(time.Duration(config.Interval) * time.Second)
// done := make(chan bool)
// // Storage for all node metrics
// tmpPoints := make([]lp.MutableMetric, 0)
// // Storage for all node metrics
// tmpPoints := make([]lp.MutableMetric, 0)
// // Start receiver
// if use_recv {
// recv.Start()
// }
// // Start receiver
// if use_recv {
// recv.Start()
// }
// go func() {
// for {
// select {
// case <-done:
// return
// case t := <-ticker.C:
// go func() {
// for {
// select {
// case <-done:
// return
// case t := <-ticker.C:
// // Read all collectors are sort the results in the right
// // storage locations
// for _, c := range config.Collectors {
// col := Collectors[c]
// col.Read(time.Duration(config.Duration), &tmpPoints)
// // Read all collectors are sort the results in the right
// // storage locations
// for _, c := range config.Collectors {
// col := Collectors[c]
// col.Read(time.Duration(config.Duration), &tmpPoints)
// for {
// if len(tmpPoints) == 0 {
// break
// }
// p := tmpPoints[0]
// for k, v := range config.DefTags {
// p.AddTag(k, v)
// p.SetTime(t)
// }
// sink.Write(p)
// tmpPoints = tmpPoints[1:]
// }
// }
// for {
// if len(tmpPoints) == 0 {
// break
// }
// p := tmpPoints[0]
// for k, v := range config.DefTags {
// p.AddTag(k, v)
// p.SetTime(t)
// }
// sink.Write(p)
// tmpPoints = tmpPoints[1:]
// }
// }
// if err := sink.Flush(); err != nil {
// log.Printf("sink error: %s\n", err)
// }
// if clicfg["once"] == "true" {
// shutdown(&wg, config.Collectors, sink, recv, clicfg["pidfile"])
// return
// }
// }
// }
// }()
// if err := sink.Flush(); err != nil {
// log.Printf("sink error: %s\n", err)
// }
// if clicfg["once"] == "true" {
// shutdown(&wg, config.Collectors, sink, recv, clicfg["pidfile"])
// return
// }
// }
// }
// }()
// Wait until receiving an interrupt
rcfg.Sync.Wait()

View File

@ -2,17 +2,16 @@ package receivers
import (
// "time"
influx "github.com/influxdata/line-protocol"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influx "github.com/influxdata/line-protocol"
)
type ReceiverConfig struct {
Addr string `json:"address"`
Port string `json:"port"`
Database string `json:"database"`
Addr string `json:"address"`
Port string `json:"port"`
Database string `json:"database"`
Organization string `json:"organization", omitempty`
Type string `json:"type"`
Type string `json:"type"`
}
type receiver struct {
@ -37,7 +36,7 @@ func (r *receiver) Name() string {
}
func (r *receiver) SetSink(sink chan lp.CCMetric) {
r.sink = sink
r.sink = sink
}
func Tags2Map(metric influx.Metric) map[string]string {

View File

@ -2,16 +2,16 @@ package receivers
import (
"errors"
influx "github.com/influxdata/line-protocol"
"fmt"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influx "github.com/influxdata/line-protocol"
nats "github.com/nats-io/nats.go"
"log"
"time"
"fmt"
)
type NatsReceiverConfig struct {
Addr string `json:"address"`
Addr string `json:"address"`
Port string `json:"port"`
Database string `json:"database"`
}
@ -37,7 +37,7 @@ func (r *NatsReceiver) Init(config ReceiverConfig) error {
len(r.config.Database) == 0 {
return errors.New("Not all configuration variables set required by NatsReceiver")
}
r.meta = map[string]string{"source" : r.name}
r.meta = map[string]string{"source": r.name}
r.addr = r.config.Addr
if len(r.addr) == 0 {
r.addr = nats.DefaultURL
@ -71,10 +71,10 @@ func (r *NatsReceiver) _NatsReceive(m *nats.Msg) {
metrics, err := r.parser.Parse(m.Data)
if err == nil {
for _, m := range metrics {
y := lp.FromInfluxMetric(m)
for k, v := range r.meta {
y.AddMeta(k, v)
}
y := lp.FromInfluxMetric(m)
for k, v := range r.meta {
y.AddMeta(k, v)
}
//y, err := lp.New(m.Name(), Tags2Map(m), r.meta, Fields2Map(m), m.Time())
if r.sink != nil {
r.sink <- y
@ -89,4 +89,3 @@ func (r *NatsReceiver) Close() {
r.nc.Close()
}
}

View File

@ -1,24 +1,23 @@
package receivers
import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"sync"
"log"
"os"
"encoding/json"
"encoding/json"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"log"
"os"
"sync"
)
var AvailableReceivers = map[string]Receiver{
"nats": &NatsReceiver{},
}
type receiveManager struct {
inputs []Receiver
output chan lp.CCMetric
done chan bool
wg *sync.WaitGroup
config []ReceiverConfig
inputs []Receiver
output chan lp.CCMetric
done chan bool
wg *sync.WaitGroup
config []ReceiverConfig
}
type ReceiveManager interface {
@ -29,128 +28,126 @@ type ReceiveManager interface {
Close()
}
func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) error {
rm.inputs = make([]Receiver, 0)
rm.output = nil
rm.done = make(chan bool)
rm.wg = wg
rm.config = make([]ReceiverConfig, 0)
configFile, err := os.Open(receiverConfigFile)
if err != nil {
log.Print(err.Error())
return err
}
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
var rawConfigs []json.RawMessage
rm.inputs = make([]Receiver, 0)
rm.output = nil
rm.done = make(chan bool)
rm.wg = wg
rm.config = make([]ReceiverConfig, 0)
configFile, err := os.Open(receiverConfigFile)
if err != nil {
log.Print(err.Error())
return err
}
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
var rawConfigs []json.RawMessage
err = jsonParser.Decode(&rawConfigs)
if err != nil {
log.Print(err.Error())
return err
}
for _, raw := range rawConfigs {
log.Print("[ReceiveManager] ", string(raw))
rm.AddInput(raw)
// if _, found := AvailableReceivers[k.Type]; !found {
// log.Print("[ReceiveManager] SKIP Config specifies unknown receiver 'type': ", k.Type)
// continue
// }
// r := AvailableReceivers[k.Type]
// err = r.Init(k)
// if err != nil {
// log.Print("[ReceiveManager] SKIP Receiver ", k.Type, " cannot be initialized: ", err.Error())
// continue
// }
// rm.inputs = append(rm.inputs, r)
}
return nil
log.Print(err.Error())
return err
}
for _, raw := range rawConfigs {
log.Print("[ReceiveManager] ", string(raw))
rm.AddInput(raw)
// if _, found := AvailableReceivers[k.Type]; !found {
// log.Print("[ReceiveManager] SKIP Config specifies unknown receiver 'type': ", k.Type)
// continue
// }
// r := AvailableReceivers[k.Type]
// err = r.Init(k)
// if err != nil {
// log.Print("[ReceiveManager] SKIP Receiver ", k.Type, " cannot be initialized: ", err.Error())
// continue
// }
// rm.inputs = append(rm.inputs, r)
}
return nil
}
func (rm *receiveManager) Start() {
rm.wg.Add(1)
rm.wg.Add(1)
for _, r := range rm.inputs {
log.Print("[ReceiveManager] START ", r.Name())
r.Start()
}
log.Print("[ReceiveManager] STARTED\n")
// go func() {
// for {
//ReceiveManagerLoop:
// select {
// case <- rm.done:
// log.Print("ReceiveManager done\n")
// rm.wg.Done()
// break ReceiveManagerLoop
// default:
// for _, c := range rm.inputs {
//ReceiveManagerInputLoop:
// select {
// case <- rm.done:
// log.Print("ReceiveManager done\n")
// rm.wg.Done()
// break ReceiveManagerInputLoop
// case p := <- c:
// log.Print("ReceiveManager: ", p)
// rm.output <- p
// default:
// }
// }
// }
// }
// }()
// for _, r := range rm.inputs {
// r.Close()
// }
for _, r := range rm.inputs {
log.Print("[ReceiveManager] START ", r.Name())
r.Start()
}
log.Print("[ReceiveManager] STARTED\n")
// go func() {
// for {
//ReceiveManagerLoop:
// select {
// case <- rm.done:
// log.Print("ReceiveManager done\n")
// rm.wg.Done()
// break ReceiveManagerLoop
// default:
// for _, c := range rm.inputs {
//ReceiveManagerInputLoop:
// select {
// case <- rm.done:
// log.Print("ReceiveManager done\n")
// rm.wg.Done()
// break ReceiveManagerInputLoop
// case p := <- c:
// log.Print("ReceiveManager: ", p)
// rm.output <- p
// default:
// }
// }
// }
// }
// }()
// for _, r := range rm.inputs {
// r.Close()
// }
}
func (rm *receiveManager) AddInput(rawConfig json.RawMessage) error {
var config ReceiverConfig
err := json.Unmarshal(rawConfig, &config)
var config ReceiverConfig
err := json.Unmarshal(rawConfig, &config)
if err != nil {
log.Print("[ReceiveManager] SKIP ", config.Type, " JSON config error: ", err.Error())
log.Print(err.Error())
return err
log.Print("[ReceiveManager] SKIP ", config.Type, " JSON config error: ", err.Error())
log.Print(err.Error())
return err
}
if _, found := AvailableReceivers[config.Type]; !found {
log.Print("[ReceiveManager] SKIP ", config.Type, " unknown receiver: ", err.Error())
return err
}
r := AvailableReceivers[config.Type]
err = r.Init(config)
if err != nil {
log.Print("[ReceiveManager] SKIP ", r.Name(), " initialization failed: ", err.Error())
return err
if _, found := AvailableReceivers[config.Type]; !found {
log.Print("[ReceiveManager] SKIP ", config.Type, " unknown receiver: ", err.Error())
return err
}
rm.inputs = append(rm.inputs, r)
rm.config = append(rm.config, config)
return nil
r := AvailableReceivers[config.Type]
err = r.Init(config)
if err != nil {
log.Print("[ReceiveManager] SKIP ", r.Name(), " initialization failed: ", err.Error())
return err
}
rm.inputs = append(rm.inputs, r)
rm.config = append(rm.config, config)
return nil
}
func (rm *receiveManager) AddOutput(output chan lp.CCMetric) {
rm.output = output
for _, r := range rm.inputs {
r.SetSink(rm.output)
}
rm.output = output
for _, r := range rm.inputs {
r.SetSink(rm.output)
}
}
func (rm *receiveManager) Close() {
for _, r := range rm.inputs {
log.Print("[ReceiveManager] CLOSE ", r.Name())
r.Close()
}
rm.wg.Done()
log.Print("[ReceiveManager] CLOSE\n")
log.Print("[ReceiveManager] EXIT\n")
for _, r := range rm.inputs {
log.Print("[ReceiveManager] CLOSE ", r.Name())
r.Close()
}
rm.wg.Done()
log.Print("[ReceiveManager] CLOSE\n")
log.Print("[ReceiveManager] EXIT\n")
}
func New(wg *sync.WaitGroup, receiverConfigFile string) (ReceiveManager, error) {
r := &receiveManager{}
err := r.Init(wg, receiverConfigFile)
if err != nil {
return nil, err
}
return r, err
r := &receiveManager{}
err := r.Init(wg, receiverConfigFile)
if err != nil {
return nil, err
}
return r, err
}

View File

@ -7,8 +7,8 @@ import (
"net/http"
"time"
influx "github.com/influxdata/line-protocol"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influx "github.com/influxdata/line-protocol"
)
type HttpSink struct {
@ -20,7 +20,7 @@ type HttpSink struct {
}
func (s *HttpSink) Init(config sinkConfig) error {
s.name = "HttpSink"
s.name = "HttpSink"
if len(config.Host) == 0 || len(config.Port) == 0 || len(config.Database) == 0 {
return errors.New("`host`, `port` and `database` config options required for TCP sink")
}

View File

@ -39,7 +39,7 @@ func (s *InfluxSink) connect() error {
}
func (s *InfluxSink) Init(config sinkConfig) error {
s.name = "InfluxSink"
s.name = "InfluxSink"
if len(config.Host) == 0 ||
len(config.Port) == 0 ||
len(config.Database) == 0 ||
@ -65,9 +65,9 @@ func (s *InfluxSink) Write(point lp.CCMetric) error {
tags[t.Key] = t.Value
}
if s.meta_as_tags {
for _, m := range point.MetaList() {
tags[m.Key] = m.Value
}
for _, m := range point.MetaList() {
tags[m.Key] = m.Value
}
}
for _, f := range point.FieldList() {
fields[f.Key] = f.Value

View File

@ -6,7 +6,7 @@ import (
)
type sinkConfig struct {
Type string `json:"type"`
Type string `json:"type"`
Host string `json:"host", omitempty`
Port string `json:"port", omitempty`
Database string `json:"database, omitempty"`

View File

@ -4,8 +4,8 @@ import (
"bytes"
"errors"
"fmt"
influx "github.com/influxdata/line-protocol"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influx "github.com/influxdata/line-protocol"
nats "github.com/nats-io/nats.go"
"log"
"time"
@ -33,7 +33,7 @@ func (s *NatsSink) connect() error {
}
func (s *NatsSink) Init(config sinkConfig) error {
s.name = "NatsSink"
s.name = "NatsSink"
if len(config.Host) == 0 ||
len(config.Port) == 0 ||
len(config.Database) == 0 {

View File

@ -1,17 +1,16 @@
package sinks
import (
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"sync"
"log"
"os"
"encoding/json"
"encoding/json"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
"log"
"os"
"sync"
)
type SinkEntity struct {
config json.RawMessage
output Sink
config json.RawMessage
output Sink
}
var AvailableSinks = map[string]Sink{
@ -21,13 +20,12 @@ var AvailableSinks = map[string]Sink{
"http": &HttpSink{},
}
type sinkManager struct {
input chan lp.CCMetric
outputs []Sink
done chan bool
wg *sync.WaitGroup
config []sinkConfig
input chan lp.CCMetric
outputs []Sink
done chan bool
wg *sync.WaitGroup
config []sinkConfig
}
type SinkManager interface {
@ -38,114 +36,111 @@ type SinkManager interface {
Close()
}
func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
sm.input = nil
sm.outputs = make([]Sink, 0)
sm.done = make(chan bool)
sm.wg = wg
sm.config = make([]sinkConfig, 0)
if len(sinkConfigFile) > 0 {
configFile, err := os.Open(sinkConfigFile)
if err != nil {
log.Print("[SinkManager] ", err.Error())
return err
}
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
var rawConfigs []json.RawMessage
err = jsonParser.Decode(&rawConfigs)
if err != nil {
log.Print("[SinkManager] ", err.Error())
return err
}
for _, raw := range rawConfigs {
err = sm.AddOutput(raw)
if err != nil {
continue
}
}
}
return nil
sm.input = nil
sm.outputs = make([]Sink, 0)
sm.done = make(chan bool)
sm.wg = wg
sm.config = make([]sinkConfig, 0)
if len(sinkConfigFile) > 0 {
configFile, err := os.Open(sinkConfigFile)
if err != nil {
log.Print("[SinkManager] ", err.Error())
return err
}
defer configFile.Close()
jsonParser := json.NewDecoder(configFile)
var rawConfigs []json.RawMessage
err = jsonParser.Decode(&rawConfigs)
if err != nil {
log.Print("[SinkManager] ", err.Error())
return err
}
for _, raw := range rawConfigs {
err = sm.AddOutput(raw)
if err != nil {
continue
}
}
}
return nil
}
func (sm *sinkManager) Start() {
sm.wg.Add(1)
batchcount := 20
go func() {
for {
SinkManagerLoop:
select {
case <- sm.done:
for _, s := range sm.outputs {
s.Close()
}
log.Print("[SinkManager] DONE\n")
sm.wg.Done()
break SinkManagerLoop
case p := <- sm.input:
log.Print("[SinkManager] WRITE ", p)
for _, s := range sm.outputs {
s.Write(p)
}
if (batchcount == 0) {
log.Print("[SinkManager] FLUSH")
for _, s := range sm.outputs {
s.Flush()
}
batchcount = 20
}
batchcount--
default:
}
}
log.Print("[SinkManager] EXIT\n")
}()
log.Print("[SinkManager] STARTED\n")
sm.wg.Add(1)
batchcount := 20
go func() {
for {
SinkManagerLoop:
select {
case <-sm.done:
for _, s := range sm.outputs {
s.Close()
}
log.Print("[SinkManager] DONE\n")
sm.wg.Done()
break SinkManagerLoop
case p := <-sm.input:
log.Print("[SinkManager] WRITE ", p)
for _, s := range sm.outputs {
s.Write(p)
}
if batchcount == 0 {
log.Print("[SinkManager] FLUSH")
for _, s := range sm.outputs {
s.Flush()
}
batchcount = 20
}
batchcount--
default:
}
}
log.Print("[SinkManager] EXIT\n")
}()
log.Print("[SinkManager] STARTED\n")
}
func (sm *sinkManager) AddInput(input chan lp.CCMetric) {
sm.input = input
sm.input = input
}
func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error {
var err error
var config sinkConfig
if len(rawConfig) > 3 {
err = json.Unmarshal(rawConfig, &config)
if err != nil {
log.Print("[SinkManager] SKIP ", config.Type, " JSON config error: ", err.Error())
return err
}
}
if _, found := AvailableSinks[config.Type]; !found {
log.Print("[SinkManager] SKIP ", config.Type, " unknown sink: ", err.Error())
return err
}
s := AvailableSinks[config.Type]
err = s.Init(config)
if err != nil {
log.Print("[SinkManager] SKIP ", s.Name(), " initialization failed: ", err.Error())
return err
}
sm.outputs = append(sm.outputs, s)
sm.config = append(sm.config, config)
return nil
var err error
var config sinkConfig
if len(rawConfig) > 3 {
err = json.Unmarshal(rawConfig, &config)
if err != nil {
log.Print("[SinkManager] SKIP ", config.Type, " JSON config error: ", err.Error())
return err
}
}
if _, found := AvailableSinks[config.Type]; !found {
log.Print("[SinkManager] SKIP ", config.Type, " unknown sink: ", err.Error())
return err
}
s := AvailableSinks[config.Type]
err = s.Init(config)
if err != nil {
log.Print("[SinkManager] SKIP ", s.Name(), " initialization failed: ", err.Error())
return err
}
sm.outputs = append(sm.outputs, s)
sm.config = append(sm.config, config)
return nil
}
func (sm *sinkManager) Close() {
sm.done <- true
log.Print("[SinkManager] CLOSE")
return
sm.done <- true
log.Print("[SinkManager] CLOSE")
return
}
func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {
sm := &sinkManager{}
err := sm.Init(wg, sinkConfigFile)
if err != nil {
return nil, err
}
return sm, err
sm := &sinkManager{}
err := sm.Init(wg, sinkConfigFile)
if err != nil {
return nil, err
}
return sm, err
}

View File

@ -14,8 +14,8 @@ type StdoutSink struct {
}
func (s *StdoutSink) Init(config sinkConfig) error {
s.name = "StdoutSink"
s.meta_as_tags = config.MetaAsTags
s.name = "StdoutSink"
s.meta_as_tags = config.MetaAsTags
return nil
}
@ -26,9 +26,9 @@ func (s *StdoutSink) Write(point lp.CCMetric) error {
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", t.Key, t.Value))
}
if s.meta_as_tags {
for _, m := range point.MetaList() {
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", m.Key, m.Value))
}
for _, m := range point.MetaList() {
tagsstr = append(tagsstr, fmt.Sprintf("%s=%s", m.Key, m.Value))
}
}
for _, f := range point.FieldList() {
switch f.Value.(type) {