From 2fc6ad284f4954ecee9c8e181ca0f08399c1ee72 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Thu, 7 Oct 2021 14:59:07 +0200 Subject: [PATCH] Handle new line-protrocol format in handleLine --- archive.go | 4 +++ memstore.go | 3 +- metric-store.go | 91 ++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 77 insertions(+), 21 deletions(-) diff --git a/archive.go b/archive.go index b6f0087..5e32d49 100644 --- a/archive.go +++ b/archive.go @@ -192,6 +192,10 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) { direntries, err := os.ReadDir(dir) if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + return 0, err } diff --git a/memstore.go b/memstore.go index 1bc1ce5..d9b81f2 100644 --- a/memstore.go +++ b/memstore.go @@ -283,7 +283,8 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error for _, metric := range metrics { minfo, ok := m.metrics[metric.Name] if !ok { - return errors.New("Unknown metric: " + metric.Name) + // return errors.New("Unknown metric: " + metric.Name) + continue } b := l.metrics[minfo.offset] diff --git a/metric-store.go b/metric-store.go index 5c7f806..a790494 100644 --- a/metric-store.go +++ b/metric-store.go @@ -10,6 +10,8 @@ import ( "sync" "syscall" "time" + + "github.com/influxdata/line-protocol/v2/lineprotocol" ) type MetricConfig struct { @@ -50,29 +52,78 @@ func loadConfiguration(file string) Config { return config } -func handleLine(line *Line) { - cluster, ok := line.Tags["cluster"] - if !ok { - log.Println("'cluster' tag missing") - return - } +func handleLine(dec *lineprotocol.Decoder) { + for dec.Next() { + measurement, err := dec.Measurement() + if err != nil { + log.Fatal(err) + } - host, ok := line.Tags["host"] - if !ok { - log.Println("'host' tag missing") - return - } + var cluster, host, typeName, typeId string + for { + key, val, err := dec.NextTag() + if err != nil { + log.Fatal(err) + } + if key == nil { + break + } - selector := []string{cluster, host} - if id, ok := line.Tags[line.Measurement]; ok { - selector = append(selector, line.Measurement+id) - } + switch string(key) { + case "cluster": + cluster = string(val) + case "host": + host = string(val) + case "type": + typeName = string(val) + case "type-id": + typeId = string(val) + default: + log.Fatalf("Unkown tag: '%s' (value: '%s')", string(key), string(val)) + } + } - ts := line.Ts.Unix() - // log.Printf("ts=%d, tags=%v\n", ts, selector) - err := memoryStore.Write(selector, ts, line.Fields) - if err != nil { - log.Printf("error: %s\n", err.Error()) + selector := make([]string, 0, 3) + selector = append(selector, cluster) + selector = append(selector, host) + if len(typeId) > 0 { + selector = append(selector, typeName+typeId) + } + + var value Float + for { + key, val, err := dec.NextField() + if err != nil { + log.Fatal(err) + } + + if key == nil { + break + } + + if string(key) != "value" { + log.Fatalf("Unkown field: '%s' (value: %#v)", string(key), val) + } + + if val.Kind() == lineprotocol.Float { + value = Float(val.FloatV()) + } else if val.Kind() == lineprotocol.Int { + value = Float(val.IntV()) + } else { + log.Fatalf("Unsupported value type in message: %s", val.Kind().String()) + } + } + + t, err := dec.Time(lineprotocol.Second, time.Now()) + if err != nil { + log.Fatal(err) + } + + if err := memoryStore.Write(selector, t.Unix(), []Metric{ + {Name: string(measurement), Value: value}, + }); err != nil { + log.Fatal(err) + } } }