Merge pull request #7 from ClusterCockpit/new-line-protocol-format

New line protocol format
This commit is contained in:
Lou 2021-10-11 16:29:16 +02:00 committed by GitHub
commit 26528151b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 161 additions and 304 deletions

2
.gitignore vendored
View File

@ -16,4 +16,4 @@
# vendor/
# Project specific ignores
/archive
/var

View File

@ -84,6 +84,8 @@ Example selectors:
### Config file
All durations are specified in seconds.
- `metrics`: Map of metric-name to objects with the following properties
- `frequency`: Timestep/Interval/Resolution of this metric (In seconds)
- `aggregation`: Can be `"sum"`, `"avg"` or `null`
@ -92,9 +94,7 @@ Example selectors:
- `"avg"` means that values from the child levels are averaged for the parent level
- `scope`: Unused at the moment, should be something like `"node"`, `"socket"` or `"cpu"`
- `nats`: Url of NATS.io server (The `updates` channel will be subscribed for metrics)
- `archive-root`: Directory to be used as archive
- `restore-last-hours`: After restart, load data from the past *X* hours back to memory
- `checkpoint-interval-hours`: Every *X* hours, write currently held data to disk
- `jwt-public-key`: Base64 encoded string, use this to verify requests to the HTTP API
### Test the complete setup (excluding ClusterCockpit itself)

19
api.go
View File

@ -1,6 +1,7 @@
package main
import (
"bufio"
"context"
"crypto/ed25519"
"encoding/base64"
@ -15,6 +16,7 @@ import (
"github.com/golang-jwt/jwt/v4"
"github.com/gorilla/mux"
"github.com/influxdata/line-protocol/v2/lineprotocol"
)
// Example:
@ -208,6 +210,22 @@ func handlePeek(rw http.ResponseWriter, r *http.Request) {
}
}
func handleWrite(rw http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
reader := bufio.NewReader(r.Body)
dec := lineprotocol.NewDecoder(reader)
// Unlike the name suggests, handleLine can handle multiple lines
if err := handleLine(dec); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
rw.WriteHeader(http.StatusOK)
}
func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
authheader := r.Header.Get("Authorization")
@ -244,6 +262,7 @@ func StartApiServer(address string, ctx context.Context) error {
r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats)
r.HandleFunc("/api/{to:[0-9]+}/free", handleFree)
r.HandleFunc("/api/{cluster}/peek", handlePeek)
r.HandleFunc("/api/write", handleWrite)
server := &http.Server{
Handler: r,

View File

@ -192,6 +192,10 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) {
direntries, err := os.ReadDir(dir)
if err != nil {
if os.IsNotExist(err) {
return 0, nil
}
return 0, err
}

3
go.mod
View File

@ -3,9 +3,10 @@ module github.com/ClusterCockpit/cc-metric-store
go 1.16
require (
github.com/golang-jwt/jwt/v4 v4.0.0 // indirect
github.com/golang-jwt/jwt/v4 v4.0.0
github.com/golang/protobuf v1.5.2 // indirect
github.com/gorilla/mux v1.8.0
github.com/influxdata/line-protocol/v2 v2.2.0
github.com/nats-io/nats-server/v2 v2.2.6 // indirect
github.com/nats-io/nats.go v1.11.0
)

19
go.sum
View File

@ -1,3 +1,7 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o=
@ -14,11 +18,22 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98=
github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo=
github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY=
github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY=
github.com/influxdata/line-protocol/v2 v2.2.0 h1:UPmAqE15Hw5zu9E10SYhoXVLWnEJkWnuCbaCiRsA3c0=
github.com/influxdata/line-protocol/v2 v2.2.0/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM=
github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk=
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
@ -34,6 +49,7 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
@ -61,3 +77,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,18 +1,13 @@
package main
import (
"bufio"
"context"
"errors"
"io"
"log"
"math"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/nats-io/nats.go"
)
@ -56,185 +51,30 @@ type Metric struct {
Value Float
}
// measurement: node or cpu
// tags: host, cluster, cpu (cpu only if measurement is cpu)
// fields: metrics...
// t: timestamp (accuracy: seconds)
type Line struct {
Measurement string
Tags map[string]string
Fields []Metric
Ts time.Time
}
// Parse a single line as string.
//
// There is performance to be gained by implementing a parser
// that directly reads from a bufio.Scanner.
func ParseLine(rawline string) (*Line, error) {
line := &Line{}
parts := strings.Fields(rawline)
if len(parts) != 3 {
return nil, errors.New("line format error")
}
tagsAndMeasurement := strings.Split(parts[0], ",")
line.Measurement = tagsAndMeasurement[0]
line.Tags = map[string]string{}
for i := 1; i < len(tagsAndMeasurement); i++ {
pair := strings.Split(tagsAndMeasurement[i], "=")
if len(pair) != 2 {
return nil, errors.New("line format error")
}
line.Tags[pair[0]] = pair[1]
}
rawfields := strings.Split(parts[1], ",")
line.Fields = []Metric{}
for i := 0; i < len(rawfields); i++ {
pair := strings.Split(rawfields[i], "=")
if len(pair) != 2 {
return nil, errors.New("line format error")
}
field, err := strconv.ParseFloat(pair[1], 64)
if err != nil {
return nil, err
}
line.Fields = append(line.Fields, Metric{
Name: pair[0],
Value: Float(field),
})
}
unixTimestamp, err := strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return nil, err
}
line.Ts = time.Unix(unixTimestamp, 0)
return line, nil
}
func ParseLines(raw string) ([]*Line, error) {
lines := make([]*Line, 0, 1)
for _, line := range strings.Split(raw, "\n") {
if len(line) == 0 {
continue
}
line, err := ParseLine(line)
if err != nil {
return nil, err
}
lines = append(lines, line)
}
return lines, nil
}
// Listen for connections sending metric data in the line protocol format.
//
// This is a blocking function, send `true` through the channel argument to shut down the server.
// `handleLine` will be called from different go routines for different connections.
//
func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) error {
ln, err := net.Listen("tcp", address)
if err != nil {
return err
}
handleConnection := func(conn net.Conn, handleLine func(line *Line)) {
reader := bufio.NewReader(conn)
for {
rawline, err := reader.ReadString('\n')
if err == io.EOF {
return
}
if err != nil {
log.Printf("reading from connection failed: %s\n", err.Error())
return
}
line, err := ParseLine(rawline)
if err != nil {
log.Printf("parsing line failed: %s\n", err.Error())
return
}
handleLine(line)
}
}
go func() {
for {
stop := <-done
if stop {
err := ln.Close()
if err != nil {
log.Printf("closing listener failed: %s\n", err.Error())
}
return
}
}
}()
for {
conn, err := ln.Accept()
if err != nil {
return err
}
go handleConnection(conn, handleLine)
}
}
// Connect to a nats server and subscribe to "updates". This is a blocking
// function. handleLine will be called for each line recieved via nats.
// Send `true` through the done channel for gracefull termination.
func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx context.Context) error {
func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder) error, workers int, ctx context.Context) error {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
return err
}
defer nc.Close()
var wg sync.WaitGroup
var sub *nats.Subscription
if workers < 2 {
sub, err = nc.Subscribe("updates", func(m *nats.Msg) {
lines, err := ParseLines(string(m.Data))
if err != nil {
log.Println(err.Error())
}
msgs := make(chan *nats.Msg, workers*2)
for _, line := range lines {
handleLine(line)
}
})
if err != nil {
return err
}
log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
<-ctx.Done()
err = sub.Unsubscribe()
} else {
msgs := make(chan *nats.Msg, 16)
var wg sync.WaitGroup
if workers > 1 {
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
for m := range msgs {
lines, err := ParseLines(string(m.Data))
if err != nil {
log.Println(err.Error())
}
for _, line := range lines {
handleLine(line)
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := handleLine(dec); err != nil {
log.Printf("error: %s\n", err.Error())
}
}
@ -245,22 +85,30 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c
sub, err = nc.Subscribe("updates", func(m *nats.Msg) {
msgs <- m
})
if err != nil {
return err
}
log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
<-ctx.Done()
err = sub.Unsubscribe()
close(msgs)
wg.Wait()
} else {
sub, err = nc.Subscribe("updates", func(m *nats.Msg) {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := handleLine(dec); err != nil {
log.Printf("error: %s\n", err.Error())
}
})
}
if err != nil {
return err
}
log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
<-ctx.Done()
err = sub.Unsubscribe()
close(msgs)
wg.Wait()
if err != nil {
return err
}
nc.Close()
log.Println("NATS connection closed")
return nil

View File

@ -1,89 +0,0 @@
package main
import (
"bufio"
"reflect"
"strings"
"testing"
)
var raw = "node,host=lousxps,cluster=test mem_used=4692.252,proc_total=1083,load_five=0.91,cpu_user=1.424336e+06,cpu_guest_nice=0,cpu_guest=0,mem_available=9829.848,mem_slab=514.796,mem_free=4537.956,proc_run=2,cpu_idle=2.1589764e+07,swap_total=0,mem_cached=6368.5,swap_free=0,load_fifteen=0.93,cpu_nice=196,cpu_softirq=41456,mem_buffers=489.992,mem_total=16088.7,load_one=0.84,cpu_system=517223,cpu_iowait=8994,cpu_steal=0,cpu_irq=113265,mem_sreclaimable=362.452 1629356936\n"
var expectedMeasurement = `node`
var expectedTags = map[string]string{
"host": "lousxps",
"cluster": "test",
}
var expectedFields = []Metric{
{"mem_used", 4692.252},
{"proc_total", 1083},
{"load_five", 0.91},
{"cpu_user", 1.424336e+06},
{"cpu_guest_nice", 0},
{"cpu_guest", 0},
{"mem_available", 9829.848},
{"mem_slab", 514.796},
{"mem_free", 4537.956},
{"proc_run", 2},
{"cpu_idle", 2.1589764e+07},
{"swap_total", 0},
{"mem_cached", 6368.5},
{"swap_free", 0},
{"load_fifteen", 0.93},
{"cpu_nice", 196},
{"cpu_softirq", 41456},
{"mem_buffers", 489.992},
{"mem_total", 16088.7},
{"load_one", 0.84},
{"cpu_system", 517223},
{"cpu_iowait", 8994},
{"cpu_steal", 0},
{"cpu_irq", 113265},
{"mem_sreclaimable", 362.452},
}
var expectedTimestamp int64 = 1629356936
func TestParseLine(t *testing.T) {
line, err := ParseLine(raw)
if err != nil {
t.Error(err)
}
if line.Measurement != expectedMeasurement {
t.Error("measurement not as expected")
}
if line.Ts.Unix() != int64(expectedTimestamp) {
t.Error("timestamp not as expected")
}
if !reflect.DeepEqual(line.Tags, expectedTags) {
t.Error("tags not as expected")
}
if !reflect.DeepEqual(line.Fields, expectedFields) {
t.Error("fields not as expected")
}
}
func BenchmarkParseLine(b *testing.B) {
b.StopTimer()
lines := strings.Repeat(raw, b.N)
scanner := bufio.NewScanner(strings.NewReader(lines))
scanner.Split(bufio.ScanLines)
b.StartTimer()
for i := 0; i < b.N; i++ {
ok := scanner.Scan()
if !ok {
b.Error("woops")
return
}
line := scanner.Text()
_, err := ParseLine(line)
if err != nil {
b.Error(err)
return
}
}
}

View File

@ -16,7 +16,9 @@ const (
// So that we can reuse allocations
var bufferPool sync.Pool = sync.Pool{
New: func() interface{} {
return make([]Float, 0, BUFFER_CAP)
return &buffer{
data: make([]Float, 0, BUFFER_CAP),
}
},
}
@ -37,13 +39,12 @@ type buffer struct {
}
func newBuffer(ts, freq int64) *buffer {
return &buffer{
frequency: freq,
start: ts,
data: bufferPool.Get().([]Float)[:0],
prev: nil,
next: nil,
}
b := bufferPool.Get().(*buffer)
b.frequency = freq
b.start = ts
b.prev = nil
b.next = nil
return b
}
// If a new buffer was created, the new head is returnd.
@ -137,10 +138,11 @@ func (b *buffer) free(t int64) (int, error) {
}
n += 1
bufferPool.Put(b.data)
b.data = nil
b.frequency = 0
b.start = 0
b.next = nil
b.prev = nil
bufferPool.Put(b)
b = prev
}
return n, nil
@ -283,7 +285,8 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
for _, metric := range metrics {
minfo, ok := m.metrics[metric.Name]
if !ok {
return errors.New("Unknown metric: " + metric.Name)
// return errors.New("Unknown metric: " + metric.Name)
continue
}
b := l.metrics[minfo.offset]

View File

@ -10,6 +10,8 @@ import (
"sync"
"syscall"
"time"
"github.com/influxdata/line-protocol/v2/lineprotocol"
)
type MetricConfig struct {
@ -50,30 +52,80 @@ func loadConfiguration(file string) Config {
return config
}
func handleLine(line *Line) {
cluster, ok := line.Tags["cluster"]
if !ok {
log.Println("'cluster' tag missing")
return
}
func handleLine(dec *lineprotocol.Decoder) error {
for dec.Next() {
measurement, err := dec.Measurement()
if err != nil {
return err
}
host, ok := line.Tags["host"]
if !ok {
log.Println("'host' tag missing")
return
}
var cluster, host, typeName, typeId string
for {
key, val, err := dec.NextTag()
if err != nil {
return err
}
if key == nil {
break
}
selector := []string{cluster, host}
if id, ok := line.Tags[line.Measurement]; ok {
selector = append(selector, line.Measurement+id)
}
switch string(key) {
case "cluster":
cluster = string(val)
case "hostname":
host = string(val)
case "type":
typeName = string(val)
case "type-id":
typeId = string(val)
default:
return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val))
}
}
ts := line.Ts.Unix()
// log.Printf("ts=%d, tags=%v\n", ts, selector)
err := memoryStore.Write(selector, ts, line.Fields)
if err != nil {
log.Printf("error: %s\n", err.Error())
selector := make([]string, 0, 3)
selector = append(selector, cluster)
selector = append(selector, host)
if len(typeId) > 0 {
selector = append(selector, typeName+typeId)
}
var value Float
for {
key, val, err := dec.NextField()
if err != nil {
return err
}
if key == nil {
break
}
if string(key) != "value" {
return fmt.Errorf("unkown field: '%s' (value: %#v)", string(key), val)
}
if val.Kind() == lineprotocol.Float {
value = Float(val.FloatV())
} else if val.Kind() == lineprotocol.Int {
value = Float(val.IntV())
} else {
return fmt.Errorf("unsupported value type in message: %s", val.Kind().String())
}
}
t, err := dec.Time(lineprotocol.Second, time.Now())
if err != nil {
return err
}
if err := memoryStore.Write(selector, t.Unix(), []Metric{
{Name: string(measurement), Value: value},
}); err != nil {
return err
}
}
return nil
}
func intervals(wg *sync.WaitGroup, ctx context.Context) {
@ -159,7 +211,7 @@ func main() {
conf = loadConfiguration("config.json")
memoryStore = NewMemoryStore(conf.Metrics)
restoreFrom := startupTime.Add(-time.Duration(conf.Checkpoints.Restore))
restoreFrom := startupTime.Add(-time.Duration(conf.Checkpoints.Restore) * time.Second)
files, err := memoryStore.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix())
if err != nil {
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())