mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2024-12-28 09:39:05 +01:00
move decodeLine function
This commit is contained in:
parent
3fb95f88ee
commit
15733cb1b7
119
lineprotocol.go
119
lineprotocol.go
@ -2,10 +2,12 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/line-protocol/v2/lineprotocol"
|
"github.com/influxdata/line-protocol/v2/lineprotocol"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
@ -113,3 +115,120 @@ func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder) erro
|
|||||||
log.Println("NATS connection closed")
|
log.Println("NATS connection closed")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func decodeLine(dec *lineprotocol.Decoder) error {
|
||||||
|
for dec.Next() {
|
||||||
|
rawmeasurement, err := dec.Measurement()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var cluster, host, typeName, typeId, subType, subTypeId string
|
||||||
|
for {
|
||||||
|
key, val, err := dec.NextTag()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if key == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
switch string(key) {
|
||||||
|
case "cluster":
|
||||||
|
cluster = string(val)
|
||||||
|
case "hostname":
|
||||||
|
host = string(val)
|
||||||
|
case "type":
|
||||||
|
typeName = string(val)
|
||||||
|
case "type-id":
|
||||||
|
typeId = string(val)
|
||||||
|
case "subtype":
|
||||||
|
subType = string(val)
|
||||||
|
case "stype-id":
|
||||||
|
subTypeId = string(val)
|
||||||
|
default:
|
||||||
|
// Ignore unkown tags (cc-metric-collector might send us a unit for example that we do not need)
|
||||||
|
// return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
selector := make([]string, 2, 4)
|
||||||
|
selector[0] = cluster
|
||||||
|
selector[1] = host
|
||||||
|
if len(typeId) > 0 {
|
||||||
|
selector = append(selector, typeName+typeId)
|
||||||
|
if len(subTypeId) > 0 {
|
||||||
|
selector = append(selector, subType+subTypeId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics := make([]Metric, 0, 10)
|
||||||
|
measurement := string(rawmeasurement)
|
||||||
|
if measurement == "data" {
|
||||||
|
for {
|
||||||
|
key, val, err := dec.NextField()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if key == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
var value Float
|
||||||
|
if val.Kind() == lineprotocol.Float {
|
||||||
|
value = Float(val.FloatV())
|
||||||
|
} else if val.Kind() == lineprotocol.Int {
|
||||||
|
value = Float(val.IntV())
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("unsupported value type in message: %s", val.Kind().String())
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics = append(metrics, Metric{
|
||||||
|
Name: string(key),
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
var value Float
|
||||||
|
for {
|
||||||
|
key, val, err := dec.NextField()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if key == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(key) != "value" {
|
||||||
|
return fmt.Errorf("unkown field: '%s' (value: %#v)", string(key), val)
|
||||||
|
}
|
||||||
|
|
||||||
|
if val.Kind() == lineprotocol.Float {
|
||||||
|
value = Float(val.FloatV())
|
||||||
|
} else if val.Kind() == lineprotocol.Int {
|
||||||
|
value = Float(val.IntV())
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("unsupported value type in message: %s", val.Kind().String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics = append(metrics, Metric{
|
||||||
|
Name: measurement,
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
t, err := dec.Time(lineprotocol.Second, time.Now())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// log.Printf("write: %s (%v) -> %v\n", string(measurement), selector, value)
|
||||||
|
if err := memoryStore.Write(selector, t.Unix(), metrics); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -12,8 +12,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/line-protocol/v2/lineprotocol"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type MetricConfig struct {
|
type MetricConfig struct {
|
||||||
@ -55,91 +53,6 @@ func loadConfiguration(file string) Config {
|
|||||||
return config
|
return config
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleLine(dec *lineprotocol.Decoder) error {
|
|
||||||
for dec.Next() {
|
|
||||||
measurement, err := dec.Measurement()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var cluster, host, typeName, typeId, subType, subTypeId string
|
|
||||||
for {
|
|
||||||
key, val, err := dec.NextTag()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if key == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
switch string(key) {
|
|
||||||
case "cluster":
|
|
||||||
cluster = string(val)
|
|
||||||
case "hostname":
|
|
||||||
host = string(val)
|
|
||||||
case "type":
|
|
||||||
typeName = string(val)
|
|
||||||
case "type-id":
|
|
||||||
typeId = string(val)
|
|
||||||
case "subtype":
|
|
||||||
subType = string(val)
|
|
||||||
case "stype-id":
|
|
||||||
subTypeId = string(val)
|
|
||||||
default:
|
|
||||||
// Ignore unkown tags (cc-metric-collector might send us a unit for example that we do not need)
|
|
||||||
// return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
selector := make([]string, 2, 4)
|
|
||||||
selector[0] = cluster
|
|
||||||
selector[1] = host
|
|
||||||
if len(typeId) > 0 {
|
|
||||||
selector = append(selector, typeName+typeId)
|
|
||||||
if len(subTypeId) > 0 {
|
|
||||||
selector = append(selector, subType+subTypeId)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var value Float
|
|
||||||
for {
|
|
||||||
key, val, err := dec.NextField()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if key == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(key) != "value" {
|
|
||||||
return fmt.Errorf("unkown field: '%s' (value: %#v)", string(key), val)
|
|
||||||
}
|
|
||||||
|
|
||||||
if val.Kind() == lineprotocol.Float {
|
|
||||||
value = Float(val.FloatV())
|
|
||||||
} else if val.Kind() == lineprotocol.Int {
|
|
||||||
value = Float(val.IntV())
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("unsupported value type in message: %s", val.Kind().String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
t, err := dec.Time(lineprotocol.Second, time.Now())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// log.Printf("write: %s (%v) -> %v\n", string(measurement), selector, value)
|
|
||||||
if err := memoryStore.Write(selector, t.Unix(), []Metric{
|
|
||||||
{Name: string(measurement), Value: value},
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func intervals(wg *sync.WaitGroup, ctx context.Context) {
|
func intervals(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
wg.Add(3)
|
wg.Add(3)
|
||||||
go func() {
|
go func() {
|
||||||
@ -270,8 +183,8 @@ func main() {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
// err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, ctx)
|
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
|
||||||
err := ReceiveNats(conf.Nats, handleLine, 1, ctx)
|
err := ReceiveNats(conf.Nats, decodeLine, 1, ctx)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
Loading…
Reference in New Issue
Block a user