From 7b343d0bab7082f5e37b95700f05d5782ebca9e6 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Fri, 27 Dec 2024 15:22:59 +0000 Subject: [PATCH] Use CCMessage FromBytes instead of Influx's decoder --- receivers/httpReceiver.go | 77 +++++--------------------------------- receivers/natsReceiver.go | 78 ++++++--------------------------------- 2 files changed, 21 insertions(+), 134 deletions(-) diff --git a/receivers/httpReceiver.go b/receivers/httpReceiver.go index d7965c6..ae6d87b 100644 --- a/receivers/httpReceiver.go +++ b/receivers/httpReceiver.go @@ -13,7 +13,6 @@ import ( lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" - influx "github.com/influxdata/line-protocol/v2/lineprotocol" ) const HTTP_RECEIVER_PORT = "8080" @@ -151,80 +150,22 @@ func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request) { } } if r.sink != nil { - d := influx.NewDecoder(req.Body) - for d.Next() { - - // Decode measurement name - measurement, err := d.Measurement() + buf := make([]byte, 0, req.ContentLength) + len, err := req.Body.Read(buf) + if err == nil && len > 0 { + messages, err := lp.FromBytes(buf) if err != nil { - msg := "ServerHttp: Failed to decode measurement: " + err.Error() + msg := "ServerHttp: Failed to decode messages: " + err.Error() cclog.ComponentError(r.name, msg) http.Error(w, msg, http.StatusInternalServerError) return } - - // Decode tags - tags := make(map[string]string) - for { - key, value, err := d.NextTag() - if err != nil { - msg := "ServerHttp: Failed to decode tag: " + err.Error() - cclog.ComponentError(r.name, msg) - http.Error(w, msg, http.StatusInternalServerError) - return + for _, y := range messages { + m, err := r.mp.ProcessMessage(y) + if err == nil && m != nil { + r.sink <- m } - if key == nil { - break - } - tags[string(key)] = string(value) } - - // Decode fields - fields := make(map[string]interface{}) - for { - key, value, err := d.NextField() - if err != nil { - msg := "ServerHttp: Failed to decode field: " + err.Error() - cclog.ComponentError(r.name, msg) - http.Error(w, msg, http.StatusInternalServerError) - return - } - if key == nil { - break - } - fields[string(key)] = value.Interface() - } - - // Decode time stamp - t, err := d.Time(influx.Nanosecond, time.Time{}) - if err != nil { - msg := "ServerHttp: Failed to decode time stamp: " + err.Error() - cclog.ComponentError(r.name, msg) - http.Error(w, msg, http.StatusInternalServerError) - return - } - - y, _ := lp.NewMessage( - string(measurement), - tags, - nil, - fields, - t, - ) - - m, err := r.mp.ProcessMessage(y) - if err == nil && m != nil { - r.sink <- m - } - - } - // Check for IO errors - err := d.Err() - if err != nil { - msg := "ServerHttp: Failed to decode: " + err.Error() - cclog.ComponentError(r.name, msg) - http.Error(w, msg, http.StatusInternalServerError) - return } } diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index 4f9f552..50072ec 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -5,20 +5,18 @@ import ( "errors" "fmt" "os" - "time" lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" - influx "github.com/influxdata/line-protocol/v2/lineprotocol" nats "github.com/nats-io/nats.go" ) type NatsReceiverConfig struct { defaultReceiverConfig - Addr string `json:"address"` - Port string `json:"port"` - Subject string `json:"subject"` + Addr string `json:"address"` + Port string `json:"port"` + Subject string `json:"subject"` User string `json:"user,omitempty"` Password string `json:"password,omitempty"` NkeyFile string `json:"nkey_file,omitempty"` @@ -42,67 +40,15 @@ func (r *NatsReceiver) Start() { func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { if r.sink != nil { - d := influx.NewDecoderWithBytes(m.Data) - for d.Next() { - - // Decode measurement name - measurement, err := d.Measurement() - if err != nil { - msg := "_NatsReceive: Failed to decode measurement: " + err.Error() - cclog.ComponentError(r.name, msg) - return - } - - // Decode tags - tags := make(map[string]string) - for { - key, value, err := d.NextTag() - if err != nil { - msg := "_NatsReceive: Failed to decode tag: " + err.Error() - cclog.ComponentError(r.name, msg) - return - } - if key == nil { - break - } - tags[string(key)] = string(value) - } - - // Decode fields - fields := make(map[string]interface{}) - for { - key, value, err := d.NextField() - if err != nil { - msg := "_NatsReceive: Failed to decode field: " + err.Error() - cclog.ComponentError(r.name, msg) - return - } - if key == nil { - break - } - fields[string(key)] = value.Interface() - } - - // Decode time stamp - t, err := d.Time(influx.Nanosecond, time.Time{}) - if err != nil { - msg := "_NatsReceive: Failed to decode time: " + err.Error() - cclog.ComponentError(r.name, msg) - return - } - - y, err := lp.NewMessage( - string(measurement), - tags, - nil, - fields, - t, - ) - if err == nil { - m, err := r.mp.ProcessMessage(y) - if err == nil && m != nil && r.sink != nil { - r.sink <- m - } + messages, err := lp.FromBytes(m.Data) + if err != nil { + msg := "_NatsReceive: Failed to decode messages: " + err.Error() + cclog.ComponentError(r.name, msg) + } + for _, y := range messages { + m, err := r.mp.ProcessMessage(y) + if err == nil && m != nil && r.sink != nil { + r.sink <- m } } }