mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-04-05 13:15:55 +02:00
Merge branch 'develop' into derived-values-nfsiostat
This commit is contained in:
commit
b1e0ec66af
20
.github/workflows/Release.yml
vendored
20
.github/workflows/Release.yml
vendored
@ -73,21 +73,21 @@ jobs:
|
||||
NEW_SRPM=${OLD_SRPM/el8/alma8}
|
||||
mv "${OLD_RPM}" "${NEW_RPM}"
|
||||
mv "${OLD_SRPM}" "${NEW_SRPM}"
|
||||
echo "EL8_SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT
|
||||
echo "EL8_RPM=${NEW_RPM}" >> $GITHUB_OUTPUT
|
||||
echo "SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT
|
||||
echo "RPM=${NEW_RPM}" >> $GITHUB_OUTPUT
|
||||
|
||||
# See: https://github.com/actions/upload-artifact
|
||||
- name: Save RPM as artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: cc-metric-collector RPM for AlmaLinux 8
|
||||
path: ${{ steps.rpmrename.outputs.EL8_RPM }}
|
||||
path: ${{ steps.rpmrename.outputs.RPM }}
|
||||
overwrite: true
|
||||
- name: Save SRPM as artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: cc-metric-collector SRPM for AlmaLinux 8
|
||||
path: ${{ steps.rpmrename.outputs.EL8_SRPM }}
|
||||
path: ${{ steps.rpmrename.outputs.SRPM }}
|
||||
overwrite: true
|
||||
|
||||
#
|
||||
@ -152,21 +152,21 @@ jobs:
|
||||
NEW_SRPM=${OLD_SRPM/el9/alma9}
|
||||
mv "${OLD_RPM}" "${NEW_RPM}"
|
||||
mv "${OLD_SRPM}" "${NEW_SRPM}"
|
||||
echo "EL9_SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT
|
||||
echo "EL9_RPM=${NEW_RPM}" >> $GITHUB_OUTPUT
|
||||
echo "SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT
|
||||
echo "RPM=${NEW_RPM}" >> $GITHUB_OUTPUT
|
||||
|
||||
# See: https://github.com/actions/upload-artifact
|
||||
- name: Save RPM as artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: cc-metric-collector RPM for AlmaLinux 9
|
||||
path: ${{ steps.rpmrename.outputs.EL9_RPM }}
|
||||
path: ${{ steps.rpmrename.outputs.RPM }}
|
||||
overwrite: true
|
||||
- name: Save SRPM as artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: cc-metric-collector SRPM for AlmaLinux 9
|
||||
path: ${{ steps.rpmrename.outputs.EL9_SRPM }}
|
||||
path: ${{ steps.rpmrename.outputs.SRPM }}
|
||||
overwrite: true
|
||||
|
||||
#
|
||||
@ -235,6 +235,10 @@ jobs:
|
||||
# See: https://catalog.redhat.com/software/containers/ubi8/ubi/5c359854d70cc534b3a3784e?container-tabs=gti
|
||||
container: redhat/ubi9
|
||||
# The job outputs link to the outputs of the 'rpmbuild' step
|
||||
# The job outputs link to the outputs of the 'rpmbuild' step
|
||||
outputs:
|
||||
rpm : ${{steps.rpmbuild.outputs.RPM}}
|
||||
srpm : ${{steps.rpmbuild.outputs.SRPM}}
|
||||
steps:
|
||||
|
||||
# Use dnf to install development packages
|
||||
|
@ -9,22 +9,20 @@ import (
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||
lp "github.com/ClusterCockpit/cc-lib/ccMessage"
|
||||
)
|
||||
|
||||
// "log"
|
||||
|
||||
const MOUNTFILE = `/proc/self/mounts`
|
||||
|
||||
type DiskstatCollectorConfig struct {
|
||||
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
|
||||
ExcludeMounts []string `json:"exclude_mounts,omitempty"`
|
||||
}
|
||||
|
||||
type DiskstatCollector struct {
|
||||
metricCollector
|
||||
//matches map[string]int
|
||||
config IOstatCollectorConfig
|
||||
//devices map[string]IOstatCollectorEntry
|
||||
config DiskstatCollectorConfig
|
||||
allowedMetrics map[string]bool
|
||||
}
|
||||
|
||||
func (m *DiskstatCollector) Init(config json.RawMessage) error {
|
||||
@ -33,12 +31,21 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
|
||||
m.meta = map[string]string{"source": m.name, "group": "Disk"}
|
||||
m.setup()
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &m.config)
|
||||
if err != nil {
|
||||
if err := json.Unmarshal(config, &m.config); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
file, err := os.Open(string(MOUNTFILE))
|
||||
m.allowedMetrics = map[string]bool{
|
||||
"disk_total": true,
|
||||
"disk_free": true,
|
||||
"part_max_used": true,
|
||||
}
|
||||
for _, excl := range m.config.ExcludeMetrics {
|
||||
if _, ok := m.allowedMetrics[excl]; ok {
|
||||
m.allowedMetrics[excl] = false
|
||||
}
|
||||
}
|
||||
file, err := os.Open(MOUNTFILE)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return err
|
||||
@ -53,7 +60,7 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessag
|
||||
return
|
||||
}
|
||||
|
||||
file, err := os.Open(string(MOUNTFILE))
|
||||
file, err := os.Open(MOUNTFILE)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return
|
||||
@ -62,6 +69,7 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessag
|
||||
|
||||
part_max_used := uint64(0)
|
||||
scanner := bufio.NewScanner(file)
|
||||
mountLoop:
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if len(line) == 0 {
|
||||
@ -77,13 +85,17 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessag
|
||||
if strings.Contains(linefields[1], "boot") {
|
||||
continue
|
||||
}
|
||||
path := strings.Replace(linefields[1], `\040`, " ", -1)
|
||||
stat := syscall.Statfs_t{
|
||||
Blocks: 0,
|
||||
Bsize: 0,
|
||||
Bfree: 0,
|
||||
|
||||
mountPath := strings.Replace(linefields[1], `\040`, " ", -1)
|
||||
|
||||
for _, excl := range m.config.ExcludeMounts {
|
||||
if strings.Contains(mountPath, excl) {
|
||||
continue mountLoop
|
||||
}
|
||||
}
|
||||
err := syscall.Statfs(path, &stat)
|
||||
|
||||
stat := syscall.Statfs_t{}
|
||||
err := syscall.Statfs(mountPath, &stat)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@ -92,16 +104,20 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessag
|
||||
}
|
||||
tags := map[string]string{"type": "node", "device": linefields[0]}
|
||||
total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000)
|
||||
y, err := lp.NewMessage("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "GBytes")
|
||||
output <- y
|
||||
if m.allowedMetrics["disk_total"] {
|
||||
y, err := lp.NewMessage("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "GBytes")
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000)
|
||||
y, err = lp.NewMessage("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now())
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "GBytes")
|
||||
output <- y
|
||||
if m.allowedMetrics["disk_free"] {
|
||||
y, err := lp.NewMessage("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now())
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "GBytes")
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
if total > 0 {
|
||||
perc := (100 * (total - free)) / total
|
||||
@ -110,10 +126,12 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessag
|
||||
}
|
||||
}
|
||||
}
|
||||
y, err := lp.NewMessage("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "percent")
|
||||
output <- y
|
||||
if m.allowedMetrics["part_max_used"] {
|
||||
y, err := lp.NewMessage("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
|
||||
if err == nil {
|
||||
y.AddMeta("unit", "percent")
|
||||
output <- y
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,10 +6,13 @@
|
||||
"exclude_metrics": [
|
||||
"disk_total"
|
||||
],
|
||||
"exclude_mounts": [
|
||||
"slurm-tmpfs"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
The `diskstat` collector reads data from `/proc/self/mounts` and outputs a handful **node** metrics. If a metric is not required, it can be excluded from forwarding it to the sink.
|
||||
The `diskstat` collector reads data from `/proc/self/mounts` and outputs a handful **node** metrics. If a metric is not required, it can be excluded from forwarding it to the sink. Additionally, any mount point containing one of the strings specified in `exclude_mounts` will be skipped during metric collection.
|
||||
|
||||
Metrics per device (with `device` tag):
|
||||
* `disk_total` (unit `GBytes`)
|
||||
|
@ -10,15 +10,16 @@ import (
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||
lp "github.com/ClusterCockpit/cc-lib/ccMessage"
|
||||
)
|
||||
|
||||
const NETSTATFILE = "/proc/net/dev"
|
||||
|
||||
type NetstatCollectorConfig struct {
|
||||
IncludeDevices []string `json:"include_devices"`
|
||||
SendAbsoluteValues bool `json:"send_abs_values"`
|
||||
SendDerivedValues bool `json:"send_derived_values"`
|
||||
IncludeDevices []string `json:"include_devices"`
|
||||
SendAbsoluteValues bool `json:"send_abs_values"`
|
||||
SendDerivedValues bool `json:"send_derived_values"`
|
||||
InterfaceAliases map[string][]string `json:"interface_aliases,omitempty"`
|
||||
}
|
||||
|
||||
type NetstatCollectorMetric struct {
|
||||
@ -32,9 +33,26 @@ type NetstatCollectorMetric struct {
|
||||
|
||||
type NetstatCollector struct {
|
||||
metricCollector
|
||||
config NetstatCollectorConfig
|
||||
matches map[string][]NetstatCollectorMetric
|
||||
lastTimestamp time.Time
|
||||
config NetstatCollectorConfig
|
||||
aliasToCanonical map[string]string
|
||||
matches map[string][]NetstatCollectorMetric
|
||||
lastTimestamp time.Time
|
||||
}
|
||||
|
||||
func (m *NetstatCollector) buildAliasMapping() {
|
||||
m.aliasToCanonical = make(map[string]string)
|
||||
for canon, aliases := range m.config.InterfaceAliases {
|
||||
for _, alias := range aliases {
|
||||
m.aliasToCanonical[alias] = canon
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getCanonicalName(raw string, aliasToCanonical map[string]string) string {
|
||||
if canon, ok := aliasToCanonical[raw]; ok {
|
||||
return canon
|
||||
}
|
||||
return raw
|
||||
}
|
||||
|
||||
func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||
@ -77,6 +95,8 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||
}
|
||||
}
|
||||
|
||||
m.buildAliasMapping()
|
||||
|
||||
// Check access to net statistic file
|
||||
file, err := os.Open(NETSTATFILE)
|
||||
if err != nil {
|
||||
@ -97,18 +117,20 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||
// Split line into fields
|
||||
f := strings.Fields(l)
|
||||
|
||||
// Get net device entry
|
||||
dev := strings.Trim(f[0], ": ")
|
||||
// Get raw and canonical names
|
||||
raw := strings.Trim(f[0], ": ")
|
||||
canonical := getCanonicalName(raw, m.aliasToCanonical)
|
||||
|
||||
// Check if device is a included device
|
||||
if _, ok := stringArrayContains(m.config.IncludeDevices, dev); ok {
|
||||
tags := map[string]string{"stype": "network", "stype-id": dev, "type": "node"}
|
||||
if _, ok := stringArrayContains(m.config.IncludeDevices, canonical); ok {
|
||||
// Tag will contain original device name (raw).
|
||||
tags := map[string]string{"stype": "network", "stype-id": raw, "type": "node"}
|
||||
meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
|
||||
meta_unit_byte_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "bytes/sec"}
|
||||
meta_unit_pkts := map[string]string{"source": m.name, "group": "Network", "unit": "packets"}
|
||||
meta_unit_pkts_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "packets/sec"}
|
||||
|
||||
m.matches[dev] = []NetstatCollectorMetric{
|
||||
m.matches[canonical] = []NetstatCollectorMetric{
|
||||
{
|
||||
name: "net_bytes_in",
|
||||
index: fieldReceiveBytes,
|
||||
@ -143,7 +165,6 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if len(m.matches) == 0 {
|
||||
@ -164,7 +185,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMessage
|
||||
// Save current timestamp
|
||||
m.lastTimestamp = now
|
||||
|
||||
file, err := os.Open(string(NETSTATFILE))
|
||||
file, err := os.Open(NETSTATFILE)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, err.Error())
|
||||
return
|
||||
@ -183,11 +204,12 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMessage
|
||||
// Split line into fields
|
||||
f := strings.Fields(l)
|
||||
|
||||
// Get net device entry
|
||||
dev := strings.Trim(f[0], ":")
|
||||
// Get raw and canonical names
|
||||
raw := strings.Trim(f[0], ":")
|
||||
canonical := getCanonicalName(raw, m.aliasToCanonical)
|
||||
|
||||
// Check if device is a included device
|
||||
if devmetrics, ok := m.matches[dev]; ok {
|
||||
if devmetrics, ok := m.matches[canonical]; ok {
|
||||
for i := range devmetrics {
|
||||
metric := &devmetrics[i]
|
||||
|
||||
|
@ -4,14 +4,19 @@
|
||||
```json
|
||||
"netstat": {
|
||||
"include_devices": [
|
||||
"eth0"
|
||||
"eth0",
|
||||
"eno1"
|
||||
],
|
||||
"send_abs_values" : true,
|
||||
"send_derived_values" : true
|
||||
"send_abs_values": true,
|
||||
"send_derived_values": true,
|
||||
"interface_aliases": {
|
||||
"eno1": ["eno1np0", "eno1_alt"],
|
||||
"eth0": ["eth0_alias"]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `netstat` collector reads data from `/proc/net/dev` and outputs a handful **node** metrics. With the `include_devices` list you can specify which network devices should be measured. **Note**: Most other collectors use an _exclude_ list instead of an include list.
|
||||
The `netstat` collector reads data from `/proc/net/dev` and outputs a handful **node** metrics. With the `include_devices` list you can specify which network devices should be measured. **Note**: Most other collectors use an _exclude_ list instead of an include list. Optionally, you can define an interface_aliases mapping. For each canonical device (as listed in include_devices), you may provide an array of aliases that may be reported by the system. When an alias is detected, it is preferred for matching, while the output tag stype-id always shows the actual system-reported name.
|
||||
|
||||
Metrics:
|
||||
* `net_bytes_in` (`unit=bytes`)
|
||||
@ -23,5 +28,4 @@ Metrics:
|
||||
* `net_pkts_in_bw` (`unit=packets/sec` if `send_derived_values == true`)
|
||||
* `net_pkts_out_bw` (`unit=packets/sec` if `send_derived_values == true`)
|
||||
|
||||
The device name is added as tag `stype=network,stype-id=<device>`.
|
||||
|
||||
The device name is added as tag `stype=network,stype-id=<device>`.
|
@ -1,11 +1,9 @@
|
||||
# Running average power limit (RAPL) metric collector
|
||||
## `rapl` collector
|
||||
|
||||
This collector reads running average power limit (RAPL) monitoring attributes to compute average power consumption metrics. See <https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes>.
|
||||
|
||||
The Likwid metric collector provides similar functionality.
|
||||
|
||||
## Configuration
|
||||
|
||||
```json
|
||||
"rapl": {
|
||||
"exclude_device_by_id": ["0:1", "0:2"],
|
||||
@ -13,6 +11,5 @@ The Likwid metric collector provides similar functionality.
|
||||
}
|
||||
```
|
||||
|
||||
## Metrics
|
||||
|
||||
Metrics:
|
||||
* `rapl_average_power`: average power consumption in Watt. The average is computed over the entire runtime from the last measurement to the current measurement
|
||||
|
@ -40,7 +40,7 @@ type metricRouterConfig struct {
|
||||
NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units
|
||||
ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics
|
||||
// dropMetrics map[string]bool // Internal map for O(1) lookup
|
||||
MessageProcessor json.RawMessage `json:"process_message,omitempty"`
|
||||
MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
|
||||
}
|
||||
|
||||
// Metric router data structure
|
||||
|
@ -22,27 +22,27 @@ type messageProcessorTagConfig struct {
|
||||
}
|
||||
|
||||
type messageProcessorConfig struct {
|
||||
StageOrder []string `json:"stage_order,omitempty"` // List of stages to execute them in the specified order and to skip unrequired ones
|
||||
DropMessages []string `json:"drop_messages,omitempty"` // List of metric names to drop. For fine-grained dropping use drop_messages_if
|
||||
DropMessagesIf []string `json:"drop_messages_if,omitempty"` // List of evaluatable terms to drop messages
|
||||
RenameMessages map[string]string `json:"rename_messages,omitempty"` // Map of metric names to rename
|
||||
RenameMessagesIf map[string]string `json:"rename_messages_if,omitempty"` // Map to rename metric name based on a condition
|
||||
NormalizeUnits bool `json:"normalize_units,omitempty"` // Check unit meta flag and normalize it using cc-units
|
||||
ChangeUnitPrefix map[string]string `json:"change_unit_prefix,omitempty"` // Add prefix that should be applied to the messages
|
||||
AddTagsIf []messageProcessorTagConfig `json:"add_tags_if"` // List of tags that are added when the condition is met
|
||||
DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if"` // List of tags that are removed when the condition is met
|
||||
AddMetaIf []messageProcessorTagConfig `json:"add_meta_if"` // List of meta infos that are added when the condition is met
|
||||
DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if"` // List of meta infos that are removed when the condition is met
|
||||
AddFieldIf []messageProcessorTagConfig `json:"add_field_if"` // List of fields that are added when the condition is met
|
||||
DelFieldIf []messageProcessorTagConfig `json:"delete_field_if"` // List of fields that are removed when the condition is met
|
||||
DropByType []string `json:"drop_by_message_type"` // List of message types that should be dropped
|
||||
MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if"`
|
||||
MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if"`
|
||||
MoveMetaToTag []messageProcessorTagConfig `json:"move_meta_to_tag_if"`
|
||||
MoveMetaToField []messageProcessorTagConfig `json:"move_meta_to_field_if"`
|
||||
MoveFieldToTag []messageProcessorTagConfig `json:"move_field_to_tag_if"`
|
||||
MoveFieldToMeta []messageProcessorTagConfig `json:"move_field_to_meta_if"`
|
||||
AddBaseEnv map[string]interface{} `json:"add_base_env"`
|
||||
StageOrder []string `json:"stage_order,omitempty"` // List of stages to execute them in the specified order and to skip unrequired ones
|
||||
DropMessages []string `json:"drop_messages,omitempty"` // List of metric names to drop. For fine-grained dropping use drop_messages_if
|
||||
DropMessagesIf []string `json:"drop_messages_if,omitempty"` // List of evaluatable terms to drop messages
|
||||
RenameMessages map[string]string `json:"rename_messages,omitempty"` // Map of metric names to rename
|
||||
RenameMessagesIf map[string]string `json:"rename_messages_if,omitempty"` // Map to rename metric name based on a condition
|
||||
NormalizeUnits bool `json:"normalize_units,omitempty"` // Check unit meta flag and normalize it using cc-units
|
||||
ChangeUnitPrefix map[string]string `json:"change_unit_prefix,omitempty"` // Add prefix that should be applied to the messages
|
||||
AddTagsIf []messageProcessorTagConfig `json:"add_tags_if,omitempty"` // List of tags that are added when the condition is met
|
||||
DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if,omitempty"` // List of tags that are removed when the condition is met
|
||||
AddMetaIf []messageProcessorTagConfig `json:"add_meta_if,omitempty"` // List of meta infos that are added when the condition is met
|
||||
DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if,omitempty"` // List of meta infos that are removed when the condition is met
|
||||
AddFieldIf []messageProcessorTagConfig `json:"add_field_if,omitempty"` // List of fields that are added when the condition is met
|
||||
DelFieldIf []messageProcessorTagConfig `json:"delete_field_if,omitempty"` // List of fields that are removed when the condition is met
|
||||
DropByType []string `json:"drop_by_message_type,omitempty"` // List of message types that should be dropped
|
||||
MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if,omitempty"`
|
||||
MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if,omitempty"`
|
||||
MoveMetaToTag []messageProcessorTagConfig `json:"move_meta_to_tag_if,omitempty"`
|
||||
MoveMetaToField []messageProcessorTagConfig `json:"move_meta_to_field_if,omitempty"`
|
||||
MoveFieldToTag []messageProcessorTagConfig `json:"move_field_to_tag_if,omitempty"`
|
||||
MoveFieldToMeta []messageProcessorTagConfig `json:"move_field_to_meta_if,omitempty"`
|
||||
AddBaseEnv map[string]interface{} `json:"add_base_env,omitempty"`
|
||||
}
|
||||
|
||||
type messageProcessor struct {
|
||||
|
@ -13,7 +13,6 @@ import (
|
||||
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
|
||||
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
|
||||
)
|
||||
|
||||
const HTTP_RECEIVER_PORT = "8080"
|
||||
@ -151,80 +150,22 @@ func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
}
|
||||
if r.sink != nil {
|
||||
d := influx.NewDecoder(req.Body)
|
||||
for d.Next() {
|
||||
|
||||
// Decode measurement name
|
||||
measurement, err := d.Measurement()
|
||||
buf := make([]byte, 0, req.ContentLength)
|
||||
len, err := req.Body.Read(buf)
|
||||
if err == nil && len > 0 {
|
||||
messages, err := lp.FromBytes(buf)
|
||||
if err != nil {
|
||||
msg := "ServerHttp: Failed to decode measurement: " + err.Error()
|
||||
msg := "ServerHttp: Failed to decode messages: " + err.Error()
|
||||
cclog.ComponentError(r.name, msg)
|
||||
http.Error(w, msg, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Decode tags
|
||||
tags := make(map[string]string)
|
||||
for {
|
||||
key, value, err := d.NextTag()
|
||||
if err != nil {
|
||||
msg := "ServerHttp: Failed to decode tag: " + err.Error()
|
||||
cclog.ComponentError(r.name, msg)
|
||||
http.Error(w, msg, http.StatusInternalServerError)
|
||||
return
|
||||
for _, y := range messages {
|
||||
m, err := r.mp.ProcessMessage(y)
|
||||
if err == nil && m != nil {
|
||||
r.sink <- m
|
||||
}
|
||||
if key == nil {
|
||||
break
|
||||
}
|
||||
tags[string(key)] = string(value)
|
||||
}
|
||||
|
||||
// Decode fields
|
||||
fields := make(map[string]interface{})
|
||||
for {
|
||||
key, value, err := d.NextField()
|
||||
if err != nil {
|
||||
msg := "ServerHttp: Failed to decode field: " + err.Error()
|
||||
cclog.ComponentError(r.name, msg)
|
||||
http.Error(w, msg, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if key == nil {
|
||||
break
|
||||
}
|
||||
fields[string(key)] = value.Interface()
|
||||
}
|
||||
|
||||
// Decode time stamp
|
||||
t, err := d.Time(influx.Nanosecond, time.Time{})
|
||||
if err != nil {
|
||||
msg := "ServerHttp: Failed to decode time stamp: " + err.Error()
|
||||
cclog.ComponentError(r.name, msg)
|
||||
http.Error(w, msg, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
y, _ := lp.NewMessage(
|
||||
string(measurement),
|
||||
tags,
|
||||
nil,
|
||||
fields,
|
||||
t,
|
||||
)
|
||||
|
||||
m, err := r.mp.ProcessMessage(y)
|
||||
if err == nil && m != nil {
|
||||
r.sink <- m
|
||||
}
|
||||
|
||||
}
|
||||
// Check for IO errors
|
||||
err := d.Err()
|
||||
if err != nil {
|
||||
msg := "ServerHttp: Failed to decode: " + err.Error()
|
||||
cclog.ComponentError(r.name, msg)
|
||||
http.Error(w, msg, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,20 +5,18 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
|
||||
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
|
||||
nats "github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type NatsReceiverConfig struct {
|
||||
defaultReceiverConfig
|
||||
Addr string `json:"address"`
|
||||
Port string `json:"port"`
|
||||
Subject string `json:"subject"`
|
||||
Addr string `json:"address"`
|
||||
Port string `json:"port"`
|
||||
Subject string `json:"subject"`
|
||||
User string `json:"user,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
NkeyFile string `json:"nkey_file,omitempty"`
|
||||
@ -42,67 +40,15 @@ func (r *NatsReceiver) Start() {
|
||||
func (r *NatsReceiver) _NatsReceive(m *nats.Msg) {
|
||||
|
||||
if r.sink != nil {
|
||||
d := influx.NewDecoderWithBytes(m.Data)
|
||||
for d.Next() {
|
||||
|
||||
// Decode measurement name
|
||||
measurement, err := d.Measurement()
|
||||
if err != nil {
|
||||
msg := "_NatsReceive: Failed to decode measurement: " + err.Error()
|
||||
cclog.ComponentError(r.name, msg)
|
||||
return
|
||||
}
|
||||
|
||||
// Decode tags
|
||||
tags := make(map[string]string)
|
||||
for {
|
||||
key, value, err := d.NextTag()
|
||||
if err != nil {
|
||||
msg := "_NatsReceive: Failed to decode tag: " + err.Error()
|
||||
cclog.ComponentError(r.name, msg)
|
||||
return
|
||||
}
|
||||
if key == nil {
|
||||
break
|
||||
}
|
||||
tags[string(key)] = string(value)
|
||||
}
|
||||
|
||||
// Decode fields
|
||||
fields := make(map[string]interface{})
|
||||
for {
|
||||
key, value, err := d.NextField()
|
||||
if err != nil {
|
||||
msg := "_NatsReceive: Failed to decode field: " + err.Error()
|
||||
cclog.ComponentError(r.name, msg)
|
||||
return
|
||||
}
|
||||
if key == nil {
|
||||
break
|
||||
}
|
||||
fields[string(key)] = value.Interface()
|
||||
}
|
||||
|
||||
// Decode time stamp
|
||||
t, err := d.Time(influx.Nanosecond, time.Time{})
|
||||
if err != nil {
|
||||
msg := "_NatsReceive: Failed to decode time: " + err.Error()
|
||||
cclog.ComponentError(r.name, msg)
|
||||
return
|
||||
}
|
||||
|
||||
y, err := lp.NewMessage(
|
||||
string(measurement),
|
||||
tags,
|
||||
nil,
|
||||
fields,
|
||||
t,
|
||||
)
|
||||
if err == nil {
|
||||
m, err := r.mp.ProcessMessage(y)
|
||||
if err == nil && m != nil && r.sink != nil {
|
||||
r.sink <- m
|
||||
}
|
||||
messages, err := lp.FromBytes(m.Data)
|
||||
if err != nil {
|
||||
msg := "_NatsReceive: Failed to decode messages: " + err.Error()
|
||||
cclog.ComponentError(r.name, msg)
|
||||
}
|
||||
for _, y := range messages {
|
||||
m, err := r.mp.ProcessMessage(y)
|
||||
if err == nil && m != nil && r.sink != nil {
|
||||
r.sink <- m
|
||||
}
|
||||
}
|
||||
}
|
||||
|
175
scripts/generate_docs.sh
Executable file
175
scripts/generate_docs.sh
Executable file
@ -0,0 +1,175 @@
|
||||
#!/bin/bash -l
|
||||
|
||||
SRCDIR="$(pwd)"
|
||||
DESTDIR="$1"
|
||||
|
||||
if [ -z "$DESTDIR" ]; then
|
||||
echo "Destination folder not provided"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
|
||||
COLLECTORS=$(find "${SRCDIR}/collectors" -name "*Metric.md")
|
||||
SINKS=$(find "${SRCDIR}/sinks" -name "*Sink.md")
|
||||
RECEIVERS=$(find "${SRCDIR}/receivers" -name "*Receiver.md")
|
||||
|
||||
|
||||
|
||||
# Collectors
|
||||
mkdir -p "${DESTDIR}/collectors"
|
||||
for F in $COLLECTORS; do
|
||||
echo "$F"
|
||||
FNAME=$(basename "$F")
|
||||
TITLE=$(grep -E "^##" "$F" | head -n 1 | sed -e 's+## ++g')
|
||||
echo "'${TITLE//\`/}'"
|
||||
if [ "${TITLE}" == "" ]; then continue; fi
|
||||
rm --force "${DESTDIR}/collectors/${FNAME}"
|
||||
cat << EOF >> "${DESTDIR}/collectors/${FNAME}"
|
||||
---
|
||||
title: ${TITLE//\`/}
|
||||
description: >
|
||||
Toplevel ${FNAME/.md/}
|
||||
categories: [cc-metric-collector]
|
||||
tags: [cc-metric-collector, Collector, ${FNAME/Metric.md/}]
|
||||
weight: 2
|
||||
---
|
||||
|
||||
EOF
|
||||
cat "$F" >> "${DESTDIR}/collectors/${FNAME}"
|
||||
done
|
||||
|
||||
if [ -e "${SRCDIR}/collectors/README.md" ]; then
|
||||
cat << EOF > "${DESTDIR}/collectors/_index.md"
|
||||
---
|
||||
title: cc-metric-collector's collectors
|
||||
description: Documentation of cc-metric-collector's collectors
|
||||
categories: [cc-metric-collector]
|
||||
tags: [cc-metric-collector, Collector, General]
|
||||
weight: 40
|
||||
---
|
||||
|
||||
EOF
|
||||
cat "${SRCDIR}/collectors/README.md" >> "${DESTDIR}/collectors/_index.md"
|
||||
fi
|
||||
|
||||
# Sinks
|
||||
mkdir -p "${DESTDIR}/sinks"
|
||||
for F in $SINKS; do
|
||||
echo "$F"
|
||||
FNAME=$(basename "$F")
|
||||
TITLE=$(grep -E "^##" "$F" | head -n 1 | sed -e 's+## ++g')
|
||||
echo "'${TITLE//\`/}'"
|
||||
if [ "${TITLE}" == "" ]; then continue; fi
|
||||
rm --force "${DESTDIR}/sinks/${FNAME}"
|
||||
cat << EOF >> "${DESTDIR}/sinks/${FNAME}"
|
||||
---
|
||||
title: ${TITLE//\`/}
|
||||
description: >
|
||||
Toplevel ${FNAME/.md/}
|
||||
categories: [cc-metric-collector]
|
||||
tags: [cc-metric-collector, Sink, ${FNAME/Sink.md/}]
|
||||
weight: 2
|
||||
---
|
||||
|
||||
EOF
|
||||
cat "$F" >> "${DESTDIR}/sinks/${FNAME}"
|
||||
done
|
||||
|
||||
if [ -e "${SRCDIR}/collectors/README.md" ]; then
|
||||
cat << EOF > "${DESTDIR}/sinks/_index.md"
|
||||
---
|
||||
title: cc-metric-collector's sinks
|
||||
description: Documentation of cc-metric-collector's sinks
|
||||
categories: [cc-metric-collector]
|
||||
tags: [cc-metric-collector, Sink, General]
|
||||
weight: 40
|
||||
---
|
||||
|
||||
EOF
|
||||
cat "${SRCDIR}/sinks/README.md" >> "${DESTDIR}/sinks/_index.md"
|
||||
fi
|
||||
|
||||
|
||||
# Receivers
|
||||
mkdir -p "${DESTDIR}/receivers"
|
||||
for F in $RECEIVERS; do
|
||||
echo "$F"
|
||||
FNAME=$(basename "$F")
|
||||
TITLE=$(grep -E "^##" "$F" | head -n 1 | sed -e 's+## ++g')
|
||||
echo "'${TITLE//\`/}'"
|
||||
if [ "${TITLE}" == "" ]; then continue; fi
|
||||
rm --force "${DESTDIR}/receivers/${FNAME}"
|
||||
cat << EOF >> "${DESTDIR}/receivers/${FNAME}"
|
||||
---
|
||||
title: ${TITLE//\`/}
|
||||
description: >
|
||||
Toplevel ${FNAME/.md/}
|
||||
categories: [cc-metric-collector]
|
||||
tags: [cc-metric-collector, Receiver, ${FNAME/Receiver.md/}]
|
||||
weight: 2
|
||||
---
|
||||
|
||||
EOF
|
||||
cat "$F" >> "${DESTDIR}/receivers/${FNAME}"
|
||||
done
|
||||
|
||||
if [ -e "${SRCDIR}/receivers/README.md" ]; then
|
||||
cat << EOF > "${DESTDIR}/receivers/_index.md"
|
||||
---
|
||||
title: cc-metric-collector's receivers
|
||||
description: Documentation of cc-metric-collector's receivers
|
||||
categories: [cc-metric-collector]
|
||||
tags: [cc-metric-collector, Receiver, General]
|
||||
weight: 40
|
||||
---
|
||||
|
||||
EOF
|
||||
cat "${SRCDIR}/receivers/README.md" >> "${DESTDIR}/receivers/_index.md"
|
||||
fi
|
||||
|
||||
mkdir -p "${DESTDIR}/internal/metricRouter"
|
||||
if [ -e "${SRCDIR}/internal/metricRouter/README.md" ]; then
|
||||
cat << EOF > "${DESTDIR}/internal/metricRouter/_index.md"
|
||||
---
|
||||
title: cc-metric-collector's router
|
||||
description: Documentation of cc-metric-collector's router
|
||||
categories: [cc-metric-collector]
|
||||
tags: [cc-metric-collector, Router, General]
|
||||
weight: 40
|
||||
---
|
||||
|
||||
EOF
|
||||
cat "${SRCDIR}/internal/metricRouter/README.md" >> "${DESTDIR}/internal/metricRouter/_index.md"
|
||||
fi
|
||||
|
||||
if [ -e "${SRCDIR}/README.md" ]; then
|
||||
cat << EOF > "${DESTDIR}/_index.md"
|
||||
---
|
||||
title: cc-metric-collector
|
||||
description: Documentation of cc-metric-collector
|
||||
categories: [cc-metric-collector]
|
||||
tags: [cc-metric-collector, General]
|
||||
weight: 40
|
||||
---
|
||||
|
||||
EOF
|
||||
cat "${SRCDIR}/README.md" >> "${DESTDIR}/_index.md"
|
||||
sed -i -e 's+README.md+_index.md+g' "${DESTDIR}/_index.md"
|
||||
fi
|
||||
|
||||
|
||||
mkdir -p "${DESTDIR}/pkg/messageProcessor"
|
||||
if [ -e "${SRCDIR}/pkg/messageProcessor/README.md" ]; then
|
||||
cat << EOF > "${DESTDIR}/pkg/messageProcessor/_index.md"
|
||||
---
|
||||
title: cc-metric-collector's message processor
|
||||
description: Documentation of cc-metric-collector's message processor
|
||||
categories: [cc-metric-collector]
|
||||
tags: [cc-metric-collector, Message Processor]
|
||||
weight: 40
|
||||
---
|
||||
|
||||
EOF
|
||||
cat "${SRCDIR}/pkg/messageProcessor/README.md" >> "${DESTDIR}/pkg/messageProcessor/_index.md"
|
||||
fi
|
||||
|
Loading…
x
Reference in New Issue
Block a user