diff --git a/.github/workflows/Release.yml b/.github/workflows/Release.yml index cb02a1a..a2dfff4 100644 --- a/.github/workflows/Release.yml +++ b/.github/workflows/Release.yml @@ -73,21 +73,21 @@ jobs: NEW_SRPM=${OLD_SRPM/el8/alma8} mv "${OLD_RPM}" "${NEW_RPM}" mv "${OLD_SRPM}" "${NEW_SRPM}" - echo "EL8_SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT - echo "EL8_RPM=${NEW_RPM}" >> $GITHUB_OUTPUT + echo "SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT + echo "RPM=${NEW_RPM}" >> $GITHUB_OUTPUT # See: https://github.com/actions/upload-artifact - name: Save RPM as artifact uses: actions/upload-artifact@v4 with: name: cc-metric-collector RPM for AlmaLinux 8 - path: ${{ steps.rpmrename.outputs.EL8_RPM }} + path: ${{ steps.rpmrename.outputs.RPM }} overwrite: true - name: Save SRPM as artifact uses: actions/upload-artifact@v4 with: name: cc-metric-collector SRPM for AlmaLinux 8 - path: ${{ steps.rpmrename.outputs.EL8_SRPM }} + path: ${{ steps.rpmrename.outputs.SRPM }} overwrite: true # @@ -152,21 +152,21 @@ jobs: NEW_SRPM=${OLD_SRPM/el9/alma9} mv "${OLD_RPM}" "${NEW_RPM}" mv "${OLD_SRPM}" "${NEW_SRPM}" - echo "EL9_SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT - echo "EL9_RPM=${NEW_RPM}" >> $GITHUB_OUTPUT + echo "SRPM=${NEW_SRPM}" >> $GITHUB_OUTPUT + echo "RPM=${NEW_RPM}" >> $GITHUB_OUTPUT # See: https://github.com/actions/upload-artifact - name: Save RPM as artifact uses: actions/upload-artifact@v4 with: name: cc-metric-collector RPM for AlmaLinux 9 - path: ${{ steps.rpmrename.outputs.EL9_RPM }} + path: ${{ steps.rpmrename.outputs.RPM }} overwrite: true - name: Save SRPM as artifact uses: actions/upload-artifact@v4 with: name: cc-metric-collector SRPM for AlmaLinux 9 - path: ${{ steps.rpmrename.outputs.EL9_SRPM }} + path: ${{ steps.rpmrename.outputs.SRPM }} overwrite: true # @@ -235,6 +235,10 @@ jobs: # See: https://catalog.redhat.com/software/containers/ubi8/ubi/5c359854d70cc534b3a3784e?container-tabs=gti container: redhat/ubi9 # The job outputs link to the outputs of the 'rpmbuild' step + # The job outputs link to the outputs of the 'rpmbuild' step + outputs: + rpm : ${{steps.rpmbuild.outputs.RPM}} + srpm : ${{steps.rpmbuild.outputs.SRPM}} steps: # Use dnf to install development packages diff --git a/collectors/raplMetric.md b/collectors/raplMetric.md index f857d7c..8eb792f 100644 --- a/collectors/raplMetric.md +++ b/collectors/raplMetric.md @@ -1,11 +1,9 @@ -# Running average power limit (RAPL) metric collector +## `rapl` collector This collector reads running average power limit (RAPL) monitoring attributes to compute average power consumption metrics. See . The Likwid metric collector provides similar functionality. -## Configuration - ```json "rapl": { "exclude_device_by_id": ["0:1", "0:2"], @@ -13,6 +11,5 @@ The Likwid metric collector provides similar functionality. } ``` -## Metrics - +Metrics: * `rapl_average_power`: average power consumption in Watt. The average is computed over the entire runtime from the last measurement to the current measurement diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index e30e436..ed08ccb 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -40,7 +40,7 @@ type metricRouterConfig struct { NormalizeUnits bool `json:"normalize_units"` // Check unit meta flag and normalize it using cc-units ChangeUnitPrefix map[string]string `json:"change_unit_prefix"` // Add prefix that should be applied to the metrics // dropMetrics map[string]bool // Internal map for O(1) lookup - MessageProcessor json.RawMessage `json:"process_message,omitempty"` + MessageProcessor json.RawMessage `json:"process_messages,omitempty"` } // Metric router data structure diff --git a/pkg/messageProcessor/messageProcessor.go b/pkg/messageProcessor/messageProcessor.go index 9bcc54a..6163fa8 100644 --- a/pkg/messageProcessor/messageProcessor.go +++ b/pkg/messageProcessor/messageProcessor.go @@ -22,27 +22,27 @@ type messageProcessorTagConfig struct { } type messageProcessorConfig struct { - StageOrder []string `json:"stage_order,omitempty"` // List of stages to execute them in the specified order and to skip unrequired ones - DropMessages []string `json:"drop_messages,omitempty"` // List of metric names to drop. For fine-grained dropping use drop_messages_if - DropMessagesIf []string `json:"drop_messages_if,omitempty"` // List of evaluatable terms to drop messages - RenameMessages map[string]string `json:"rename_messages,omitempty"` // Map of metric names to rename - RenameMessagesIf map[string]string `json:"rename_messages_if,omitempty"` // Map to rename metric name based on a condition - NormalizeUnits bool `json:"normalize_units,omitempty"` // Check unit meta flag and normalize it using cc-units - ChangeUnitPrefix map[string]string `json:"change_unit_prefix,omitempty"` // Add prefix that should be applied to the messages - AddTagsIf []messageProcessorTagConfig `json:"add_tags_if"` // List of tags that are added when the condition is met - DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if"` // List of tags that are removed when the condition is met - AddMetaIf []messageProcessorTagConfig `json:"add_meta_if"` // List of meta infos that are added when the condition is met - DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if"` // List of meta infos that are removed when the condition is met - AddFieldIf []messageProcessorTagConfig `json:"add_field_if"` // List of fields that are added when the condition is met - DelFieldIf []messageProcessorTagConfig `json:"delete_field_if"` // List of fields that are removed when the condition is met - DropByType []string `json:"drop_by_message_type"` // List of message types that should be dropped - MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if"` - MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if"` - MoveMetaToTag []messageProcessorTagConfig `json:"move_meta_to_tag_if"` - MoveMetaToField []messageProcessorTagConfig `json:"move_meta_to_field_if"` - MoveFieldToTag []messageProcessorTagConfig `json:"move_field_to_tag_if"` - MoveFieldToMeta []messageProcessorTagConfig `json:"move_field_to_meta_if"` - AddBaseEnv map[string]interface{} `json:"add_base_env"` + StageOrder []string `json:"stage_order,omitempty"` // List of stages to execute them in the specified order and to skip unrequired ones + DropMessages []string `json:"drop_messages,omitempty"` // List of metric names to drop. For fine-grained dropping use drop_messages_if + DropMessagesIf []string `json:"drop_messages_if,omitempty"` // List of evaluatable terms to drop messages + RenameMessages map[string]string `json:"rename_messages,omitempty"` // Map of metric names to rename + RenameMessagesIf map[string]string `json:"rename_messages_if,omitempty"` // Map to rename metric name based on a condition + NormalizeUnits bool `json:"normalize_units,omitempty"` // Check unit meta flag and normalize it using cc-units + ChangeUnitPrefix map[string]string `json:"change_unit_prefix,omitempty"` // Add prefix that should be applied to the messages + AddTagsIf []messageProcessorTagConfig `json:"add_tags_if,omitempty"` // List of tags that are added when the condition is met + DelTagsIf []messageProcessorTagConfig `json:"delete_tags_if,omitempty"` // List of tags that are removed when the condition is met + AddMetaIf []messageProcessorTagConfig `json:"add_meta_if,omitempty"` // List of meta infos that are added when the condition is met + DelMetaIf []messageProcessorTagConfig `json:"delete_meta_if,omitempty"` // List of meta infos that are removed when the condition is met + AddFieldIf []messageProcessorTagConfig `json:"add_field_if,omitempty"` // List of fields that are added when the condition is met + DelFieldIf []messageProcessorTagConfig `json:"delete_field_if,omitempty"` // List of fields that are removed when the condition is met + DropByType []string `json:"drop_by_message_type,omitempty"` // List of message types that should be dropped + MoveTagToMeta []messageProcessorTagConfig `json:"move_tag_to_meta_if,omitempty"` + MoveTagToField []messageProcessorTagConfig `json:"move_tag_to_field_if,omitempty"` + MoveMetaToTag []messageProcessorTagConfig `json:"move_meta_to_tag_if,omitempty"` + MoveMetaToField []messageProcessorTagConfig `json:"move_meta_to_field_if,omitempty"` + MoveFieldToTag []messageProcessorTagConfig `json:"move_field_to_tag_if,omitempty"` + MoveFieldToMeta []messageProcessorTagConfig `json:"move_field_to_meta_if,omitempty"` + AddBaseEnv map[string]interface{} `json:"add_base_env,omitempty"` } type messageProcessor struct { diff --git a/receivers/httpReceiver.go b/receivers/httpReceiver.go index d7965c6..ae6d87b 100644 --- a/receivers/httpReceiver.go +++ b/receivers/httpReceiver.go @@ -13,7 +13,6 @@ import ( lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" - influx "github.com/influxdata/line-protocol/v2/lineprotocol" ) const HTTP_RECEIVER_PORT = "8080" @@ -151,80 +150,22 @@ func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request) { } } if r.sink != nil { - d := influx.NewDecoder(req.Body) - for d.Next() { - - // Decode measurement name - measurement, err := d.Measurement() + buf := make([]byte, 0, req.ContentLength) + len, err := req.Body.Read(buf) + if err == nil && len > 0 { + messages, err := lp.FromBytes(buf) if err != nil { - msg := "ServerHttp: Failed to decode measurement: " + err.Error() + msg := "ServerHttp: Failed to decode messages: " + err.Error() cclog.ComponentError(r.name, msg) http.Error(w, msg, http.StatusInternalServerError) return } - - // Decode tags - tags := make(map[string]string) - for { - key, value, err := d.NextTag() - if err != nil { - msg := "ServerHttp: Failed to decode tag: " + err.Error() - cclog.ComponentError(r.name, msg) - http.Error(w, msg, http.StatusInternalServerError) - return + for _, y := range messages { + m, err := r.mp.ProcessMessage(y) + if err == nil && m != nil { + r.sink <- m } - if key == nil { - break - } - tags[string(key)] = string(value) } - - // Decode fields - fields := make(map[string]interface{}) - for { - key, value, err := d.NextField() - if err != nil { - msg := "ServerHttp: Failed to decode field: " + err.Error() - cclog.ComponentError(r.name, msg) - http.Error(w, msg, http.StatusInternalServerError) - return - } - if key == nil { - break - } - fields[string(key)] = value.Interface() - } - - // Decode time stamp - t, err := d.Time(influx.Nanosecond, time.Time{}) - if err != nil { - msg := "ServerHttp: Failed to decode time stamp: " + err.Error() - cclog.ComponentError(r.name, msg) - http.Error(w, msg, http.StatusInternalServerError) - return - } - - y, _ := lp.NewMessage( - string(measurement), - tags, - nil, - fields, - t, - ) - - m, err := r.mp.ProcessMessage(y) - if err == nil && m != nil { - r.sink <- m - } - - } - // Check for IO errors - err := d.Err() - if err != nil { - msg := "ServerHttp: Failed to decode: " + err.Error() - cclog.ComponentError(r.name, msg) - http.Error(w, msg, http.StatusInternalServerError) - return } } diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index 4f9f552..50072ec 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -5,20 +5,18 @@ import ( "errors" "fmt" "os" - "time" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" - influx "github.com/influxdata/line-protocol/v2/lineprotocol" nats "github.com/nats-io/nats.go" ) type NatsReceiverConfig struct { defaultReceiverConfig - Addr string `json:"address"` - Port string `json:"port"` - Subject string `json:"subject"` + Addr string `json:"address"` + Port string `json:"port"` + Subject string `json:"subject"` User string `json:"user,omitempty"` Password string `json:"password,omitempty"` NkeyFile string `json:"nkey_file,omitempty"` @@ -42,67 +40,15 @@ func (r *NatsReceiver) Start() { func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { if r.sink != nil { - d := influx.NewDecoderWithBytes(m.Data) - for d.Next() { - - // Decode measurement name - measurement, err := d.Measurement() - if err != nil { - msg := "_NatsReceive: Failed to decode measurement: " + err.Error() - cclog.ComponentError(r.name, msg) - return - } - - // Decode tags - tags := make(map[string]string) - for { - key, value, err := d.NextTag() - if err != nil { - msg := "_NatsReceive: Failed to decode tag: " + err.Error() - cclog.ComponentError(r.name, msg) - return - } - if key == nil { - break - } - tags[string(key)] = string(value) - } - - // Decode fields - fields := make(map[string]interface{}) - for { - key, value, err := d.NextField() - if err != nil { - msg := "_NatsReceive: Failed to decode field: " + err.Error() - cclog.ComponentError(r.name, msg) - return - } - if key == nil { - break - } - fields[string(key)] = value.Interface() - } - - // Decode time stamp - t, err := d.Time(influx.Nanosecond, time.Time{}) - if err != nil { - msg := "_NatsReceive: Failed to decode time: " + err.Error() - cclog.ComponentError(r.name, msg) - return - } - - y, err := lp.NewMessage( - string(measurement), - tags, - nil, - fields, - t, - ) - if err == nil { - m, err := r.mp.ProcessMessage(y) - if err == nil && m != nil && r.sink != nil { - r.sink <- m - } + messages, err := lp.FromBytes(m.Data) + if err != nil { + msg := "_NatsReceive: Failed to decode messages: " + err.Error() + cclog.ComponentError(r.name, msg) + } + for _, y := range messages { + m, err := r.mp.ProcessMessage(y) + if err == nil && m != nil && r.sink != nil { + r.sink <- m } } } diff --git a/scripts/generate_docs.sh b/scripts/generate_docs.sh new file mode 100755 index 0000000..5f4a8c9 --- /dev/null +++ b/scripts/generate_docs.sh @@ -0,0 +1,175 @@ +#!/bin/bash -l + +SRCDIR="$(pwd)" +DESTDIR="$1" + +if [ -z "$DESTDIR" ]; then + echo "Destination folder not provided" + exit 1 +fi + + +COLLECTORS=$(find "${SRCDIR}/collectors" -name "*Metric.md") +SINKS=$(find "${SRCDIR}/sinks" -name "*Sink.md") +RECEIVERS=$(find "${SRCDIR}/receivers" -name "*Receiver.md") + + + +# Collectors +mkdir -p "${DESTDIR}/collectors" +for F in $COLLECTORS; do + echo "$F" + FNAME=$(basename "$F") + TITLE=$(grep -E "^##" "$F" | head -n 1 | sed -e 's+## ++g') + echo "'${TITLE//\`/}'" + if [ "${TITLE}" == "" ]; then continue; fi + rm --force "${DESTDIR}/collectors/${FNAME}" + cat << EOF >> "${DESTDIR}/collectors/${FNAME}" +--- +title: ${TITLE//\`/} +description: > + Toplevel ${FNAME/.md/} +categories: [cc-metric-collector] +tags: [cc-metric-collector, Collector, ${FNAME/Metric.md/}] +weight: 2 +--- + +EOF + cat "$F" >> "${DESTDIR}/collectors/${FNAME}" +done + +if [ -e "${SRCDIR}/collectors/README.md" ]; then + cat << EOF > "${DESTDIR}/collectors/_index.md" +--- +title: cc-metric-collector's collectors +description: Documentation of cc-metric-collector's collectors +categories: [cc-metric-collector] +tags: [cc-metric-collector, Collector, General] +weight: 40 +--- + +EOF + cat "${SRCDIR}/collectors/README.md" >> "${DESTDIR}/collectors/_index.md" +fi + +# Sinks +mkdir -p "${DESTDIR}/sinks" +for F in $SINKS; do + echo "$F" + FNAME=$(basename "$F") + TITLE=$(grep -E "^##" "$F" | head -n 1 | sed -e 's+## ++g') + echo "'${TITLE//\`/}'" + if [ "${TITLE}" == "" ]; then continue; fi + rm --force "${DESTDIR}/sinks/${FNAME}" + cat << EOF >> "${DESTDIR}/sinks/${FNAME}" +--- +title: ${TITLE//\`/} +description: > + Toplevel ${FNAME/.md/} +categories: [cc-metric-collector] +tags: [cc-metric-collector, Sink, ${FNAME/Sink.md/}] +weight: 2 +--- + +EOF + cat "$F" >> "${DESTDIR}/sinks/${FNAME}" +done + +if [ -e "${SRCDIR}/collectors/README.md" ]; then + cat << EOF > "${DESTDIR}/sinks/_index.md" +--- +title: cc-metric-collector's sinks +description: Documentation of cc-metric-collector's sinks +categories: [cc-metric-collector] +tags: [cc-metric-collector, Sink, General] +weight: 40 +--- + +EOF + cat "${SRCDIR}/sinks/README.md" >> "${DESTDIR}/sinks/_index.md" +fi + + +# Receivers +mkdir -p "${DESTDIR}/receivers" +for F in $RECEIVERS; do + echo "$F" + FNAME=$(basename "$F") + TITLE=$(grep -E "^##" "$F" | head -n 1 | sed -e 's+## ++g') + echo "'${TITLE//\`/}'" + if [ "${TITLE}" == "" ]; then continue; fi + rm --force "${DESTDIR}/receivers/${FNAME}" + cat << EOF >> "${DESTDIR}/receivers/${FNAME}" +--- +title: ${TITLE//\`/} +description: > + Toplevel ${FNAME/.md/} +categories: [cc-metric-collector] +tags: [cc-metric-collector, Receiver, ${FNAME/Receiver.md/}] +weight: 2 +--- + +EOF + cat "$F" >> "${DESTDIR}/receivers/${FNAME}" +done + +if [ -e "${SRCDIR}/receivers/README.md" ]; then + cat << EOF > "${DESTDIR}/receivers/_index.md" +--- +title: cc-metric-collector's receivers +description: Documentation of cc-metric-collector's receivers +categories: [cc-metric-collector] +tags: [cc-metric-collector, Receiver, General] +weight: 40 +--- + +EOF + cat "${SRCDIR}/receivers/README.md" >> "${DESTDIR}/receivers/_index.md" +fi + +mkdir -p "${DESTDIR}/internal/metricRouter" +if [ -e "${SRCDIR}/internal/metricRouter/README.md" ]; then + cat << EOF > "${DESTDIR}/internal/metricRouter/_index.md" +--- +title: cc-metric-collector's router +description: Documentation of cc-metric-collector's router +categories: [cc-metric-collector] +tags: [cc-metric-collector, Router, General] +weight: 40 +--- + +EOF + cat "${SRCDIR}/internal/metricRouter/README.md" >> "${DESTDIR}/internal/metricRouter/_index.md" +fi + +if [ -e "${SRCDIR}/README.md" ]; then + cat << EOF > "${DESTDIR}/_index.md" +--- +title: cc-metric-collector +description: Documentation of cc-metric-collector +categories: [cc-metric-collector] +tags: [cc-metric-collector, General] +weight: 40 +--- + +EOF + cat "${SRCDIR}/README.md" >> "${DESTDIR}/_index.md" + sed -i -e 's+README.md+_index.md+g' "${DESTDIR}/_index.md" +fi + + +mkdir -p "${DESTDIR}/pkg/messageProcessor" +if [ -e "${SRCDIR}/pkg/messageProcessor/README.md" ]; then + cat << EOF > "${DESTDIR}/pkg/messageProcessor/_index.md" +--- +title: cc-metric-collector's message processor +description: Documentation of cc-metric-collector's message processor +categories: [cc-metric-collector] +tags: [cc-metric-collector, Message Processor] +weight: 40 +--- + +EOF + cat "${SRCDIR}/pkg/messageProcessor/README.md" >> "${DESTDIR}/pkg/messageProcessor/_index.md" +fi +