mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-04-06 05:35:54 +02:00
Merge branch 'develop' into diskstat-new
This commit is contained in:
commit
eb23d8b1ed
20
.github/workflows/Release.yml
vendored
20
.github/workflows/Release.yml
vendored
@ -73,21 +73,21 @@ jobs:
|
|||||||
NEW_SRPM=${OLD_SRPM/el8/alma8}
|
NEW_SRPM=${OLD_SRPM/el8/alma8}
|
||||||
mv "${OLD_RPM}" "${NEW_RPM}"
|
mv "${OLD_RPM}" "${NEW_RPM}"
|
||||||
mv "${OLD_SRPM}" "${NEW_SRPM}"
|
mv "${OLD_SRPM}" "${NEW_SRPM}"
|
||||||
echo "EL8_SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT
|
echo "SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT
|
||||||
echo "EL8_RPM=${NEW_RPM}" >> $GITHUB_OUTPUT
|
echo "RPM=${NEW_RPM}" >> $GITHUB_OUTPUT
|
||||||
|
|
||||||
# See: https://github.com/actions/upload-artifact
|
# See: https://github.com/actions/upload-artifact
|
||||||
- name: Save RPM as artifact
|
- name: Save RPM as artifact
|
||||||
uses: actions/upload-artifact@v4
|
uses: actions/upload-artifact@v4
|
||||||
with:
|
with:
|
||||||
name: cc-metric-collector RPM for AlmaLinux 8
|
name: cc-metric-collector RPM for AlmaLinux 8
|
||||||
path: ${{ steps.rpmrename.outputs.EL8_RPM }}
|
path: ${{ steps.rpmrename.outputs.RPM }}
|
||||||
overwrite: true
|
overwrite: true
|
||||||
- name: Save SRPM as artifact
|
- name: Save SRPM as artifact
|
||||||
uses: actions/upload-artifact@v4
|
uses: actions/upload-artifact@v4
|
||||||
with:
|
with:
|
||||||
name: cc-metric-collector SRPM for AlmaLinux 8
|
name: cc-metric-collector SRPM for AlmaLinux 8
|
||||||
path: ${{ steps.rpmrename.outputs.EL8_SRPM }}
|
path: ${{ steps.rpmrename.outputs.SRPM }}
|
||||||
overwrite: true
|
overwrite: true
|
||||||
|
|
||||||
#
|
#
|
||||||
@ -152,21 +152,21 @@ jobs:
|
|||||||
NEW_SRPM=${OLD_SRPM/el9/alma9}
|
NEW_SRPM=${OLD_SRPM/el9/alma9}
|
||||||
mv "${OLD_RPM}" "${NEW_RPM}"
|
mv "${OLD_RPM}" "${NEW_RPM}"
|
||||||
mv "${OLD_SRPM}" "${NEW_SRPM}"
|
mv "${OLD_SRPM}" "${NEW_SRPM}"
|
||||||
echo "EL9_SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT
|
echo "SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT
|
||||||
echo "EL9_RPM=${NEW_RPM}" >> $GITHUB_OUTPUT
|
echo "RPM=${NEW_RPM}" >> $GITHUB_OUTPUT
|
||||||
|
|
||||||
# See: https://github.com/actions/upload-artifact
|
# See: https://github.com/actions/upload-artifact
|
||||||
- name: Save RPM as artifact
|
- name: Save RPM as artifact
|
||||||
uses: actions/upload-artifact@v4
|
uses: actions/upload-artifact@v4
|
||||||
with:
|
with:
|
||||||
name: cc-metric-collector RPM for AlmaLinux 9
|
name: cc-metric-collector RPM for AlmaLinux 9
|
||||||
path: ${{ steps.rpmrename.outputs.EL9_RPM }}
|
path: ${{ steps.rpmrename.outputs.RPM }}
|
||||||
overwrite: true
|
overwrite: true
|
||||||
- name: Save SRPM as artifact
|
- name: Save SRPM as artifact
|
||||||
uses: actions/upload-artifact@v4
|
uses: actions/upload-artifact@v4
|
||||||
with:
|
with:
|
||||||
name: cc-metric-collector SRPM for AlmaLinux 9
|
name: cc-metric-collector SRPM for AlmaLinux 9
|
||||||
path: ${{ steps.rpmrename.outputs.EL9_SRPM }}
|
path: ${{ steps.rpmrename.outputs.SRPM }}
|
||||||
overwrite: true
|
overwrite: true
|
||||||
|
|
||||||
#
|
#
|
||||||
@ -235,6 +235,10 @@ jobs:
|
|||||||
# See: https://catalog.redhat.com/software/containers/ubi8/ubi/5c359854d70cc534b3a3784e?container-tabs=gti
|
# See: https://catalog.redhat.com/software/containers/ubi8/ubi/5c359854d70cc534b3a3784e?container-tabs=gti
|
||||||
container: redhat/ubi9
|
container: redhat/ubi9
|
||||||
# The job outputs link to the outputs of the 'rpmbuild' step
|
# The job outputs link to the outputs of the 'rpmbuild' step
|
||||||
|
# The job outputs link to the outputs of the 'rpmbuild' step
|
||||||
|
outputs:
|
||||||
|
rpm : ${{steps.rpmbuild.outputs.RPM}}
|
||||||
|
srpm : ${{steps.rpmbuild.outputs.SRPM}}
|
||||||
steps:
|
steps:
|
||||||
|
|
||||||
# Use dnf to install development packages
|
# Use dnf to install development packages
|
||||||
|
@ -10,15 +10,16 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
lp "github.com/ClusterCockpit/cc-lib/ccMessage"
|
||||||
)
|
)
|
||||||
|
|
||||||
const NETSTATFILE = "/proc/net/dev"
|
const NETSTATFILE = "/proc/net/dev"
|
||||||
|
|
||||||
type NetstatCollectorConfig struct {
|
type NetstatCollectorConfig struct {
|
||||||
IncludeDevices []string `json:"include_devices"`
|
IncludeDevices []string `json:"include_devices"`
|
||||||
SendAbsoluteValues bool `json:"send_abs_values"`
|
SendAbsoluteValues bool `json:"send_abs_values"`
|
||||||
SendDerivedValues bool `json:"send_derived_values"`
|
SendDerivedValues bool `json:"send_derived_values"`
|
||||||
|
InterfaceAliases map[string][]string `json:"interface_aliases,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type NetstatCollectorMetric struct {
|
type NetstatCollectorMetric struct {
|
||||||
@ -32,9 +33,26 @@ type NetstatCollectorMetric struct {
|
|||||||
|
|
||||||
type NetstatCollector struct {
|
type NetstatCollector struct {
|
||||||
metricCollector
|
metricCollector
|
||||||
config NetstatCollectorConfig
|
config NetstatCollectorConfig
|
||||||
matches map[string][]NetstatCollectorMetric
|
aliasToCanonical map[string]string
|
||||||
lastTimestamp time.Time
|
matches map[string][]NetstatCollectorMetric
|
||||||
|
lastTimestamp time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *NetstatCollector) buildAliasMapping() {
|
||||||
|
m.aliasToCanonical = make(map[string]string)
|
||||||
|
for canon, aliases := range m.config.InterfaceAliases {
|
||||||
|
for _, alias := range aliases {
|
||||||
|
m.aliasToCanonical[alias] = canon
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCanonicalName(raw string, aliasToCanonical map[string]string) string {
|
||||||
|
if canon, ok := aliasToCanonical[raw]; ok {
|
||||||
|
return canon
|
||||||
|
}
|
||||||
|
return raw
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *NetstatCollector) Init(config json.RawMessage) error {
|
func (m *NetstatCollector) Init(config json.RawMessage) error {
|
||||||
@ -77,6 +95,8 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.buildAliasMapping()
|
||||||
|
|
||||||
// Check access to net statistic file
|
// Check access to net statistic file
|
||||||
file, err := os.Open(NETSTATFILE)
|
file, err := os.Open(NETSTATFILE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -97,18 +117,20 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
|||||||
// Split line into fields
|
// Split line into fields
|
||||||
f := strings.Fields(l)
|
f := strings.Fields(l)
|
||||||
|
|
||||||
// Get net device entry
|
// Get raw and canonical names
|
||||||
dev := strings.Trim(f[0], ": ")
|
raw := strings.Trim(f[0], ": ")
|
||||||
|
canonical := getCanonicalName(raw, m.aliasToCanonical)
|
||||||
|
|
||||||
// Check if device is a included device
|
// Check if device is a included device
|
||||||
if _, ok := stringArrayContains(m.config.IncludeDevices, dev); ok {
|
if _, ok := stringArrayContains(m.config.IncludeDevices, canonical); ok {
|
||||||
tags := map[string]string{"stype": "network", "stype-id": dev, "type": "node"}
|
// Tag will contain original device name (raw).
|
||||||
|
tags := map[string]string{"stype": "network", "stype-id": raw, "type": "node"}
|
||||||
meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
|
meta_unit_byte := map[string]string{"source": m.name, "group": "Network", "unit": "bytes"}
|
||||||
meta_unit_byte_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "bytes/sec"}
|
meta_unit_byte_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "bytes/sec"}
|
||||||
meta_unit_pkts := map[string]string{"source": m.name, "group": "Network", "unit": "packets"}
|
meta_unit_pkts := map[string]string{"source": m.name, "group": "Network", "unit": "packets"}
|
||||||
meta_unit_pkts_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "packets/sec"}
|
meta_unit_pkts_per_sec := map[string]string{"source": m.name, "group": "Network", "unit": "packets/sec"}
|
||||||
|
|
||||||
m.matches[dev] = []NetstatCollectorMetric{
|
m.matches[canonical] = []NetstatCollectorMetric{
|
||||||
{
|
{
|
||||||
name: "net_bytes_in",
|
name: "net_bytes_in",
|
||||||
index: fieldReceiveBytes,
|
index: fieldReceiveBytes,
|
||||||
@ -143,7 +165,6 @@ func (m *NetstatCollector) Init(config json.RawMessage) error {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(m.matches) == 0 {
|
if len(m.matches) == 0 {
|
||||||
@ -164,7 +185,7 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMessage
|
|||||||
// Save current timestamp
|
// Save current timestamp
|
||||||
m.lastTimestamp = now
|
m.lastTimestamp = now
|
||||||
|
|
||||||
file, err := os.Open(string(NETSTATFILE))
|
file, err := os.Open(NETSTATFILE)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(m.name, err.Error())
|
cclog.ComponentError(m.name, err.Error())
|
||||||
return
|
return
|
||||||
@ -183,11 +204,12 @@ func (m *NetstatCollector) Read(interval time.Duration, output chan lp.CCMessage
|
|||||||
// Split line into fields
|
// Split line into fields
|
||||||
f := strings.Fields(l)
|
f := strings.Fields(l)
|
||||||
|
|
||||||
// Get net device entry
|
// Get raw and canonical names
|
||||||
dev := strings.Trim(f[0], ":")
|
raw := strings.Trim(f[0], ":")
|
||||||
|
canonical := getCanonicalName(raw, m.aliasToCanonical)
|
||||||
|
|
||||||
// Check if device is a included device
|
// Check if device is a included device
|
||||||
if devmetrics, ok := m.matches[dev]; ok {
|
if devmetrics, ok := m.matches[canonical]; ok {
|
||||||
for i := range devmetrics {
|
for i := range devmetrics {
|
||||||
metric := &devmetrics[i]
|
metric := &devmetrics[i]
|
||||||
|
|
||||||
|
@ -4,14 +4,19 @@
|
|||||||
```json
|
```json
|
||||||
"netstat": {
|
"netstat": {
|
||||||
"include_devices": [
|
"include_devices": [
|
||||||
"eth0"
|
"eth0",
|
||||||
|
"eno1"
|
||||||
],
|
],
|
||||||
"send_abs_values" : true,
|
"send_abs_values": true,
|
||||||
"send_derived_values" : true
|
"send_derived_values": true,
|
||||||
|
"interface_aliases": {
|
||||||
|
"eno1": ["eno1np0", "eno1_alt"],
|
||||||
|
"eth0": ["eth0_alias"]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
The `netstat` collector reads data from `/proc/net/dev` and outputs a handful **node** metrics. With the `include_devices` list you can specify which network devices should be measured. **Note**: Most other collectors use an _exclude_ list instead of an include list.
|
The `netstat` collector reads data from `/proc/net/dev` and outputs a handful **node** metrics. With the `include_devices` list you can specify which network devices should be measured. **Note**: Most other collectors use an _exclude_ list instead of an include list. Optionally, you can define an interface_aliases mapping. For each canonical device (as listed in include_devices), you may provide an array of aliases that may be reported by the system. When an alias is detected, it is preferred for matching, while the output tag stype-id always shows the actual system-reported name.
|
||||||
|
|
||||||
Metrics:
|
Metrics:
|
||||||
* `net_bytes_in` (`unit=bytes`)
|
* `net_bytes_in` (`unit=bytes`)
|
||||||
@ -23,5 +28,4 @@ Metrics:
|
|||||||
* `net_pkts_in_bw` (`unit=packets/sec` if `send_derived_values == true`)
|
* `net_pkts_in_bw` (`unit=packets/sec` if `send_derived_values == true`)
|
||||||
* `net_pkts_out_bw` (`unit=packets/sec` if `send_derived_values == true`)
|
* `net_pkts_out_bw` (`unit=packets/sec` if `send_derived_values == true`)
|
||||||
|
|
||||||
The device name is added as tag `stype=network,stype-id=<device>`.
|
The device name is added as tag `stype=network,stype-id=<device>`.
|
||||||
|
|
@ -1,11 +1,9 @@
|
|||||||
# Running average power limit (RAPL) metric collector
|
## `rapl` collector
|
||||||
|
|
||||||
This collector reads running average power limit (RAPL) monitoring attributes to compute average power consumption metrics. See <https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes>.
|
This collector reads running average power limit (RAPL) monitoring attributes to compute average power consumption metrics. See <https://www.kernel.org/doc/html/latest/power/powercap/powercap.html#monitoring-attributes>.
|
||||||
|
|
||||||
The Likwid metric collector provides similar functionality.
|
The Likwid metric collector provides similar functionality.
|
||||||
|
|
||||||
## Configuration
|
|
||||||
|
|
||||||
```json
|
```json
|
||||||
"rapl": {
|
"rapl": {
|
||||||
"exclude_device_by_id": ["0:1", "0:2"],
|
"exclude_device_by_id": ["0:1", "0:2"],
|
||||||
@ -13,6 +11,5 @@ The Likwid metric collector provides similar functionality.
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
## Metrics
|
Metrics:
|
||||||
|
|
||||||
* `rapl_average_power`: average power consumption in Watt. The average is computed over the entire runtime from the last measurement to the current measurement
|
* `rapl_average_power`: average power consumption in Watt. The average is computed over the entire runtime from the last measurement to the current measurement
|
||||||
|
@ -40,7 +40,7 @@ type metricRouterConfig struct {
|
|||||||
NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units
|
NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units
|
||||||
ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics
|
ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics
|
||||||
// dropMetrics map[string]bool // Internal map for O(1) lookup
|
// dropMetrics map[string]bool // Internal map for O(1) lookup
|
||||||
MessageProcessor json.RawMessage `json:"process_message,omitempty"`
|
MessageProcessor json.RawMessage `json:"process_messages,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metric router data structure
|
// Metric router data structure
|
||||||
|
@ -22,27 +22,27 @@ type messageProcessorTagConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type messageProcessorConfig struct {
|
type messageProcessorConfig struct {
|
||||||
StageOrder []string `json:"stage_order,omitempty"` // List of stages to execute them in the specified order and to skip unrequired ones
|
StageOrder []string `json:"stage_order,omitempty"` // List of stages to execute them in the specified order and to skip unrequired ones
|
||||||
DropMessages []string `json:"drop_messages,omitempty"` // List of metric names to drop. For fine-grained dropping use drop_messages_if
|
DropMessages []string `json:"drop_messages,omitempty"` // List of metric names to drop. For fine-grained dropping use drop_messages_if
|
||||||
DropMessagesIf []string `json:"drop_messages_if,omitempty"` // List of evaluatable terms to drop messages
|
DropMessagesIf []string `json:"drop_messages_if,omitempty"` // List of evaluatable terms to drop messages
|
||||||
RenameMessages map[string]string `json:"rename_messages,omitempty"` // Map of metric names to rename
|
RenameMessages map[string]string `json:"rename_messages,omitempty"` // Map of metric names to rename
|
||||||
RenameMessagesIf map[string]string `json:"rename_messages_if,omitempty"` // Map to rename metric name based on a condition
|
RenameMessagesIf map[string]string `json:"rename_messages_if,omitempty"` // Map to rename metric name based on a condition
|
||||||
NormalizeUnits bool `json:"normalize_units,omitempty"` // Check unit meta flag and normalize it using cc-units
|
NormalizeUnits bool `json:"normalize_units,omitempty"` // Check unit meta flag and normalize it using cc-units
|
||||||
ChangeUnitPrefix map[string]string `json:"change_unit_prefix,omitempty"` // Add prefix that should be applied to the messages
|
ChangeUnitPrefix map[string]string `json:"change_unit_prefix,omitempty"` // Add prefix that should be applied to the messages
|
||||||
AddTagsIf []messageProcessorTagConfig `json:"add_tags_if"` // List of tags that are added when the condition is met
|
AddTagsIf []messageProcessorTagConfig `json:"add_tags_if,omitempty"` // List of tags that are added when the condition is met
|
||||||
DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if"` // List of tags that are removed when the condition is met
|
DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if,omitempty"` // List of tags that are removed when the condition is met
|
||||||
AddMetaIf []messageProcessorTagConfig `json:"add_meta_if"` // List of meta infos that are added when the condition is met
|
AddMetaIf []messageProcessorTagConfig `json:"add_meta_if,omitempty"` // List of meta infos that are added when the condition is met
|
||||||
DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if"` // List of meta infos that are removed when the condition is met
|
DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if,omitempty"` // List of meta infos that are removed when the condition is met
|
||||||
AddFieldIf []messageProcessorTagConfig `json:"add_field_if"` // List of fields that are added when the condition is met
|
AddFieldIf []messageProcessorTagConfig `json:"add_field_if,omitempty"` // List of fields that are added when the condition is met
|
||||||
DelFieldIf []messageProcessorTagConfig `json:"delete_field_if"` // List of fields that are removed when the condition is met
|
DelFieldIf []messageProcessorTagConfig `json:"delete_field_if,omitempty"` // List of fields that are removed when the condition is met
|
||||||
DropByType []string `json:"drop_by_message_type"` // List of message types that should be dropped
|
DropByType []string `json:"drop_by_message_type,omitempty"` // List of message types that should be dropped
|
||||||
MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if"`
|
MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if,omitempty"`
|
||||||
MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if"`
|
MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if,omitempty"`
|
||||||
MoveMetaToTag []messageProcessorTagConfig `json:"move_meta_to_tag_if"`
|
MoveMetaToTag []messageProcessorTagConfig `json:"move_meta_to_tag_if,omitempty"`
|
||||||
MoveMetaToField []messageProcessorTagConfig `json:"move_meta_to_field_if"`
|
MoveMetaToField []messageProcessorTagConfig `json:"move_meta_to_field_if,omitempty"`
|
||||||
MoveFieldToTag []messageProcessorTagConfig `json:"move_field_to_tag_if"`
|
MoveFieldToTag []messageProcessorTagConfig `json:"move_field_to_tag_if,omitempty"`
|
||||||
MoveFieldToMeta []messageProcessorTagConfig `json:"move_field_to_meta_if"`
|
MoveFieldToMeta []messageProcessorTagConfig `json:"move_field_to_meta_if,omitempty"`
|
||||||
AddBaseEnv map[string]interface{} `json:"add_base_env"`
|
AddBaseEnv map[string]interface{} `json:"add_base_env,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type messageProcessor struct {
|
type messageProcessor struct {
|
||||||
|
@ -13,7 +13,6 @@ import (
|
|||||||
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
|
mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
|
||||||
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const HTTP_RECEIVER_PORT = "8080"
|
const HTTP_RECEIVER_PORT = "8080"
|
||||||
@ -151,80 +150,22 @@ func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if r.sink != nil {
|
if r.sink != nil {
|
||||||
d := influx.NewDecoder(req.Body)
|
buf := make([]byte, 0, req.ContentLength)
|
||||||
for d.Next() {
|
len, err := req.Body.Read(buf)
|
||||||
|
if err == nil && len > 0 {
|
||||||
// Decode measurement name
|
messages, err := lp.FromBytes(buf)
|
||||||
measurement, err := d.Measurement()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := "ServerHttp: Failed to decode measurement: " + err.Error()
|
msg := "ServerHttp: Failed to decode messages: " + err.Error()
|
||||||
cclog.ComponentError(r.name, msg)
|
cclog.ComponentError(r.name, msg)
|
||||||
http.Error(w, msg, http.StatusInternalServerError)
|
http.Error(w, msg, http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
for _, y := range messages {
|
||||||
// Decode tags
|
m, err := r.mp.ProcessMessage(y)
|
||||||
tags := make(map[string]string)
|
if err == nil && m != nil {
|
||||||
for {
|
r.sink <- m
|
||||||
key, value, err := d.NextTag()
|
|
||||||
if err != nil {
|
|
||||||
msg := "ServerHttp: Failed to decode tag: " + err.Error()
|
|
||||||
cclog.ComponentError(r.name, msg)
|
|
||||||
http.Error(w, msg, http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if key == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
tags[string(key)] = string(value)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode fields
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
for {
|
|
||||||
key, value, err := d.NextField()
|
|
||||||
if err != nil {
|
|
||||||
msg := "ServerHttp: Failed to decode field: " + err.Error()
|
|
||||||
cclog.ComponentError(r.name, msg)
|
|
||||||
http.Error(w, msg, http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if key == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
fields[string(key)] = value.Interface()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Decode time stamp
|
|
||||||
t, err := d.Time(influx.Nanosecond, time.Time{})
|
|
||||||
if err != nil {
|
|
||||||
msg := "ServerHttp: Failed to decode time stamp: " + err.Error()
|
|
||||||
cclog.ComponentError(r.name, msg)
|
|
||||||
http.Error(w, msg, http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
y, _ := lp.NewMessage(
|
|
||||||
string(measurement),
|
|
||||||
tags,
|
|
||||||
nil,
|
|
||||||
fields,
|
|
||||||
t,
|
|
||||||
)
|
|
||||||
|
|
||||||
m, err := r.mp.ProcessMessage(y)
|
|
||||||
if err == nil && m != nil {
|
|
||||||
r.sink <- m
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
// Check for IO errors
|
|
||||||
err := d.Err()
|
|
||||||
if err != nil {
|
|
||||||
msg := "ServerHttp: Failed to decode: " + err.Error()
|
|
||||||
cclog.ComponentError(r.name, msg)
|
|
||||||
http.Error(w, msg, http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,20 +5,18 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
|
||||||
|
|
||||||
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
|
mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
|
||||||
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
|
|
||||||
nats "github.com/nats-io/nats.go"
|
nats "github.com/nats-io/nats.go"
|
||||||
)
|
)
|
||||||
|
|
||||||
type NatsReceiverConfig struct {
|
type NatsReceiverConfig struct {
|
||||||
defaultReceiverConfig
|
defaultReceiverConfig
|
||||||
Addr string `json:"address"`
|
Addr string `json:"address"`
|
||||||
Port string `json:"port"`
|
Port string `json:"port"`
|
||||||
Subject string `json:"subject"`
|
Subject string `json:"subject"`
|
||||||
User string `json:"user,omitempty"`
|
User string `json:"user,omitempty"`
|
||||||
Password string `json:"password,omitempty"`
|
Password string `json:"password,omitempty"`
|
||||||
NkeyFile string `json:"nkey_file,omitempty"`
|
NkeyFile string `json:"nkey_file,omitempty"`
|
||||||
@ -42,67 +40,15 @@ func (r *NatsReceiver) Start() {
|
|||||||
func (r *NatsReceiver) _NatsReceive(m *nats.Msg) {
|
func (r *NatsReceiver) _NatsReceive(m *nats.Msg) {
|
||||||
|
|
||||||
if r.sink != nil {
|
if r.sink != nil {
|
||||||
d := influx.NewDecoderWithBytes(m.Data)
|
messages, err := lp.FromBytes(m.Data)
|
||||||
for d.Next() {
|
if err != nil {
|
||||||
|
msg := "_NatsReceive: Failed to decode messages: " + err.Error()
|
||||||
// Decode measurement name
|
cclog.ComponentError(r.name, msg)
|
||||||
measurement, err := d.Measurement()
|
}
|
||||||
if err != nil {
|
for _, y := range messages {
|
||||||
msg := "_NatsReceive: Failed to decode measurement: " + err.Error()
|
m, err := r.mp.ProcessMessage(y)
|
||||||
cclog.ComponentError(r.name, msg)
|
if err == nil && m != nil && r.sink != nil {
|
||||||
return
|
r.sink <- m
|
||||||
}
|
|
||||||
|
|
||||||
// Decode tags
|
|
||||||
tags := make(map[string]string)
|
|
||||||
for {
|
|
||||||
key, value, err := d.NextTag()
|
|
||||||
if err != nil {
|
|
||||||
msg := "_NatsReceive: Failed to decode tag: " + err.Error()
|
|
||||||
cclog.ComponentError(r.name, msg)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if key == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
tags[string(key)] = string(value)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Decode fields
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
for {
|
|
||||||
key, value, err := d.NextField()
|
|
||||||
if err != nil {
|
|
||||||
msg := "_NatsReceive: Failed to decode field: " + err.Error()
|
|
||||||
cclog.ComponentError(r.name, msg)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if key == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
fields[string(key)] = value.Interface()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Decode time stamp
|
|
||||||
t, err := d.Time(influx.Nanosecond, time.Time{})
|
|
||||||
if err != nil {
|
|
||||||
msg := "_NatsReceive: Failed to decode time: " + err.Error()
|
|
||||||
cclog.ComponentError(r.name, msg)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
y, err := lp.NewMessage(
|
|
||||||
string(measurement),
|
|
||||||
tags,
|
|
||||||
nil,
|
|
||||||
fields,
|
|
||||||
t,
|
|
||||||
)
|
|
||||||
if err == nil {
|
|
||||||
m, err := r.mp.ProcessMessage(y)
|
|
||||||
if err == nil && m != nil && r.sink != nil {
|
|
||||||
r.sink <- m
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
175
scripts/generate_docs.sh
Executable file
175
scripts/generate_docs.sh
Executable file
@ -0,0 +1,175 @@
|
|||||||
|
#!/bin/bash -l
|
||||||
|
|
||||||
|
SRCDIR="$(pwd)"
|
||||||
|
DESTDIR="$1"
|
||||||
|
|
||||||
|
if [ -z "$DESTDIR" ]; then
|
||||||
|
echo "Destination folder not provided"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
|
||||||
|
COLLECTORS=$(find "${SRCDIR}/collectors" -name "*Metric.md")
|
||||||
|
SINKS=$(find "${SRCDIR}/sinks" -name "*Sink.md")
|
||||||
|
RECEIVERS=$(find "${SRCDIR}/receivers" -name "*Receiver.md")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# Collectors
|
||||||
|
mkdir -p "${DESTDIR}/collectors"
|
||||||
|
for F in $COLLECTORS; do
|
||||||
|
echo "$F"
|
||||||
|
FNAME=$(basename "$F")
|
||||||
|
TITLE=$(grep -E "^##" "$F" | head -n 1 | sed -e 's+## ++g')
|
||||||
|
echo "'${TITLE//\`/}'"
|
||||||
|
if [ "${TITLE}" == "" ]; then continue; fi
|
||||||
|
rm --force "${DESTDIR}/collectors/${FNAME}"
|
||||||
|
cat << EOF >> "${DESTDIR}/collectors/${FNAME}"
|
||||||
|
---
|
||||||
|
title: ${TITLE//\`/}
|
||||||
|
description: >
|
||||||
|
Toplevel ${FNAME/.md/}
|
||||||
|
categories: [cc-metric-collector]
|
||||||
|
tags: [cc-metric-collector, Collector, ${FNAME/Metric.md/}]
|
||||||
|
weight: 2
|
||||||
|
---
|
||||||
|
|
||||||
|
EOF
|
||||||
|
cat "$F" >> "${DESTDIR}/collectors/${FNAME}"
|
||||||
|
done
|
||||||
|
|
||||||
|
if [ -e "${SRCDIR}/collectors/README.md" ]; then
|
||||||
|
cat << EOF > "${DESTDIR}/collectors/_index.md"
|
||||||
|
---
|
||||||
|
title: cc-metric-collector's collectors
|
||||||
|
description: Documentation of cc-metric-collector's collectors
|
||||||
|
categories: [cc-metric-collector]
|
||||||
|
tags: [cc-metric-collector, Collector, General]
|
||||||
|
weight: 40
|
||||||
|
---
|
||||||
|
|
||||||
|
EOF
|
||||||
|
cat "${SRCDIR}/collectors/README.md" >> "${DESTDIR}/collectors/_index.md"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Sinks
|
||||||
|
mkdir -p "${DESTDIR}/sinks"
|
||||||
|
for F in $SINKS; do
|
||||||
|
echo "$F"
|
||||||
|
FNAME=$(basename "$F")
|
||||||
|
TITLE=$(grep -E "^##" "$F" | head -n 1 | sed -e 's+## ++g')
|
||||||
|
echo "'${TITLE//\`/}'"
|
||||||
|
if [ "${TITLE}" == "" ]; then continue; fi
|
||||||
|
rm --force "${DESTDIR}/sinks/${FNAME}"
|
||||||
|
cat << EOF >> "${DESTDIR}/sinks/${FNAME}"
|
||||||
|
---
|
||||||
|
title: ${TITLE//\`/}
|
||||||
|
description: >
|
||||||
|
Toplevel ${FNAME/.md/}
|
||||||
|
categories: [cc-metric-collector]
|
||||||
|
tags: [cc-metric-collector, Sink, ${FNAME/Sink.md/}]
|
||||||
|
weight: 2
|
||||||
|
---
|
||||||
|
|
||||||
|
EOF
|
||||||
|
cat "$F" >> "${DESTDIR}/sinks/${FNAME}"
|
||||||
|
done
|
||||||
|
|
||||||
|
if [ -e "${SRCDIR}/collectors/README.md" ]; then
|
||||||
|
cat << EOF > "${DESTDIR}/sinks/_index.md"
|
||||||
|
---
|
||||||
|
title: cc-metric-collector's sinks
|
||||||
|
description: Documentation of cc-metric-collector's sinks
|
||||||
|
categories: [cc-metric-collector]
|
||||||
|
tags: [cc-metric-collector, Sink, General]
|
||||||
|
weight: 40
|
||||||
|
---
|
||||||
|
|
||||||
|
EOF
|
||||||
|
cat "${SRCDIR}/sinks/README.md" >> "${DESTDIR}/sinks/_index.md"
|
||||||
|
fi
|
||||||
|
|
||||||
|
|
||||||
|
# Receivers
|
||||||
|
mkdir -p "${DESTDIR}/receivers"
|
||||||
|
for F in $RECEIVERS; do
|
||||||
|
echo "$F"
|
||||||
|
FNAME=$(basename "$F")
|
||||||
|
TITLE=$(grep -E "^##" "$F" | head -n 1 | sed -e 's+## ++g')
|
||||||
|
echo "'${TITLE//\`/}'"
|
||||||
|
if [ "${TITLE}" == "" ]; then continue; fi
|
||||||
|
rm --force "${DESTDIR}/receivers/${FNAME}"
|
||||||
|
cat << EOF >> "${DESTDIR}/receivers/${FNAME}"
|
||||||
|
---
|
||||||
|
title: ${TITLE//\`/}
|
||||||
|
description: >
|
||||||
|
Toplevel ${FNAME/.md/}
|
||||||
|
categories: [cc-metric-collector]
|
||||||
|
tags: [cc-metric-collector, Receiver, ${FNAME/Receiver.md/}]
|
||||||
|
weight: 2
|
||||||
|
---
|
||||||
|
|
||||||
|
EOF
|
||||||
|
cat "$F" >> "${DESTDIR}/receivers/${FNAME}"
|
||||||
|
done
|
||||||
|
|
||||||
|
if [ -e "${SRCDIR}/receivers/README.md" ]; then
|
||||||
|
cat << EOF > "${DESTDIR}/receivers/_index.md"
|
||||||
|
---
|
||||||
|
title: cc-metric-collector's receivers
|
||||||
|
description: Documentation of cc-metric-collector's receivers
|
||||||
|
categories: [cc-metric-collector]
|
||||||
|
tags: [cc-metric-collector, Receiver, General]
|
||||||
|
weight: 40
|
||||||
|
---
|
||||||
|
|
||||||
|
EOF
|
||||||
|
cat "${SRCDIR}/receivers/README.md" >> "${DESTDIR}/receivers/_index.md"
|
||||||
|
fi
|
||||||
|
|
||||||
|
mkdir -p "${DESTDIR}/internal/metricRouter"
|
||||||
|
if [ -e "${SRCDIR}/internal/metricRouter/README.md" ]; then
|
||||||
|
cat << EOF > "${DESTDIR}/internal/metricRouter/_index.md"
|
||||||
|
---
|
||||||
|
title: cc-metric-collector's router
|
||||||
|
description: Documentation of cc-metric-collector's router
|
||||||
|
categories: [cc-metric-collector]
|
||||||
|
tags: [cc-metric-collector, Router, General]
|
||||||
|
weight: 40
|
||||||
|
---
|
||||||
|
|
||||||
|
EOF
|
||||||
|
cat "${SRCDIR}/internal/metricRouter/README.md" >> "${DESTDIR}/internal/metricRouter/_index.md"
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [ -e "${SRCDIR}/README.md" ]; then
|
||||||
|
cat << EOF > "${DESTDIR}/_index.md"
|
||||||
|
---
|
||||||
|
title: cc-metric-collector
|
||||||
|
description: Documentation of cc-metric-collector
|
||||||
|
categories: [cc-metric-collector]
|
||||||
|
tags: [cc-metric-collector, General]
|
||||||
|
weight: 40
|
||||||
|
---
|
||||||
|
|
||||||
|
EOF
|
||||||
|
cat "${SRCDIR}/README.md" >> "${DESTDIR}/_index.md"
|
||||||
|
sed -i -e 's+README.md+_index.md+g' "${DESTDIR}/_index.md"
|
||||||
|
fi
|
||||||
|
|
||||||
|
|
||||||
|
mkdir -p "${DESTDIR}/pkg/messageProcessor"
|
||||||
|
if [ -e "${SRCDIR}/pkg/messageProcessor/README.md" ]; then
|
||||||
|
cat << EOF > "${DESTDIR}/pkg/messageProcessor/_index.md"
|
||||||
|
---
|
||||||
|
title: cc-metric-collector's message processor
|
||||||
|
description: Documentation of cc-metric-collector's message processor
|
||||||
|
categories: [cc-metric-collector]
|
||||||
|
tags: [cc-metric-collector, Message Processor]
|
||||||
|
weight: 40
|
||||||
|
---
|
||||||
|
|
||||||
|
EOF
|
||||||
|
cat "${SRCDIR}/pkg/messageProcessor/README.md" >> "${DESTDIR}/pkg/messageProcessor/_index.md"
|
||||||
|
fi
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user