From 15733cb1b7816aed385b32c694b66c133bf0e402 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Thu, 20 Jan 2022 10:14:28 +0100 Subject: [PATCH] move decodeLine function --- lineprotocol.go | 119 ++++++++++++++++++++++++++++++++++++++++++++++++ metric-store.go | 91 +----------------------------------- 2 files changed, 121 insertions(+), 89 deletions(-) diff --git a/lineprotocol.go b/lineprotocol.go index 33c1e0f..a5d99f6 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -2,10 +2,12 @@ package main import ( "context" + "fmt" "log" "math" "strconv" "sync" + "time" "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/nats-io/nats.go" @@ -113,3 +115,120 @@ func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder) erro log.Println("NATS connection closed") return nil } + +func decodeLine(dec *lineprotocol.Decoder) error { + for dec.Next() { + rawmeasurement, err := dec.Measurement() + if err != nil { + return err + } + + var cluster, host, typeName, typeId, subType, subTypeId string + for { + key, val, err := dec.NextTag() + if err != nil { + return err + } + if key == nil { + break + } + + switch string(key) { + case "cluster": + cluster = string(val) + case "hostname": + host = string(val) + case "type": + typeName = string(val) + case "type-id": + typeId = string(val) + case "subtype": + subType = string(val) + case "stype-id": + subTypeId = string(val) + default: + // Ignore unkown tags (cc-metric-collector might send us a unit for example that we do not need) + // return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val)) + } + } + + selector := make([]string, 2, 4) + selector[0] = cluster + selector[1] = host + if len(typeId) > 0 { + selector = append(selector, typeName+typeId) + if len(subTypeId) > 0 { + selector = append(selector, subType+subTypeId) + } + } + + metrics := make([]Metric, 0, 10) + measurement := string(rawmeasurement) + if measurement == "data" { + for { + key, val, err := dec.NextField() + if err != nil { + return err + } + + if key == nil { + break + } + + var value Float + if val.Kind() == lineprotocol.Float { + value = Float(val.FloatV()) + } else if val.Kind() == lineprotocol.Int { + value = Float(val.IntV()) + } else { + return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) + } + + metrics = append(metrics, Metric{ + Name: string(key), + Value: value, + }) + } + } else { + var value Float + for { + key, val, err := dec.NextField() + if err != nil { + return err + } + + if key == nil { + break + } + + if string(key) != "value" { + return fmt.Errorf("unkown field: '%s' (value: %#v)", string(key), val) + } + + if val.Kind() == lineprotocol.Float { + value = Float(val.FloatV()) + } else if val.Kind() == lineprotocol.Int { + value = Float(val.IntV()) + } else { + return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) + } + } + + metrics = append(metrics, Metric{ + Name: measurement, + Value: value, + }) + } + + t, err := dec.Time(lineprotocol.Second, time.Now()) + if err != nil { + return err + } + + // log.Printf("write: %s (%v) -> %v\n", string(measurement), selector, value) + if err := memoryStore.Write(selector, t.Unix(), metrics); err != nil { + return err + } + } + return nil +} diff --git a/metric-store.go b/metric-store.go index 7ba7622..1665dae 100644 --- a/metric-store.go +++ b/metric-store.go @@ -12,8 +12,6 @@ import ( "sync" "syscall" "time" - - "github.com/influxdata/line-protocol/v2/lineprotocol" ) type MetricConfig struct { @@ -55,91 +53,6 @@ func loadConfiguration(file string) Config { return config } -func handleLine(dec *lineprotocol.Decoder) error { - for dec.Next() { - measurement, err := dec.Measurement() - if err != nil { - return err - } - - var cluster, host, typeName, typeId, subType, subTypeId string - for { - key, val, err := dec.NextTag() - if err != nil { - return err - } - if key == nil { - break - } - - switch string(key) { - case "cluster": - cluster = string(val) - case "hostname": - host = string(val) - case "type": - typeName = string(val) - case "type-id": - typeId = string(val) - case "subtype": - subType = string(val) - case "stype-id": - subTypeId = string(val) - default: - // Ignore unkown tags (cc-metric-collector might send us a unit for example that we do not need) - // return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val)) - } - } - - selector := make([]string, 2, 4) - selector[0] = cluster - selector[1] = host - if len(typeId) > 0 { - selector = append(selector, typeName+typeId) - if len(subTypeId) > 0 { - selector = append(selector, subType+subTypeId) - } - } - - var value Float - for { - key, val, err := dec.NextField() - if err != nil { - return err - } - - if key == nil { - break - } - - if string(key) != "value" { - return fmt.Errorf("unkown field: '%s' (value: %#v)", string(key), val) - } - - if val.Kind() == lineprotocol.Float { - value = Float(val.FloatV()) - } else if val.Kind() == lineprotocol.Int { - value = Float(val.IntV()) - } else { - return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) - } - } - - t, err := dec.Time(lineprotocol.Second, time.Now()) - if err != nil { - return err - } - - // log.Printf("write: %s (%v) -> %v\n", string(measurement), selector, value) - if err := memoryStore.Write(selector, t.Unix(), []Metric{ - {Name: string(measurement), Value: value}, - }); err != nil { - return err - } - } - return nil -} - func intervals(wg *sync.WaitGroup, ctx context.Context) { wg.Add(3) go func() { @@ -270,8 +183,8 @@ func main() { wg.Add(1) go func() { - // err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, ctx) - err := ReceiveNats(conf.Nats, handleLine, 1, ctx) + // err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx) + err := ReceiveNats(conf.Nats, decodeLine, 1, ctx) if err != nil { log.Fatal(err)