github.com/influxdata/line-protocol -> github.com/influxdata/line-protocol/v2/lineprotocol

This commit is contained in:
Holger Obermaier 2023-09-15 15:59:03 +02:00
parent baa45b833b
commit e34b0166f9

View File

@ -5,15 +5,15 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/gorilla/mux"
influx "github.com/influxdata/line-protocol"
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
)
const HTTP_RECEIVER_PORT = "8080"
@ -27,13 +27,11 @@ type HttpReceiverConfig struct {
type HttpReceiver struct {
receiver
handler *influx.MetricHandler
parser *influx.Parser
meta map[string]string
config HttpReceiverConfig
router *mux.Router
server *http.Server
wg sync.WaitGroup
meta map[string]string
config HttpReceiverConfig
router *mux.Router
server *http.Server
wg sync.WaitGroup
}
func (r *HttpReceiver) Init(name string, config json.RawMessage) error {
@ -57,9 +55,6 @@ func (r *HttpReceiver) Init(name string, config json.RawMessage) error {
addr := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port)
uri := addr + p
cclog.ComponentDebug(r.name, "INIT", "listen on:", uri)
r.handler = influx.NewMetricHandler()
r.parser = influx.NewParser(r.handler)
r.parser.SetTimeFunc(DefaultTime)
// Create new router and register p as path
r.router = mux.NewRouter()
@ -91,26 +86,80 @@ func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request) {
return
}
body, err := io.ReadAll(req.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
metrics, err := r.parser.Parse(body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
d := influx.NewDecoder(req.Body)
for d.Next() {
for _, m := range metrics {
y := lp.FromInfluxMetric(m)
for k, v := range r.meta {
y.AddMeta(k, v)
// Decode measurement name
measurement, err := d.Measurement()
if err != nil {
msg := "ServerHttp: Failed to decode measurement: " + err.Error()
cclog.ComponentError(r.name, msg)
http.Error(w, msg, http.StatusInternalServerError)
return
}
// Decode tags
tags := make(map[string]string)
for {
key, value, err := d.NextTag()
if err != nil {
msg := "ServerHttp: Failed to decode tag: " + err.Error()
cclog.ComponentError(r.name, msg)
http.Error(w, msg, http.StatusInternalServerError)
return
}
if key == nil {
break
}
tags[string(key)] = string(value)
}
// Decode fields
fields := make(map[string]interface{})
for {
key, value, err := d.NextField()
if err != nil {
msg := "ServerHttp: Failed to decode field: " + err.Error()
cclog.ComponentError(r.name, msg)
http.Error(w, msg, http.StatusInternalServerError)
return
}
if key == nil {
break
}
fields[string(key)] = value.Interface()
}
t, err := d.Time(influx.Nanosecond, time.Time{})
if err != nil {
msg := "ServerHttp: Failed to decode time: " + err.Error()
cclog.ComponentError(r.name, msg)
http.Error(w, msg, http.StatusInternalServerError)
return
}
y, _ := lp.New(
string(measurement),
tags,
r.meta,
fields,
t,
)
if r.sink != nil {
r.sink <- y
}
}
// Check for IO errors
err := d.Err()
if err != nil {
msg := "ServerHttp: Failed to decode: " + err.Error()
cclog.ComponentError(r.name, msg)
http.Error(w, msg, http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}