Handle new line-protrocol format in handleLine

This commit is contained in:
Lou Knauer 2021-10-07 14:59:07 +02:00
parent 85591e7a03
commit 2fc6ad284f
3 changed files with 77 additions and 21 deletions

View File

@ -192,6 +192,10 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) { func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) {
direntries, err := os.ReadDir(dir) direntries, err := os.ReadDir(dir)
if err != nil { if err != nil {
if os.IsNotExist(err) {
return 0, nil
}
return 0, err return 0, err
} }

View File

@ -283,7 +283,8 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
for _, metric := range metrics { for _, metric := range metrics {
minfo, ok := m.metrics[metric.Name] minfo, ok := m.metrics[metric.Name]
if !ok { if !ok {
return errors.New("Unknown metric: " + metric.Name) // return errors.New("Unknown metric: " + metric.Name)
continue
} }
b := l.metrics[minfo.offset] b := l.metrics[minfo.offset]

View File

@ -10,6 +10,8 @@ import (
"sync" "sync"
"syscall" "syscall"
"time" "time"
"github.com/influxdata/line-protocol/v2/lineprotocol"
) )
type MetricConfig struct { type MetricConfig struct {
@ -50,29 +52,78 @@ func loadConfiguration(file string) Config {
return config return config
} }
func handleLine(line *Line) { func handleLine(dec *lineprotocol.Decoder) {
cluster, ok := line.Tags["cluster"] for dec.Next() {
if !ok { measurement, err := dec.Measurement()
log.Println("'cluster' tag missing")
return
}
host, ok := line.Tags["host"]
if !ok {
log.Println("'host' tag missing")
return
}
selector := []string{cluster, host}
if id, ok := line.Tags[line.Measurement]; ok {
selector = append(selector, line.Measurement+id)
}
ts := line.Ts.Unix()
// log.Printf("ts=%d, tags=%v\n", ts, selector)
err := memoryStore.Write(selector, ts, line.Fields)
if err != nil { if err != nil {
log.Printf("error: %s\n", err.Error()) log.Fatal(err)
}
var cluster, host, typeName, typeId string
for {
key, val, err := dec.NextTag()
if err != nil {
log.Fatal(err)
}
if key == nil {
break
}
switch string(key) {
case "cluster":
cluster = string(val)
case "host":
host = string(val)
case "type":
typeName = string(val)
case "type-id":
typeId = string(val)
default:
log.Fatalf("Unkown tag: '%s' (value: '%s')", string(key), string(val))
}
}
selector := make([]string, 0, 3)
selector = append(selector, cluster)
selector = append(selector, host)
if len(typeId) > 0 {
selector = append(selector, typeName+typeId)
}
var value Float
for {
key, val, err := dec.NextField()
if err != nil {
log.Fatal(err)
}
if key == nil {
break
}
if string(key) != "value" {
log.Fatalf("Unkown field: '%s' (value: %#v)", string(key), val)
}
if val.Kind() == lineprotocol.Float {
value = Float(val.FloatV())
} else if val.Kind() == lineprotocol.Int {
value = Float(val.IntV())
} else {
log.Fatalf("Unsupported value type in message: %s", val.Kind().String())
}
}
t, err := dec.Time(lineprotocol.Second, time.Now())
if err != nil {
log.Fatal(err)
}
if err := memoryStore.Write(selector, t.Unix(), []Metric{
{Name: string(measurement), Value: value},
}); err != nil {
log.Fatal(err)
}
} }
} }