Mirror of https://github.com/ClusterCockpit/cc-metric-collector.git (synced 2025-04-19 03:05:55 +02:00)

Commit 863c291d9f: Merge branch 'develop' into gangliaSink
.github/ci-collectors.json (vendored, new file, 6 lines)
@@ -0,0 +1,6 @@
{
    "tempstat": {},
    "diskstat": {},
    "memstat": {},
    "cpustat": {}
}
.github/ci-config.json (vendored, 56 changed lines)
@@ -1,52 +1,8 @@
 {
-    "sink": {
-        "user": "testuser",
-        "password": "testpass",
-        "host": "127.0.0.1",
-        "port": "9090",
-        "database": "testdb",
-        "organization": "testorg",
-        "type": "stdout"
-    },
-    "interval": 3,
-    "duration": 1,
-    "collectors": [
-        "tempstat",
-        "loadavg",
-        "memstat",
-        "netstat",
-        "ibstat",
-        "lustrestat",
-        "cpustat",
-        "topprocs",
-        "nvidia",
-        "diskstat",
-        "ipmistat",
-        "gpfs",
-        "cpufreq",
-        "cpufreq_cpuinfo"
-    ],
-    "default_tags": {
-        "cluster": "testcluster"
-    },
-    "receiver": {
-        "type": "none"
-    },
-    "collect_config": {
-        "topprocs": {
-            "num_procs": 2
-        },
-        "tempstat": {
-            "tag_override": {
-                "hwmon0": {
-                    "type": "socket",
-                    "type-id": "0"
-                },
-                "hwmon1": {
-                    "type": "socket",
-                    "type-id": "1"
-                }
-            }
-        }
-    }
+    "sinks": ".github/ci-sinks.json",
+    "collectors" : ".github/ci-collectors.json",
+    "receivers" : ".github/ci-receivers.json",
+    "router" : ".github/ci-router.json",
+    "interval": 5,
+    "duration": 1
 }
.github/ci-receivers.json (vendored, new file, 1 line)
@@ -0,0 +1 @@
[]
.github/ci-router.json (vendored, new file, 37 lines)
@@ -0,0 +1,37 @@
{
    "add_tags": [
        {
            "key": "cluster",
            "value": "testcluster",
            "if": "*"
        },
        {
            "key": "test",
            "value": "testing",
            "if": "name == 'temp_package_id_0'"
        }
    ],
    "delete_tags": [
        {
            "key": "unit",
            "value": "*",
            "if": "*"
        }
    ],
    "interval_aggregates": [
        {
            "name": "temp_cores_avg",
            "function": "avg(values)",
            "if": "match('temp_core_%d+', metric.Name())",
            "tags": {
                "type": "node"
            },
            "meta": {
                "group": "<copy>",
                "unit": "<copy>",
                "source": "MetricAggregator"
            }
        }
    ],
    "interval_timestamp": true
}
.github/ci-sinks.json (vendored, new file, 6 lines)
@@ -0,0 +1,6 @@
[
    {
        "type" : "stdout",
        "meta_as_tags" : true
    }
]
.github/workflows/rpmbuild.yml (vendored, 40 changed lines)
@@ -2,7 +2,7 @@ name: Run RPM Build
 on: push
 
 jobs:
-  build:
+  build-centos8:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
@@ -21,3 +21,41 @@ jobs:
        with:
          name: cc-metric-collector SRPM CentOS8
          path: ${{ steps.rpm.outputs.source_rpm_path }}
+  build-centos-latest:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - uses: TomTheBear/rpmbuild@centos_latest
+        id: rpm
+        name: Build RPM package on CentOS 'Latest'
+        with:
+          spec_file: "./scripts/cc-metric-collector.spec"
+      - name: Save RPM as artifact
+        uses: actions/upload-artifact@v1.0.0
+        with:
+          name: cc-metric-collector RPM CentOS 'Latest'
+          path: ${{ steps.rpm.outputs.rpm_dir_path }}
+      - name: Save SRPM as artifact
+        uses: actions/upload-artifact@v1.0.0
+        with:
+          name: cc-metric-collector SRPM CentOS 'Latest'
+          path: ${{ steps.rpm.outputs.source_rpm_path }}
+  build-alma-8_5:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - uses: TomTheBear/rpmbuild@alma8.5
+        id: rpm
+        name: Build RPM package on AlmaLinux 8.5
+        with:
+          spec_file: "./scripts/cc-metric-collector.spec"
+      - name: Save RPM as artifact
+        uses: actions/upload-artifact@v1.0.0
+        with:
+          name: cc-metric-collector RPM AlmaLinux 8.5
+          path: ${{ steps.rpm.outputs.rpm_dir_path }}
+      - name: Save SRPM as artifact
+        uses: actions/upload-artifact@v1.0.0
+        with:
+          name: cc-metric-collector SRPM AlmaLinux 8.5
+          path: ${{ steps.rpm.outputs.source_rpm_path }}
collectors/collectorManager.go
@@ -34,17 +34,18 @@ var AvailableCollectors = map[string]MetricCollector{
 	"nfsstat":  new(NfsCollector),
 }
 
 // Metric collector manager data structure
 type collectorManager struct {
-	collectors []MetricCollector
-	output     chan lp.CCMetric // List of all output channels
+	collectors []MetricCollector          // List of metric collectors to use
+	output     chan lp.CCMetric           // Output channels
 	done       chan bool                  // channel to finish / stop metric collector manager
-	ticker     mct.MultiChanTicker
-	duration   time.Duration
-	wg         *sync.WaitGroup
-	config     map[string]json.RawMessage
+	ticker     mct.MultiChanTicker        // periodically ticking once each interval
+	duration   time.Duration              // duration (for metrics that measure over a given duration)
+	wg         *sync.WaitGroup            // wait group for all goroutines in cc-metric-collector
+	config     map[string]json.RawMessage // json encoded config for collector manager
 }
 
-// Metric collector access functions
+// Metric collector manager access functions
 type CollectorManager interface {
 	Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error
 	AddOutput(output chan lp.CCMetric)
@@ -53,9 +54,9 @@ type CollectorManager interface {
 }
 
 // Init initializes a new metric collector manager by setting up:
-// * output channels
+// * output channel
 // * done channel
-// * wait group synchronization (from variable wg)
+// * wait group synchronization for goroutines (from variable wg)
 // * ticker (from variable ticker)
 // * configuration (read from config file in variable collectConfigFile)
 // Initialization is done for all configured collectors
@@ -82,20 +83,20 @@ func (cm *collectorManager) Init(ticker mct.MultiChanTicker, duration time.Durat
 	}
 
 	// Initialize configured collectors
-	for k, cfg := range cm.config {
-		if _, found := AvailableCollectors[k]; !found {
-			cclog.ComponentError("CollectorManager", "SKIP unknown collector", k)
+	for collectorName, collectorCfg := range cm.config {
+		if _, found := AvailableCollectors[collectorName]; !found {
+			cclog.ComponentError("CollectorManager", "SKIP unknown collector", collectorName)
 			continue
 		}
-		c := AvailableCollectors[k]
+		collector := AvailableCollectors[collectorName]
 
-		err = c.Init(cfg)
+		err = collector.Init(collectorCfg)
 		if err != nil {
-			cclog.ComponentError("CollectorManager", "Collector", k, "initialization failed:", err.Error())
+			cclog.ComponentError("CollectorManager", "Collector", collectorName, "initialization failed:", err.Error())
 			continue
 		}
-		cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", c.Name())
-		cm.collectors = append(cm.collectors, c)
+		cclog.ComponentDebug("CollectorManager", "ADD COLLECTOR", collector.Name())
+		cm.collectors = append(cm.collectors, collector)
 	}
 	return nil
 }
@@ -114,6 +115,7 @@ func (cm *collectorManager) Start() {
 		for _, c := range cm.collectors {
 			c.Close()
 		}
+		close(cm.done)
 		cclog.ComponentDebug("CollectorManager", "DONE")
 	}
 
@@ -153,11 +155,13 @@ func (cm *collectorManager) AddOutput(output chan lp.CCMetric) {
 func (cm *collectorManager) Close() {
 	cclog.ComponentDebug("CollectorManager", "CLOSE")
 	cm.done <- true
+	// wait for close of channel cm.done
+	<-cm.done
 }
 
 // New creates a new initialized metric collector manager
 func New(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) (CollectorManager, error) {
-	cm := &collectorManager{}
+	cm := new(collectorManager)
 	err := cm.Init(ticker, duration, wg, collectConfigFile)
 	if err != nil {
 		return nil, err
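A pattern worth noting, since this merge applies it to every manager: Close() sends on the done channel, and the worker goroutine answers by closing that same channel, which unblocks the final receive in Close(). A minimal standalone sketch of the handshake (illustrative only, not code from the commit):

package main

// Sketch of the shutdown handshake used by the managers above: Close()
// signals the worker, the worker closes the channel as its last action,
// and the receive on the closed channel releases the caller.
func main() {
	done := make(chan bool)
	go func() {
		<-done      // wait for the stop request
		// ... per-manager cleanup would run here ...
		close(done) // report completion back to the caller
	}()
	done <- true // request shutdown
	<-done       // returns once the worker has closed the channel
}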
collectors/gpfsMetric.go
@@ -130,14 +130,21 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) {
 			continue
 		}
 
-		timestampInt, err := strconv.ParseInt(key_value["_t_"]+key_value["_tu_"], 10, 64)
-		timestamp := time.UnixMicro(timestampInt)
+		sec, err := strconv.ParseInt(key_value["_t_"], 10, 64)
 		if err != nil {
 			fmt.Fprintf(os.Stderr,
-				"GpfsCollector.Read(): Failed to convert time stamp '%s': %s\n",
-				key_value["_t_"]+key_value["_tu_"], err.Error())
+				"GpfsCollector.Read(): Failed to convert seconds to int '%s': %v\n",
+				key_value["_t_"], err)
 			continue
 		}
+		msec, err := strconv.ParseInt(key_value["_tu_"], 10, 64)
+		if err != nil {
+			fmt.Fprintf(os.Stderr,
+				"GpfsCollector.Read(): Failed to convert micro seconds to int '%s': %v\n",
+				key_value["_tu_"], err)
+			continue
+		}
+		timestamp := time.Unix(sec, msec*1000)
 
 		// bytes read
 		bytesRead, err := strconv.ParseInt(key_value["_br_"], 10, 64)
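The rewritten timestamp code parses the GPFS mmpmon seconds field (`_t_`) and microseconds field (`_tu_`) separately; concatenating the two strings is only equivalent when the microsecond part is zero-padded to six digits, which presumably is not guaranteed. A standalone sketch of the composition, with made-up values:

// time.Unix takes (seconds, nanoseconds), so the parsed microseconds
// are scaled by 1000; the input values here are illustrative only.
sec := int64(1643214000)
usec := int64(123456)
ts := time.Unix(sec, usec*1000)
fmt.Println(ts.UnixNano()) // 1643214000123456000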
go.mod (9 changed lines)
@@ -1,6 +1,6 @@
 module github.com/ClusterCockpit/cc-metric-collector
 
-go 1.17
+go 1.16
 
 require (
 	github.com/NVIDIA/go-nvml v0.11.1-0
@@ -12,14 +12,7 @@ require (
 )
 
 require (
-	github.com/deepmap/oapi-codegen v1.8.2 // indirect
-	github.com/golang/protobuf v1.5.2 // indirect
 	github.com/nats-io/nats-server/v2 v2.7.0 // indirect
 	github.com/nats-io/nkeys v0.3.0 // indirect
 	github.com/nats-io/nuid v1.0.1 // indirect
-	github.com/pkg/errors v0.9.1 // indirect
-	golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
-	golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
-	google.golang.org/protobuf v1.27.1 // indirect
-	gopkg.in/yaml.v2 v2.3.0 // indirect
 )
internal/ccLogger/cclogger.go
@@ -38,7 +38,7 @@ func initLogger() {
 
 func Print(e ...interface{}) {
 	initLogger()
-	defaultLog.Print(e)
+	defaultLog.Print(e...)
 }
 
 func ComponentPrint(component string, e ...interface{}) {
@@ -48,7 +48,7 @@ func ComponentPrint(component string, e ...interface{}) {
 
 func Info(e ...interface{}) {
 	initLogger()
-	infoLog.Print(e)
+	infoLog.Print(e...)
 }
 
 func ComponentInfo(component string, e ...interface{}) {
@@ -58,14 +58,14 @@ func ComponentInfo(component string, e ...interface{}) {
 
 func Debug(e ...interface{}) {
 	initLogger()
-	if globalDebug == true {
-		debugLog.Print(e)
+	if globalDebug {
+		debugLog.Print(e...)
 	}
 }
 
 func ComponentDebug(component string, e ...interface{}) {
 	initLogger()
-	if globalDebug == true && debugLog != nil {
+	if globalDebug && debugLog != nil {
 		//CCComponentPrint(debugLog, component, e)
 		debugLog.Print(fmt.Sprintf("[%s] ", component), e)
 	}
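The change from Print(e) to Print(e...) matters because, with a variadic parameter e ...interface{}, passing e unexpanded logs the whole slice as a single value. A minimal sketch (not from the commit) of the difference:

package main

import "log"

// With e ...interface{}, Print(e) passes the slice as one argument,
// while Print(e...) expands it into separate arguments.
func report(e ...interface{}) {
	log.Print(e)    // logs "[a b c]" (one []interface{} value)
	log.Print(e...) // logs "a b c" (three values)
}

func main() {
	report("a", "b", "c")
}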
internal/ccMetric/ccMetric.go
@@ -2,9 +2,10 @@ package ccmetric
 
 import (
 	"fmt"
-	lp "github.com/influxdata/line-protocol" // MIT license
 	"sort"
 	"time"
+
+	lp "github.com/influxdata/line-protocol" // MIT license
 )
 
 // Most functions are derived from github.com/influxdata/line-protocol/metric.go
@@ -24,6 +25,11 @@ type CCMetric interface {
 	AddMeta(key, value string)
 	MetaList() []*lp.Tag
 	RemoveTag(key string)
+	GetTag(key string) (string, bool)
+	GetMeta(key string) (string, bool)
+	GetField(key string) (interface{}, bool)
+	HasField(key string) bool
+	RemoveField(key string)
 }
 
 func (m *ccMetric) Meta() map[string]string {
@@ -187,6 +193,35 @@ func (m *ccMetric) AddField(key string, value interface{}) {
 	m.fields = append(m.fields, &lp.Field{Key: key, Value: convertField(value)})
 }
 
+func (m *ccMetric) GetField(key string) (interface{}, bool) {
+	for _, field := range m.fields {
+		if field.Key == key {
+			return field.Value, true
+		}
+	}
+	return "", false
+}
+
+func (m *ccMetric) HasField(key string) bool {
+	for _, field := range m.fields {
+		if field.Key == key {
+			return true
+		}
+	}
+	return false
+}
+
+func (m *ccMetric) RemoveField(key string) {
+	for i, field := range m.fields {
+		if field.Key == key {
+			copy(m.fields[i:], m.fields[i+1:])
+			m.fields[len(m.fields)-1] = nil
+			m.fields = m.fields[:len(m.fields)-1]
+			return
+		}
+	}
+}
+
 func New(
 	name string,
 	tags map[string]string,
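A short usage sketch for the new field accessors (illustrative only; the lp.New call mirrors its use in metricAggregator.go further below):

m, err := lp.New("temp_core_0",
	map[string]string{"type": "cpu"},
	map[string]string{"unit": "degC"},
	map[string]interface{}{"value": 42.0},
	time.Now())
if err == nil {
	if v, ok := m.GetField("value"); ok {
		fmt.Println("value:", v) // value: 42
	}
	m.RemoveField("value")
	fmt.Println(m.HasField("value")) // false
}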
internal/ccTopology/ccTopology.go (new file, 277 lines)
package ccTopology

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strconv"
	"strings"

	cclogger "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
)

// intArrayContains scans an array of ints if the value str is present in the array
// If the specified value is found, the corresponding array index is returned.
// The bool value is used to signal success or failure
func intArrayContains(array []int, str int) (int, bool) {
	for i, a := range array {
		if a == str {
			return i, true
		}
	}
	return -1, false
}

// stringArrayContains scans an array of strings if the value str is present in the array
// If the specified value is found, the corresponding array index is returned.
// The bool value is used to signal success or failure
// func stringArrayContains(array []string, str string) (int, bool) {
// 	for i, a := range array {
// 		if a == str {
// 			return i, true
// 		}
// 	}
// 	return -1, false
// }

func SocketList() []int {
	buffer, err := ioutil.ReadFile("/proc/cpuinfo")
	if err != nil {
		log.Print(err)
		return nil
	}
	ll := strings.Split(string(buffer), "\n")
	var packs []int
	for _, line := range ll {
		if strings.HasPrefix(line, "physical id") {
			lv := strings.Fields(line)
			id, err := strconv.ParseInt(lv[3], 10, 32)
			if err != nil {
				log.Print(err)
				return packs
			}
			_, found := intArrayContains(packs, int(id))
			if !found {
				packs = append(packs, int(id))
			}
		}
	}
	return packs
}

func CpuList() []int {
	buffer, err := ioutil.ReadFile("/proc/cpuinfo")
	if err != nil {
		log.Print(err)
		return nil
	}
	ll := strings.Split(string(buffer), "\n")
	var cpulist []int
	for _, line := range ll {
		if strings.HasPrefix(line, "processor") {
			lv := strings.Fields(line)
			id, err := strconv.ParseInt(lv[2], 10, 32)
			if err != nil {
				log.Print(err)
				return cpulist
			}
			_, found := intArrayContains(cpulist, int(id))
			if !found {
				cpulist = append(cpulist, int(id))
			}
		}
	}
	return cpulist
}

type CpuEntry struct {
	Cpuid      int
	SMT        int
	Core       int
	Socket     int
	Numadomain int
	Die        int
}

func CpuData() []CpuEntry {

	fileToInt := func(path string) int {
		buffer, err := ioutil.ReadFile(path)
		if err != nil {
			log.Print(err)
			cclogger.ComponentError("ccTopology", "Reading", path, ":", err.Error())
			return -1
		}
		sbuffer := strings.Replace(string(buffer), "\n", "", -1)
		var id int64
		//_, err = fmt.Scanf("%d", sbuffer, &id)
		id, err = strconv.ParseInt(sbuffer, 10, 32)
		if err != nil {
			cclogger.ComponentError("ccTopology", "Parsing", path, ":", sbuffer, err.Error())
			return -1
		}
		return int(id)
	}
	getCore := func(basepath string) int {
		return fileToInt(fmt.Sprintf("%s/core_id", basepath))
	}

	getSocket := func(basepath string) int {
		return fileToInt(fmt.Sprintf("%s/physical_package_id", basepath))
	}

	getDie := func(basepath string) int {
		return fileToInt(fmt.Sprintf("%s/die_id", basepath))
	}

	getSMT := func(cpuid int, basepath string) int {
		buffer, err := ioutil.ReadFile(fmt.Sprintf("%s/thread_siblings_list", basepath))
		if err != nil {
			log.Print(err)
		}
		threadlist := make([]int, 0)
		sbuffer := strings.Replace(string(buffer), "\n", "", -1)
		for _, x := range strings.Split(sbuffer, ",") {
			id, err := strconv.ParseInt(x, 10, 32)
			if err != nil {
				log.Print(err)
			}
			threadlist = append(threadlist, int(id))
		}
		for i, x := range threadlist {
			if x == cpuid {
				return i
			}
		}
		return 1
	}

	getNumaDomain := func(basepath string) int {
		files, err := filepath.Glob(fmt.Sprintf("%s/node*", basepath))
		if err != nil {
			log.Print(err)
		}
		for _, f := range files {
			finfo, err := os.Lstat(f)
			if err == nil && (finfo.IsDir() || finfo.Mode()&os.ModeSymlink != 0) {
				var id int
				parts := strings.Split(f, "/")
				_, err = fmt.Scanf("node%d", parts[len(parts)-1], &id)
				if err == nil {
					return id
				}
			}
		}
		return 0
	}

	clist := make([]CpuEntry, 0)
	for _, c := range CpuList() {
		clist = append(clist, CpuEntry{Cpuid: c})
	}
	for _, centry := range clist {
		centry.Socket = -1
		centry.Numadomain = -1
		centry.Die = -1
		centry.Core = -1
		// Set base directory for topology lookup
		base := fmt.Sprintf("/sys/devices/system/cpu/cpu%d/topology", centry.Cpuid)

		// Lookup CPU core id
		centry.Core = getCore(base)

		// Lookup CPU socket id
		centry.Socket = getSocket(base)

		// Lookup CPU die id
		centry.Die = getDie(base)

		// Lookup SMT thread id
		centry.SMT = getSMT(centry.Cpuid, base)

		// Lookup NUMA domain id
		centry.Numadomain = getNumaDomain(base)

	}
	return clist
}

type CpuInformation struct {
	NumHWthreads   int
	SMTWidth       int
	NumSockets     int
	NumDies        int
	NumNumaDomains int
}

func CpuInfo() CpuInformation {
	var c CpuInformation

	smt := 0
	numa := 0
	die := 0
	socket := 0
	cdata := CpuData()
	for _, d := range cdata {
		if d.SMT > smt {
			smt = d.SMT
		}
		if d.Numadomain > numa {
			numa = d.Numadomain
		}
		if d.Die > die {
			die = d.Die
		}
		if d.Socket > socket {
			socket = d.Socket
		}
	}
	c.NumNumaDomains = numa + 1
	c.SMTWidth = smt + 1
	c.NumDies = die + 1
	c.NumSockets = socket + 1
	c.NumHWthreads = len(cdata)
	return c
}

func GetCpuSocket(cpuid int) int {
	cdata := CpuData()
	for _, d := range cdata {
		if d.Cpuid == cpuid {
			return d.Socket
		}
	}
	return -1
}

func GetCpuNumaDomain(cpuid int) int {
	cdata := CpuData()
	for _, d := range cdata {
		if d.Cpuid == cpuid {
			return d.Numadomain
		}
	}
	return -1
}

func GetCpuDie(cpuid int) int {
	cdata := CpuData()
	for _, d := range cdata {
		if d.Cpuid == cpuid {
			return d.Die
		}
	}
	return -1
}

func GetCpuCore(cpuid int) int {
	cdata := CpuData()
	for _, d := range cdata {
		if d.Cpuid == cpuid {
			return d.Core
		}
	}
	return -1
}
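How the new helpers might be consumed (an illustrative sketch, not part of the commit; note that CpuData re-reads /proc/cpuinfo and sysfs on every call, so the Get* lookups are worth caching in hot paths):

package main

import (
	"fmt"

	topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"
)

func main() {
	info := topo.CpuInfo()
	fmt.Printf("%d HW threads, %d sockets, %d NUMA domains, SMT width %d\n",
		info.NumHWthreads, info.NumSockets, info.NumNumaDomains, info.SMTWidth)
	for _, cpu := range topo.CpuList() {
		fmt.Printf("cpu%d -> socket %d, core %d\n",
			cpu, topo.GetCpuSocket(cpu), topo.GetCpuCore(cpu))
	}
}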
internal/metricRouter/metricAggregator.go (new file, 291 lines)
package metricRouter

import (
	"context"
	"fmt"
	"os"
	"strings"
	"time"

	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"

	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
	topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"

	"github.com/PaesslerAG/gval"
)

type metricAggregatorIntervalConfig struct {
	Name      string            `json:"name"`     // Metric name for the new metric
	Function  string            `json:"function"` // Function to apply on the metric
	Condition string            `json:"if"`       // Condition for applying function
	Tags      map[string]string `json:"tags"`     // Tags for the new metric
	Meta      map[string]string `json:"meta"`     // Meta information for the new metric
	gvalCond  gval.Evaluable
	gvalFunc  gval.Evaluable
}

type metricAggregator struct {
	functions []*metricAggregatorIntervalConfig
	constants map[string]interface{}
	language  gval.Language
	output    chan lp.CCMetric
}

type MetricAggregator interface {
	AddAggregation(name, function, condition string, tags, meta map[string]string) error
	DeleteAggregation(name string) error
	Init(output chan lp.CCMetric) error
	Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric)
}

var metricCacheLanguage = gval.NewLanguage(
	gval.Base(),
	gval.Function("sum", sumfunc),
	gval.Function("min", minfunc),
	gval.Function("avg", avgfunc),
	gval.Function("mean", avgfunc),
	gval.Function("max", maxfunc),
	gval.Function("len", lenfunc),
	gval.Function("median", medianfunc),
	gval.InfixOperator("in", infunc),
	gval.Function("match", matchfunc),
	gval.Function("getCpuCore", getCpuCoreFunc),
	gval.Function("getCpuSocket", getCpuSocketFunc),
	gval.Function("getCpuNuma", getCpuNumaDomainFunc),
	gval.Function("getCpuDie", getCpuDieFunc),
	gval.Function("getSockCpuList", getCpuListOfSocketFunc),
	gval.Function("getNumaCpuList", getCpuListOfNumaDomainFunc),
	gval.Function("getDieCpuList", getCpuListOfDieFunc),
	gval.Function("getCoreCpuList", getCpuListOfCoreFunc),
	gval.Function("getCpuList", getCpuListOfNode),
	gval.Function("getCpuListOfType", getCpuListOfType),
)

func (c *metricAggregator) Init(output chan lp.CCMetric) error {
	c.output = output
	c.functions = make([]*metricAggregatorIntervalConfig, 0)
	c.constants = make(map[string]interface{})

	// add constants like hostname, numSockets, ... to constants list
	// Set hostname
	hostname, err := os.Hostname()
	if err != nil {
		cclog.Error(err.Error())
		return err
	}
	// Drop domain part of host name
	c.constants["hostname"] = strings.SplitN(hostname, `.`, 2)[0]
	cinfo := topo.CpuInfo()
	c.constants["numHWThreads"] = cinfo.NumHWthreads
	c.constants["numSockets"] = cinfo.NumSockets
	c.constants["numNumaDomains"] = cinfo.NumNumaDomains
	c.constants["numDies"] = cinfo.NumDies
	c.constants["smtWidth"] = cinfo.SMTWidth

	c.language = gval.NewLanguage(
		gval.Base(),
		metricCacheLanguage,
	)

	// Example aggregation function
	// var f metricCacheFunctionConfig
	// f.Name = "temp_cores_avg"
	// //f.Condition = `"temp_core_" in name`
	// f.Condition = `match("temp_core_%d+", metric.Name())`
	// f.Function = `avg(values)`
	// f.Tags = map[string]string{"type": "node"}
	// f.Meta = map[string]string{"group": "IPMI", "unit": "degC", "source": "TempCollector"}
	// c.functions = append(c.functions, &f)
	return nil
}

func (c *metricAggregator) Eval(starttime time.Time, endtime time.Time, metrics []lp.CCMetric) {
	vars := make(map[string]interface{})
	for k, v := range c.constants {
		vars[k] = v
	}
	vars["starttime"] = starttime
	vars["endtime"] = endtime
	for _, f := range c.functions {
		cclog.ComponentDebug("MetricCache", "COLLECT", f.Name, "COND", f.Condition)
		values := make([]float64, 0)
		matches := make([]lp.CCMetric, 0)
		for _, m := range metrics {
			vars["metric"] = m
			//value, err := gval.Evaluate(f.Condition, vars, c.language)
			value, err := f.gvalCond.EvalBool(context.Background(), vars)
			if err != nil {
				cclog.ComponentError("MetricCache", "COLLECT", f.Name, "COND", f.Condition, ":", err.Error())
				continue
			}
			if value {
				v, valid := m.GetField("value")
				if valid {
					switch x := v.(type) {
					case float64:
						values = append(values, x)
					case float32:
					case int:
					case int64:
						values = append(values, float64(x))
					case bool:
						if x {
							values = append(values, float64(1.0))
						} else {
							values = append(values, float64(0.0))
						}
					default:
						cclog.ComponentError("MetricCache", "COLLECT ADD VALUE", v, "FAILED")
					}
				}
				matches = append(matches, m)
			}
		}
		delete(vars, "metric")
		cclog.ComponentDebug("MetricCache", "EVALUATE", f.Name, "METRICS", len(values), "CALC", f.Function)
		vars["values"] = values
		vars["metrics"] = matches
		if len(values) > 0 {
			value, err := gval.Evaluate(f.Function, vars, c.language)
			if err != nil {
				cclog.ComponentError("MetricCache", "EVALUATE", f.Name, "METRICS", len(values), "CALC", f.Function, ":", err.Error())
				break
			}

			copy_tags := func(tags map[string]string, metrics []lp.CCMetric) map[string]string {
				out := make(map[string]string)
				for key, value := range tags {
					switch value {
					case "<copy>":
						for _, m := range metrics {
							v, err := m.GetTag(key)
							if err {
								out[key] = v
							}
						}
					default:
						out[key] = value
					}
				}
				return out
			}
			copy_meta := func(meta map[string]string, metrics []lp.CCMetric) map[string]string {
				out := make(map[string]string)
				for key, value := range meta {
					switch value {
					case "<copy>":
						for _, m := range metrics {
							v, err := m.GetMeta(key)
							if err {
								out[key] = v
							}
						}
					default:
						out[key] = value
					}
				}
				return out
			}
			tags := copy_tags(f.Tags, matches)
			meta := copy_meta(f.Meta, matches)

			var m lp.CCMetric
			switch t := value.(type) {
			case float64:
				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
			case float32:
				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
			case int:
				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
			case int64:
				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
			case string:
				m, err = lp.New(f.Name, tags, meta, map[string]interface{}{"value": t}, starttime)
			default:
				cclog.ComponentError("MetricCache", "Gval returned invalid type", t, "skipping metric", f.Name)
			}
			if err != nil {
				cclog.ComponentError("MetricCache", "Cannot create metric from Gval result", value, ":", err.Error())
			}
			cclog.ComponentDebug("MetricCache", "SEND", m)
			select {
			case c.output <- m:
			default:
			}

		}
	}
}

func (c *metricAggregator) AddAggregation(name, function, condition string, tags, meta map[string]string) error {
	// Since "" cannot be used inside of JSON strings, we use '' and replace them here because gval does not like ''
	// but wants ""
	newfunc := strings.ReplaceAll(function, "'", "\"")
	newcond := strings.ReplaceAll(condition, "'", "\"")
	gvalCond, err := gval.Full(metricCacheLanguage).NewEvaluable(newcond)
	if err != nil {
		cclog.ComponentError("MetricAggregator", "Cannot add aggregation, invalid if condition", newcond, ":", err.Error())
		return err
	}
	gvalFunc, err := gval.Full(metricCacheLanguage).NewEvaluable(newfunc)
	if err != nil {
		cclog.ComponentError("MetricAggregator", "Cannot add aggregation, invalid function condition", newfunc, ":", err.Error())
		return err
	}
	for _, agg := range c.functions {
		if agg.Name == name {
			agg.Name = name
			agg.Condition = newcond
			agg.Function = newfunc
			agg.Tags = tags
			agg.Meta = meta
			agg.gvalCond = gvalCond
			agg.gvalFunc = gvalFunc
			return nil
		}
	}
	var agg metricAggregatorIntervalConfig
	agg.Name = name
	agg.Condition = newcond
	agg.gvalCond = gvalCond
	agg.Function = newfunc
	agg.gvalFunc = gvalFunc
	agg.Tags = tags
	agg.Meta = meta
	c.functions = append(c.functions, &agg)
	return nil
}

func (c *metricAggregator) DeleteAggregation(name string) error {
	for i, agg := range c.functions {
		if agg.Name == name {
			copy(c.functions[i:], c.functions[i+1:])
			c.functions[len(c.functions)-1] = nil
			c.functions = c.functions[:len(c.functions)-1]
			return nil
		}
	}
	return fmt.Errorf("no aggregation for metric name %s", name)
}

func (c *metricAggregator) AddConstant(name string, value interface{}) {
	c.constants[name] = value
}

func (c *metricAggregator) DelConstant(name string) {
	delete(c.constants, name)
}

func (c *metricAggregator) AddFunction(name string, function func(args ...interface{}) (interface{}, error)) {
	c.language = gval.NewLanguage(c.language, gval.Function(name, function))
}

func NewAggregator(output chan lp.CCMetric) (MetricAggregator, error) {
	a := new(metricAggregator)
	err := a.Init(output)
	if err != nil {
		return nil, err
	}
	return a, err
}
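The "interval_aggregates" entry in .github/ci-router.json above maps one-to-one onto AddAggregation. An in-package usage sketch (assumed wiring, not code from the commit):

// Register the same aggregation that ci-router.json configures declaratively.
// Single quotes inside function/condition strings are rewritten to double
// quotes by AddAggregation before gval parses them.
out := make(chan lp.CCMetric, 16)
agg, err := NewAggregator(out)
if err == nil {
	agg.AddAggregation(
		"temp_cores_avg",
		"avg(values)",
		"match('temp_core_%d+', metric.Name())",
		map[string]string{"type": "node"},
		map[string]string{"group": "<copy>", "unit": "<copy>", "source": "MetricAggregator"},
	)
}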
internal/metricRouter/metricAggregatorFunctions.go (new file, 376 lines)
package metricRouter

import (
	"errors"
	"fmt"
	"math"
	"regexp"
	"sort"
	"strings"

	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
	topo "github.com/ClusterCockpit/cc-metric-collector/internal/ccTopology"
)

/*
 * Arithmetic functions on value arrays
 */

// Sum up values
func sumfunc(args ...interface{}) (interface{}, error) {
	s := 0.0
	values, ok := args[0].([]float64)
	if ok {
		cclog.ComponentDebug("MetricCache", "SUM FUNC START")
		for _, x := range values {
			s += x
		}
		cclog.ComponentDebug("MetricCache", "SUM FUNC END", s)
	} else {
		cclog.ComponentDebug("MetricCache", "SUM FUNC CAST FAILED")
	}
	return s, nil
}

// Get the minimum value
func minfunc(args ...interface{}) (interface{}, error) {
	var err error = nil
	switch values := args[0].(type) {
	case []float64:
		var s float64 = math.MaxFloat64
		for _, x := range values {
			if x < s {
				s = x
			}
		}
		return s, nil
	case []float32:
		var s float32 = math.MaxFloat32
		for _, x := range values {
			if x < s {
				s = x
			}
		}
		return s, nil
	case []int:
		var s int = math.MaxInt
		for _, x := range values {
			if x < s {
				s = x
			}
		}
		return s, nil
	case []int64:
		var s int64 = math.MaxInt64
		for _, x := range values {
			if x < s {
				s = x
			}
		}
		return s, nil
	case []int32:
		var s int32 = math.MaxInt32
		for _, x := range values {
			if x < s {
				s = x
			}
		}
		return s, nil
	default:
		err = errors.New("function 'min' only on list of values (float64, float32, int, int32, int64)")
	}

	return 0.0, err
}

// Get the average or mean value
func avgfunc(args ...interface{}) (interface{}, error) {
	switch values := args[0].(type) {
	case []float64:
		var s float64 = 0
		for _, x := range values {
			s += x
		}
		return s / float64(len(values)), nil
	case []float32:
		var s float32 = 0
		for _, x := range values {
			s += x
		}
		return s / float32(len(values)), nil
	case []int:
		var s int = 0
		for _, x := range values {
			s += x
		}
		return s / len(values), nil
	case []int64:
		var s int64 = 0
		for _, x := range values {
			s += x
		}
		return s / int64(len(values)), nil
	}
	return 0.0, nil
}

// Get the maximum value
func maxfunc(args ...interface{}) (interface{}, error) {
	s := 0.0
	values, ok := args[0].([]float64)
	if ok {
		for _, x := range values {
			if x > s {
				s = x
			}
		}
	}
	return s, nil
}

// Get the median value
func medianfunc(args ...interface{}) (interface{}, error) {
	switch values := args[0].(type) {
	case []float64:
		sort.Float64s(values)
		return values[len(values)/2], nil
	// case []float32:
	// 	sort.Float64s(values)
	// 	return values[len(values)/2], nil
	case []int:
		sort.Ints(values)
		return values[len(values)/2], nil

	// case []int64:
	// 	sort.Ints(values)
	// 	return values[len(values)/2], nil
	// case []int32:
	// 	sort.Ints(values)
	// 	return values[len(values)/2], nil
	}
	return 0.0, errors.New("function 'median()' only on lists of type float64 and int")
}

/*
 * Get number of values in list. Returns always an int
 */

func lenfunc(args ...interface{}) (interface{}, error) {
	var err error = nil
	var length int = 0
	switch values := args[0].(type) {
	case []float64:
		length = len(values)
	case []float32:
		length = len(values)
	case []int:
		length = len(values)
	case []int64:
		length = len(values)
	case []int32:
		length = len(values)
	case float64:
		err = errors.New("function 'len' can only be applied on arrays and strings")
	case float32:
		err = errors.New("function 'len' can only be applied on arrays and strings")
	case int:
		err = errors.New("function 'len' can only be applied on arrays and strings")
	case int64:
		err = errors.New("function 'len' can only be applied on arrays and strings")
	case string:
		length = len(values)
	}
	return length, err
}

/*
 * Check if a value is in a list
 * In contrast to most of the other functions, this one is an infix operator for
 * - substring matching: `"abc" in "abcdef"` -> true
 * - substring matching with int casting: `3 in "abd3"` -> true
 * - search for an int in an int list: `3 in getCpuList()` -> true (if you have more than 4 CPU hardware threads)
 */

func infunc(a interface{}, b interface{}) (interface{}, error) {
	switch match := a.(type) {
	case string:
		switch total := b.(type) {
		case string:
			return strings.Contains(total, match), nil
		}
	case int:
		switch total := b.(type) {
		case []int:
			for _, x := range total {
				if x == match {
					return true, nil
				}
			}
		case string:
			smatch := fmt.Sprintf("%d", match)
			return strings.Contains(total, smatch), nil
		}

	}
	return false, nil
}

/*
 * Regex matching of strings (metric name, tag keys, tag values, meta keys, meta values)
 * Since we cannot use \ inside JSON strings without escaping, we use % instead for the
 * format keys \d = %d, \w = %w, ... Not sure how to fix this
 */

func matchfunc(args ...interface{}) (interface{}, error) {
	switch match := args[0].(type) {
	case string:
		switch total := args[1].(type) {
		case string:
			smatch := strings.Replace(match, "%", "\\", -1)
			regex, err := regexp.Compile(smatch)
			if err != nil {
				return false, err
			}
			s := regex.Find([]byte(total))
			return s != nil, nil
		}
	}
	return false, nil
}

/*
 * System topology getter functions
 */

// for a given cpuid, it returns the core id
func getCpuCoreFunc(args ...interface{}) (interface{}, error) {
	switch cpuid := args[0].(type) {
	case int:
		return topo.GetCpuCore(cpuid), nil
	}
	return -1, errors.New("function 'getCpuCore' accepts only an 'int' cpuid")
}

// for a given cpuid, it returns the socket id
func getCpuSocketFunc(args ...interface{}) (interface{}, error) {
	switch cpuid := args[0].(type) {
	case int:
		return topo.GetCpuSocket(cpuid), nil
	}
	return -1, errors.New("function 'getCpuSocket' accepts only an 'int' cpuid")
}

// for a given cpuid, it returns the id of the NUMA node
func getCpuNumaDomainFunc(args ...interface{}) (interface{}, error) {
	switch cpuid := args[0].(type) {
	case int:
		return topo.GetCpuNumaDomain(cpuid), nil
	}
	return -1, errors.New("function 'getCpuNuma' accepts only an 'int' cpuid")
}

// for a given cpuid, it returns the id of the CPU die
func getCpuDieFunc(args ...interface{}) (interface{}, error) {
	switch cpuid := args[0].(type) {
	case int:
		return topo.GetCpuDie(cpuid), nil
	}
	return -1, errors.New("function 'getCpuDie' accepts only an 'int' cpuid")
}

// for a given core id, it returns the list of cpuids
func getCpuListOfCoreFunc(args ...interface{}) (interface{}, error) {
	cpulist := make([]int, 0)
	switch in := args[0].(type) {
	case int:
		for _, c := range topo.CpuData() {
			if c.Core == in {
				cpulist = append(cpulist, c.Cpuid)
			}
		}
	}
	return cpulist, nil
}

// for a given socket id, it returns the list of cpuids
func getCpuListOfSocketFunc(args ...interface{}) (interface{}, error) {
	cpulist := make([]int, 0)
	switch in := args[0].(type) {
	case int:
		for _, c := range topo.CpuData() {
			if c.Socket == in {
				cpulist = append(cpulist, c.Cpuid)
			}
		}
	}
	return cpulist, nil
}

// for a given id of a NUMA domain, it returns the list of cpuids
func getCpuListOfNumaDomainFunc(args ...interface{}) (interface{}, error) {
	cpulist := make([]int, 0)
	switch in := args[0].(type) {
	case int:
		for _, c := range topo.CpuData() {
			if c.Numadomain == in {
				cpulist = append(cpulist, c.Cpuid)
			}
		}
	}
	return cpulist, nil
}

// for a given CPU die id, it returns the list of cpuids
func getCpuListOfDieFunc(args ...interface{}) (interface{}, error) {
	cpulist := make([]int, 0)
	switch in := args[0].(type) {
	case int:
		for _, c := range topo.CpuData() {
			if c.Die == in {
				cpulist = append(cpulist, c.Cpuid)
			}
		}
	}
	return cpulist, nil
}

// wrapper function to get a list of all cpuids of the node
func getCpuListOfNode(args ...interface{}) (interface{}, error) {
	return topo.CpuList(), nil
}

// helper function to get the cpuid list for a CCMetric type tag set (type and type-id)
// since there is no access to the metric data in the function, it should be called like
// `getCpuListOfType()`
func getCpuListOfType(args ...interface{}) (interface{}, error) {
	cpulist := make([]int, 0)
	switch typ := args[0].(type) {
	case string:
		switch typ {
		case "node":
			return topo.CpuList(), nil
		case "socket":
			return getCpuListOfSocketFunc(args[1])
		case "numadomain":
			return getCpuListOfNumaDomainFunc(args[1])
		case "core":
			return getCpuListOfCoreFunc(args[1])
		case "cpu":
			var cpu int

			switch id := args[1].(type) {
			case string:
				_, err := fmt.Scanf(id, "%d", &cpu)
				if err == nil {
					cpulist = append(cpulist, cpu)
				}
			case int:
				cpulist = append(cpulist, id)
			case int64:
				cpulist = append(cpulist, int(id))
			}

		}
	}
	return cpulist, errors.New("no valid args type and type-id")
}
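Because `\` cannot appear unescaped in JSON strings, matchfunc accepts `%` in place of `\` and rewrites it before compiling, so a configured pattern like 'temp_core_%d+' becomes the regexp temp_core_\d+. A standalone sketch of that rewrite (illustrative only):

// Reproduce the % -> \ substitution matchfunc performs internally.
pattern := strings.Replace("temp_core_%d+", "%", "\\", -1) // yields temp_core_\d+
re, err := regexp.Compile(pattern)
if err == nil {
	fmt.Println(re.MatchString("temp_core_17")) // true
	fmt.Println(re.MatchString("temp_pkg_0"))   // false
}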
internal/metricRouter/metricCache.go (new file, 176 lines)
package metricRouter

import (
	"sync"
	"time"

	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"

	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
	mct "github.com/ClusterCockpit/cc-metric-collector/internal/multiChanTicker"
)

type metricCachePeriod struct {
	startstamp  time.Time
	stopstamp   time.Time
	numMetrics  int
	sizeMetrics int
	metrics     []lp.CCMetric
}

// Metric cache data structure
type metricCache struct {
	numPeriods int
	curPeriod  int
	intervals  []*metricCachePeriod
	wg         *sync.WaitGroup
	ticker     mct.MultiChanTicker
	tickchan   chan time.Time
	done       chan bool
	output     chan lp.CCMetric
	aggEngine  MetricAggregator
}

type MetricCache interface {
	Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error
	Start()
	Add(metric lp.CCMetric)
	GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric)
	AddAggregation(name, function, condition string, tags, meta map[string]string) error
	DeleteAggregation(name string) error
	Close()
}

func (c *metricCache) Init(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) error {
	var err error = nil
	c.done = make(chan bool)
	c.wg = wg
	c.ticker = ticker
	c.numPeriods = numPeriods
	c.output = output
	c.intervals = make([]*metricCachePeriod, 0)
	for i := 0; i < c.numPeriods+1; i++ {
		p := new(metricCachePeriod)
		p.numMetrics = 0
		p.sizeMetrics = 0
		p.metrics = make([]lp.CCMetric, 0)
		c.intervals = append(c.intervals, p)
	}

	// Create a new aggregation engine. No separate goroutine at the moment
	// The code is executed by the MetricCache goroutine
	c.aggEngine, err = NewAggregator(c.output)
	if err != nil {
		cclog.ComponentError("MetricCache", "Cannot create aggregator")
		return err
	}

	return nil
}

// Start starts the metric cache
func (c *metricCache) Start() {

	c.tickchan = make(chan time.Time)
	c.ticker.AddChannel(c.tickchan)
	// Router cache is done
	done := func() {
		cclog.ComponentDebug("MetricCache", "DONE")
		close(c.done)
	}

	// Rotate cache interval
	rotate := func(timestamp time.Time) int {
		oldPeriod := c.curPeriod
		c.curPeriod = oldPeriod + 1
		if c.curPeriod >= c.numPeriods {
			c.curPeriod = 0
		}
		c.intervals[oldPeriod].numMetrics = 0
		c.intervals[oldPeriod].stopstamp = timestamp
		c.intervals[c.curPeriod].startstamp = timestamp
		c.intervals[c.curPeriod].stopstamp = timestamp
		return oldPeriod
	}

	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for {
			select {
			case <-c.done:
				done()
				return
			case tick := <-c.tickchan:
				old := rotate(tick)
				// Get the last period and evaluate aggregation metrics
				starttime, endtime, metrics := c.GetPeriod(old)
				if len(metrics) > 0 {
					c.aggEngine.Eval(starttime, endtime, metrics)
				} else {
					// This message is also printed in the first interval after startup
					cclog.ComponentDebug("MetricCache", "EMPTY INTERVAL?")
				}
			}
		}
	}()
	cclog.ComponentDebug("MetricCache", "START")
}

// Add a metric to the cache. The interval is defined by the global timer (rotate() in Start())
// The intervals list is used as round-robin buffer; the metric list grows dynamically but is
// reused across intervals to avoid reallocations
func (c *metricCache) Add(metric lp.CCMetric) {
	if c.curPeriod >= 0 && c.curPeriod < c.numPeriods {
		p := c.intervals[c.curPeriod]
		if p.numMetrics < p.sizeMetrics {
			p.metrics[p.numMetrics] = metric
			p.numMetrics = p.numMetrics + 1
			p.stopstamp = metric.Time()
		} else {
			p.metrics = append(p.metrics, metric)
			p.numMetrics = p.numMetrics + 1
			p.sizeMetrics = p.sizeMetrics + 1
			p.stopstamp = metric.Time()
		}
	}
}

func (c *metricCache) AddAggregation(name, function, condition string, tags, meta map[string]string) error {
	return c.aggEngine.AddAggregation(name, function, condition, tags, meta)
}

func (c *metricCache) DeleteAggregation(name string) error {
	return c.aggEngine.DeleteAggregation(name)
}

// Get all metrics of an interval. The index is the difference to the current interval, so index=0
// is the current one, index=1 the last interval and so on. Returns an empty array if a wrong index
// is given (negative index, index larger than configured number of total intervals, ...)
func (c *metricCache) GetPeriod(index int) (time.Time, time.Time, []lp.CCMetric) {
	if index >= 0 && index < c.numPeriods {
		pindex := c.curPeriod - index
		if pindex < 0 {
			pindex = c.numPeriods - pindex
		}
		if pindex >= 0 && pindex < c.numPeriods {
			return c.intervals[pindex].startstamp, c.intervals[pindex].stopstamp, c.intervals[pindex].metrics
		}
	}
	return time.Now(), time.Now(), make([]lp.CCMetric, 0)
}

// Close finishes / stops the metric cache
func (c *metricCache) Close() {
	cclog.ComponentDebug("MetricCache", "CLOSE")
	c.done <- true
}

func NewCache(output chan lp.CCMetric, ticker mct.MultiChanTicker, wg *sync.WaitGroup, numPeriods int) (MetricCache, error) {
	c := new(metricCache)
	err := c.Init(output, ticker, wg, numPeriods)
	if err != nil {
		return nil, err
	}
	return c, err
}
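An assumed wiring sketch of the cache life cycle (in-package, not code from the commit): NewCache allocates numPeriods+1 ring slots, Start attaches a channel to the shared ticker and rotates on every tick, and aggregation results are emitted on the output channel passed to NewCache.

// Hypothetical in-package usage of the metric cache.
out := make(chan lp.CCMetric)
var wg sync.WaitGroup
ticker := mct.NewTicker(10 * time.Second)
cache, err := NewCache(out, ticker, &wg, 2) // keep two past intervals
if err == nil {
	cache.Start()
	// ... the router calls cache.Add(m) for each collector metric ...
	start, stop, metrics := cache.GetPeriod(1) // previous interval
	fmt.Println(start, stop, len(metrics))
	cache.Close()
	wg.Wait()
}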
internal/metricRouter/metricRouter.go
@@ -3,6 +3,7 @@ package metricRouter
 import (
 	"encoding/json"
 	"os"
+	"strings"
 	"sync"
 	"time"
 
@@ -24,19 +25,26 @@ type metricRouterTagConfig struct {
 type metricRouterConfig struct {
 	AddTags           []metricRouterTagConfig          `json:"add_tags"`            // List of tags that are added when the condition is met
 	DelTags           []metricRouterTagConfig          `json:"delete_tags"`         // List of tags that are removed when the condition is met
-	IntervalStamp     bool                             `json:"interval_timestamp"`  // Update timestamp periodically?
+	IntervalAgg       []metricAggregatorIntervalConfig `json:"interval_aggregates"` // List of aggregation functions processed at the end of an interval
+	IntervalStamp     bool                             `json:"interval_timestamp"`  // Update timestamp periodically by ticker each interval?
+	NumCacheIntervals int                              `json:"num_cache_intervals"` // Number of intervals of cached metrics for evaluation
 }
 
 // Metric router data structure
 type metricRouter struct {
 	hostname    string              // Hostname used in tags
 	coll_input  chan lp.CCMetric    // Input channel from CollectorManager
 	recv_input  chan lp.CCMetric    // Input channel from ReceiveManager
+	cache_input chan lp.CCMetric    // Input channel from MetricCache
 	outputs     []chan lp.CCMetric  // List of all output channels
 	done        chan bool           // channel to finish / stop metric router
-	wg          *sync.WaitGroup
-	timestamp   time.Time           // timestamp
+	wg          *sync.WaitGroup     // wait group for all goroutines in cc-metric-collector
+	timestamp   time.Time           // timestamp periodically updated by ticker each interval
 	timerdone   chan bool           // channel to finish / stop timestamp updater
-	ticker      mct.MultiChanTicker
-	config      metricRouterConfig
+	ticker      mct.MultiChanTicker // periodically ticking once each interval
+	config      metricRouterConfig  // json encoded config for metric router
+	cache       MetricCache         // pointer to MetricCache
+	cachewg     sync.WaitGroup      // wait group for MetricCache
 }
 
 // MetricRouter access functions
@@ -58,8 +66,20 @@ type MetricRouter interface {
 func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, routerConfigFile string) error {
 	r.outputs = make([]chan lp.CCMetric, 0)
 	r.done = make(chan bool)
+	r.cache_input = make(chan lp.CCMetric)
 	r.wg = wg
 	r.ticker = ticker
 
+	// Set hostname
+	hostname, err := os.Hostname()
+	if err != nil {
+		cclog.Error(err.Error())
+		return err
+	}
+	// Drop domain part of host name
+	r.hostname = strings.SplitN(hostname, `.`, 2)[0]
+
 	// Read metric router config file
 	configFile, err := os.Open(routerConfigFile)
 	if err != nil {
 		cclog.ComponentError("MetricRouter", err.Error())
@@ -72,6 +92,18 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
 		cclog.ComponentError("MetricRouter", err.Error())
 		return err
 	}
+	numIntervals := r.config.NumCacheIntervals
+	if numIntervals <= 0 {
+		numIntervals = 1
+	}
+	r.cache, err = NewCache(r.cache_input, r.ticker, &r.cachewg, numIntervals)
+	if err != nil {
+		cclog.ComponentError("MetricRouter", "MetricCache initialization failed:", err.Error())
+		return err
+	}
+	for _, agg := range r.config.IntervalAgg {
+		r.cache.AddAggregation(agg.Name, agg.Function, agg.Condition, agg.Tags, agg.Meta)
+	}
 	return nil
 }
 
@@ -87,6 +119,7 @@ func (r *metricRouter) StartTimer() {
 		for {
 			select {
 			case <-r.timerdone:
+				close(r.timerdone)
 				cclog.ComponentDebug("MetricRouter", "TIMER DONE")
 				return
 			case t := <-m:
@@ -97,11 +130,11 @@ func (r *metricRouter) StartTimer() {
 	cclog.ComponentDebug("MetricRouter", "TIMER START")
 }
 
-// EvalCondition evaluates condition Cond for metric data from point
-func (r *metricRouter) EvalCondition(Cond string, point lp.CCMetric) (bool, error) {
-	expression, err := govaluate.NewEvaluableExpression(Cond)
+// EvalCondition evaluates condition cond for metric data from point
+func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, error) {
+	expression, err := govaluate.NewEvaluableExpression(cond)
 	if err != nil {
-		cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error())
+		cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
 		return false, err
 	}
 
@@ -122,7 +155,7 @@ func (r *metricRouter) EvalCondition(cond string, point lp.CCMetric) (bool, erro
 	// evaluate condition
 	result, err := expression.Evaluate(params)
 	if err != nil {
-		cclog.ComponentDebug("MetricRouter", Cond, " = ", err.Error())
+		cclog.ComponentDebug("MetricRouter", cond, " = ", err.Error())
 		return false, err
 	}
 	return bool(result.(bool)), err
@@ -172,13 +205,21 @@ func (r *metricRouter) DoDelTags(point lp.CCMetric) {
 
 // Start starts the metric router
 func (r *metricRouter) Start() {
+
 	// start timer if configured
 	r.timestamp = time.Now()
 	if r.config.IntervalStamp {
 		r.StartTimer()
 	}
 
+	// Router manager is done
 	done := func() {
+		close(r.done)
 		cclog.ComponentDebug("MetricRouter", "DONE")
 	}
 
+	// Forward takes a received metric, adds or deletes tags
+	// and forwards it to the output channels
 	forward := func(point lp.CCMetric) {
 		cclog.ComponentDebug("MetricRouter", "FORWARD", point)
 		r.DoAddTags(point)
@@ -188,36 +229,50 @@ func (r *metricRouter) Start() {
 		}
 	}
 
+	// Start Metric Cache
+	r.cache.Start()
+
 	r.wg.Add(1)
 	go func() {
 		defer r.wg.Done()
 		for {
			// RouterLoop:
 			select {
 			case <-r.done:
 				done()
 				return
 
 			case p := <-r.coll_input:
+				// receive from metric collector
 				p.AddTag("hostname", r.hostname)
 				if r.config.IntervalStamp {
 					p.SetTime(r.timestamp)
 				}
 				forward(p)
+				r.cache.Add(p)
 
 			case p := <-r.recv_input:
+				// receive from receive manager
 				if r.config.IntervalStamp {
 					p.SetTime(r.timestamp)
 				}
 				forward(p)
+
+			case p := <-r.cache_input:
+				// receive from metric cache
+				p.AddTag("hostname", r.hostname)
+				forward(p)
 			}
 		}
 	}()
 	cclog.ComponentDebug("MetricRouter", "STARTED")
 }
 
-// AddInput adds an input channel to the metric router
+// AddCollectorInput adds a channel between metric collector and metric router
 func (r *metricRouter) AddCollectorInput(input chan lp.CCMetric) {
 	r.coll_input = input
 }
 
+// AddReceiverInput adds a channel between metric receiver and metric router
 func (r *metricRouter) AddReceiverInput(input chan lp.CCMetric) {
 	r.recv_input = input
 }
@@ -231,10 +286,16 @@ func (r *metricRouter) AddOutput(output chan lp.CCMetric) {
 func (r *metricRouter) Close() {
 	cclog.ComponentDebug("MetricRouter", "CLOSE")
 	r.done <- true
+	// wait for close of channel r.done
+	<-r.done
 	if r.config.IntervalStamp {
 		cclog.ComponentDebug("MetricRouter", "TIMER CLOSE")
 		r.timerdone <- true
+		// wait for close of channel r.timerdone
+		<-r.timerdone
 	}
+	r.cache.Close()
+	r.cachewg.Wait()
 }
 
 // New creates a new initialized metric router
internal/multiChanTicker/multiChanTicker.go
@@ -23,6 +23,7 @@ func (t *multiChanTicker) Init(duration time.Duration) {
 	t.done = make(chan bool)
 	go func() {
 		done := func() {
+			close(t.done)
 			cclog.ComponentDebug("MultiChanTicker", "DONE")
 		}
 		for {
@@ -52,6 +53,8 @@ func (t *multiChanTicker) AddChannel(channel chan time.Time) {
 func (t *multiChanTicker) Close() {
 	cclog.ComponentDebug("MultiChanTicker", "CLOSE")
 	t.done <- true
+	// wait for close of channel t.done
+	<-t.done
 }
 
 func NewTicker(duration time.Duration) MultiChanTicker {
metric-collector.go
@@ -5,7 +5,6 @@ import (
 	"flag"
 	"os"
 	"os/signal"
-	"strings"
 	"syscall"
 
 	"github.com/ClusterCockpit/cc-metric-collector/collectors"
@@ -45,7 +44,6 @@ func LoadCentralConfiguration(file string, config *CentralConfigFile) error {
 }
 
 type RuntimeConfig struct {
-	Hostname   string
 	Interval   time.Duration
 	Duration   time.Duration
 	CliArgs    map[string]string
@@ -213,13 +211,21 @@ func mainFunc() int {
 	}
 	rcfg.Duration = time.Duration(rcfg.ConfigFile.Duration) * time.Second
 
-	rcfg.Hostname, err = os.Hostname()
-	if err != nil {
-		cclog.Error(err.Error())
+	if len(rcfg.ConfigFile.RouterConfigFile) == 0 {
+		cclog.Error("Metric router configuration file must be set")
 		return 1
 	}
-	// Drop domain part of host name
-	rcfg.Hostname = strings.SplitN(rcfg.Hostname, `.`, 2)[0]
+
+	if len(rcfg.ConfigFile.SinkConfigFile) == 0 {
+		cclog.Error("Sink configuration file must be set")
+		return 1
+	}
+
+	if len(rcfg.ConfigFile.CollectorConfigFile) == 0 {
+		cclog.Error("Metric collector configuration file must be set")
+		return 1
+	}
 
 	// err = CreatePidfile(rcfg.CliArgs["pidfile"])
 
 	// Set log file
@@ -231,16 +237,13 @@ func mainFunc() int {
 	rcfg.MultiChanTicker = mct.NewTicker(rcfg.Interval)
 
 	// Create new metric router
-	if len(rcfg.ConfigFile.RouterConfigFile) > 0 {
-		rcfg.MetricRouter, err = mr.New(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile)
-		if err != nil {
-			cclog.Error(err.Error())
-			return 1
-		}
+	rcfg.MetricRouter, err = mr.New(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.RouterConfigFile)
+	if err != nil {
+		cclog.Error(err.Error())
+		return 1
 	}
 
 	// Create new sink
-	if len(rcfg.ConfigFile.SinkConfigFile) > 0 {
-		rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile)
-		if err != nil {
-			cclog.Error(err.Error())
+	rcfg.SinkManager, err = sinks.New(&rcfg.Sync, rcfg.ConfigFile.SinkConfigFile)
+	if err != nil {
+		cclog.Error(err.Error())
@@ -251,10 +254,8 @@ func mainFunc() int {
 	RouterToSinksChannel := make(chan lp.CCMetric, 200)
 	rcfg.SinkManager.AddInput(RouterToSinksChannel)
 	rcfg.MetricRouter.AddOutput(RouterToSinksChannel)
-	}
 
 	// Create new collector manager
-	if len(rcfg.ConfigFile.CollectorConfigFile) > 0 {
 	rcfg.CollectManager, err = collectors.New(rcfg.MultiChanTicker, rcfg.Duration, &rcfg.Sync, rcfg.ConfigFile.CollectorConfigFile)
 	if err != nil {
 		cclog.Error(err.Error())
@@ -265,7 +266,6 @@ func mainFunc() int {
 	CollectToRouterChannel := make(chan lp.CCMetric, 200)
 	rcfg.CollectManager.AddOutput(CollectToRouterChannel)
 	rcfg.MetricRouter.AddCollectorInput(CollectToRouterChannel)
-	}
 
 	// Create new receive manager
 	if len(rcfg.ConfigFile.ReceiverConfigFile) > 0 {
sinks/sinkManager.go
@@ -9,6 +9,7 @@ import (
 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
 )
 
+// Map of all available sinks
 var AvailableSinks = map[string]Sink{
 	"influxdb": new(InfluxSink),
 	"stdout":   new(StdoutSink),
@@ -17,14 +18,16 @@ var AvailableSinks = map[string]Sink{
 	"ganglia":  new(GangliaSink),
 }
 
 // Metric sink manager data structure
 type sinkManager struct {
-	input   chan lp.CCMetric
-	outputs []Sink
-	done    chan bool
-	wg      *sync.WaitGroup
-	config  []sinkConfig
+	input   chan lp.CCMetric // input channel
+	outputs []Sink           // List of sinks to use
+	done    chan bool        // channel to finish / stop metric sink manager
+	wg      *sync.WaitGroup  // wait group for all goroutines in cc-metric-collector
+	config  []sinkConfig     // json encoded config for sink manager
 }
 
+// Sink manager access functions
 type SinkManager interface {
 	Init(wg *sync.WaitGroup, sinkConfigFile string) error
 	AddInput(input chan lp.CCMetric)
@@ -39,6 +42,8 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
 	sm.done = make(chan bool)
 	sm.wg = wg
 	sm.config = make([]sinkConfig, 0)
 
+	// Read sink config file
 	if len(sinkConfigFile) > 0 {
 		configFile, err := os.Open(sinkConfigFile)
 		if err != nil {
@@ -64,27 +69,37 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
 }
 
 func (sm *sinkManager) Start() {
-	sm.wg.Add(1)
 	batchcount := 20
 
+	sm.wg.Add(1)
 	go func() {
+		defer sm.wg.Done()
+
+		// Sink manager is done
 		done := func() {
 			for _, s := range sm.outputs {
 				s.Flush()
 				s.Close()
 			}
-			sm.wg.Done()
+
+			close(sm.done)
 			cclog.ComponentDebug("SinkManager", "DONE")
 		}
+
 		for {
 			select {
 			case <-sm.done:
 				done()
 				return
 
 			case p := <-sm.input:
+				// Send received metric to all outputs
 				cclog.ComponentDebug("SinkManager", "WRITE", p)
 				for _, s := range sm.outputs {
 					s.Write(p)
 				}
 
+				// Flush all outputs
 				if batchcount == 0 {
 					cclog.ComponentDebug("SinkManager", "FLUSH")
 					for _, s := range sm.outputs {
@@ -96,9 +111,12 @@ func (sm *sinkManager) Start() {
 		}
 	}()
 
+	// Sink manager is started
 	cclog.ComponentDebug("SinkManager", "STARTED")
 }
 
+// AddInput adds the input channel to the sink manager
 func (sm *sinkManager) AddInput(input chan lp.CCMetric) {
 	sm.input = input
 }
@@ -129,11 +147,15 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error {
 	return nil
 }
 
+// Close finishes / stops the sink manager
 func (sm *sinkManager) Close() {
 	cclog.ComponentDebug("SinkManager", "CLOSE")
 	sm.done <- true
+	// wait for close of channel sm.done
+	<-sm.done
 }
 
+// New creates a new initialized sink manager
 func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error) {
 	sm := &sinkManager{}
 	err := sm.Init(wg, sinkConfigFile)