Fix metric-store.go and apo.go, new config file format

This commit is contained in:
Lou Knauer 2021-08-31 15:17:36 +02:00
parent 10f0da6000
commit 4c51f5c613
6 changed files with 730 additions and 212 deletions

166
api.go
View File

@ -4,44 +4,30 @@ import (
"context"
"encoding/json"
"log"
"math"
"net/http"
"strconv"
"time"
"github.com/ClusterCockpit/cc-metric-store/lineprotocol"
"github.com/gorilla/mux"
)
type HostData struct {
Host string `json:"host"`
Start int64 `json:"start"`
Data []lineprotocol.Float `json:"data"`
// Example:
// [
// { "selector": ["emmy", "host123"], "metrics": ["load_one"] }
// ]
type ApiRequestBody []struct {
Selector []string `json:"selector"`
Metrics []string `json:"metrics"`
}
type MetricData struct {
Hosts []HostData `json:"hosts"`
type ApiMetricData struct {
From int64 `json:"from"`
To int64 `json:"to"`
Data []Float `json:"data"`
}
type TimeseriesResponse map[string]MetricData
type HostStats struct {
Host string `json:"host"`
Sampels int `json:"sampels"`
Avg lineprotocol.Float `json:"avg"`
Min lineprotocol.Float `json:"min"`
Max lineprotocol.Float `json:"max"`
}
type MetricStats struct {
Hosts []HostStats `json:"hosts"`
}
type StatsResponse map[string]MetricStats
func handleTimeseries(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
cluster := vars["cluster"]
from, err := strconv.ParseInt(vars["from"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
@ -53,132 +39,40 @@ func handleTimeseries(rw http.ResponseWriter, r *http.Request) {
return
}
values := r.URL.Query()
hosts := values["host"]
metrics := values["metric"]
if len(hosts) < 1 || len(metrics) < 1 {
http.Error(rw, "no hosts or metrics specified", http.StatusBadRequest)
if r.Method != http.MethodPost {
http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
response := TimeseriesResponse{}
store, ok := metricStores[vars["class"]]
if !ok {
http.Error(rw, "invalid class", http.StatusInternalServerError)
bodyDec := json.NewDecoder(r.Body)
var reqBody ApiRequestBody
err = bodyDec.Decode(&reqBody)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
for _, metric := range metrics {
hostsdata := []HostData{}
for _, host := range hosts {
key := cluster + ":" + host
data, start, err := store.GetMetric(key, metric, from, to)
res := make([]map[string]ApiMetricData, 0, len(reqBody))
for _, req := range reqBody {
metrics := make(map[string]ApiMetricData)
for _, metric := range req.Metrics {
data, f, t, err := memoryStore.Read(req.Selector, metric, from, to)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
hostsdata = append(hostsdata, HostData{
Host: host,
Start: start,
metrics[metric] = ApiMetricData{
From: f,
To: t,
Data: data,
})
}
response[metric] = MetricData{
Hosts: hostsdata,
}
res = append(res, metrics)
}
rw.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(rw).Encode(response)
if err != nil {
log.Println(err.Error())
}
}
func handleStats(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
cluster := vars["cluster"]
from, err := strconv.ParseInt(vars["from"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
to, err := strconv.ParseInt(vars["to"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
values := r.URL.Query()
hosts := values["host"]
metrics := values["metric"]
if len(hosts) < 1 || len(metrics) < 1 {
http.Error(rw, "no hosts or metrics specified", http.StatusBadRequest)
return
}
response := StatsResponse{}
store, ok := metricStores[vars["class"]]
if !ok {
http.Error(rw, "invalid class", http.StatusInternalServerError)
return
}
for _, metric := range metrics {
hoststats := []HostStats{}
for _, host := range hosts {
key := cluster + ":" + host
min, max := math.MaxFloat64, -math.MaxFloat64
samples := 0
sum, err := store.Reduce(key, metric, from, to, func(t int64, sum, x lineprotocol.Float) lineprotocol.Float {
if math.IsNaN(float64(x)) {
return sum
}
samples += 1
min = math.Min(min, float64(x))
max = math.Max(max, float64(x))
return sum + x
}, 0.)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
hoststats = append(hoststats, HostStats{
Host: host,
Sampels: samples,
Avg: sum / lineprotocol.Float(samples),
Min: lineprotocol.Float(min),
Max: lineprotocol.Float(max),
})
}
response[metric] = MetricStats{
Hosts: hoststats,
}
}
rw.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(rw).Encode(response)
if err != nil {
log.Println(err.Error())
}
}
func handlePeak(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
cluster := vars["cluster"]
store, ok := metricStores[vars["class"]]
if !ok {
http.Error(rw, "invalid class", http.StatusInternalServerError)
return
}
response := store.Peak(cluster + ":")
rw.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(rw).Encode(response)
err = json.NewEncoder(rw).Encode(res)
if err != nil {
log.Println(err.Error())
}
@ -187,9 +81,7 @@ func handlePeak(rw http.ResponseWriter, r *http.Request) {
func StartApiServer(address string, done chan bool) error {
r := mux.NewRouter()
r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries)
r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats)
r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/peak", handlePeak)
r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries)
server := &http.Server{
Handler: r,

264
archive.go Normal file
View File

@ -0,0 +1,264 @@
package main
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io/fs"
"os"
"path"
"sort"
"strconv"
"strings"
)
type ArchiveMetrics struct {
Frequency int64 `json:"frequency"`
Start int64 `json:"start"`
Data []Float `json:"data"`
}
type ArchiveFile struct {
From int64 `json:"from"`
Metrics map[string]*ArchiveMetrics `json:"metrics"`
Children map[string]*ArchiveFile `json:"children"`
}
// Metrics stored at the lowest 2 levels are not stored away (root and cluster)!
// On a per-host basis a new JSON file is created. I have no idea if this will scale.
// The good thing: Only a host at a time is locked, so this function can run
// in parallel to writes/reads.
func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error) {
levels := make([]*level, 0)
selectors := make([][]string, 0)
m.root.lock.Lock()
for sel1, l1 := range m.root.children {
l1.lock.Lock()
for sel2, l2 := range l1.children {
levels = append(levels, l2)
selectors = append(selectors, []string{sel1, sel2})
}
l1.lock.Unlock()
}
m.root.lock.Unlock()
for i := 0; i < len(levels); i++ {
dir := path.Join(archiveRoot, path.Join(selectors[i]...))
err := levels[i].toArchive(dir, from, to)
if err != nil {
return i, err
}
}
return len(levels), nil
}
func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) {
l.lock.Lock()
defer l.lock.Unlock()
retval := &ArchiveFile{
From: from,
Metrics: make(map[string]*ArchiveMetrics),
Children: make(map[string]*ArchiveFile),
}
for metric, b := range l.metrics {
data, start, _, err := b.read(from, to)
if err != nil {
return nil, err
}
retval.Metrics[metric] = &ArchiveMetrics{
Frequency: b.frequency,
Start: start,
Data: data,
}
}
for name, child := range l.children {
val, err := child.toArchiveFile(from, to)
if err != nil {
return nil, err
}
retval.Children[name] = val
}
return retval, nil
}
func (l *level) toArchive(dir string, from, to int64) error {
af, err := l.toArchiveFile(from, to)
if err != nil {
return err
}
filepath := path.Join(dir, fmt.Sprintf("%d.json", from))
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil && os.IsNotExist(err) {
err = os.MkdirAll(dir, 0755)
if err == nil {
f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0644)
}
}
if err != nil {
return err
}
defer f.Close()
bw := bufio.NewWriter(f)
err = json.NewEncoder(bw).Encode(af)
if err != nil {
return err
}
return bw.Flush()
}
// Metrics stored at the lowest 2 levels are not loaded (root and cluster)!
// This function can only be called once and before the very first write or read.
// Unlike ToArchive, this function is NOT thread-safe.
func (m *MemoryStore) FromArchive(archiveRoot string, from int64) (int, error) {
return m.root.fromArchive(archiveRoot, from)
}
func (l *level) loadFile(af *ArchiveFile) error {
for name, metric := range af.Metrics {
n := len(metric.Data)
b := &buffer{
frequency: metric.Frequency,
start: metric.Start,
data: metric.Data[0:n:n], // Space is wasted here :(
prev: nil,
next: nil,
}
prev, ok := l.metrics[name]
if !ok {
l.metrics[name] = b
} else {
if prev.start > b.start {
return errors.New("wooops")
}
b.prev = prev
prev.next = b
}
l.metrics[name] = b
}
for sel, childAf := range af.Children {
child, ok := l.children[sel]
if !ok {
child = &level{
metrics: make(map[string]*buffer),
children: make(map[string]*level),
}
l.children[sel] = child
}
err := child.loadFile(childAf)
if err != nil {
return err
}
}
return nil
}
func (l *level) fromArchive(dir string, from int64) (int, error) {
direntries, err := os.ReadDir(dir)
if err != nil {
return 0, err
}
jsonFiles := make([]fs.DirEntry, 0)
filesLoaded := 0
for _, e := range direntries {
if e.IsDir() {
child := &level{
metrics: make(map[string]*buffer),
children: make(map[string]*level),
}
files, err := child.fromArchive(path.Join(dir, e.Name()), from)
filesLoaded += files
if err != nil {
return filesLoaded, err
}
l.children[e.Name()] = child
} else if strings.HasSuffix(e.Name(), ".json") {
jsonFiles = append(jsonFiles, e)
} else {
return filesLoaded, errors.New("unexpected file in archive")
}
}
files, err := findFiles(jsonFiles, from)
if err != nil {
return filesLoaded, err
}
for _, filename := range files {
f, err := os.Open(path.Join(dir, filename))
if err != nil {
return filesLoaded, err
}
af := &ArchiveFile{}
err = json.NewDecoder(bufio.NewReader(f)).Decode(af)
if err != nil {
return filesLoaded, err
}
err = l.loadFile(af)
if err != nil {
return filesLoaded, err
}
filesLoaded += 1
}
return filesLoaded, nil
}
// This will probably get very slow over time!
// A solution could be some sort of an index file in which all other files
// and the timespan they contain is listed.
func findFiles(direntries []fs.DirEntry, from int64) ([]string, error) {
nums := map[string]int64{}
for _, e := range direntries {
ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64)
if err != nil {
return nil, err
}
nums[e.Name()] = ts
}
sort.Slice(direntries, func(i, j int) bool {
a, b := direntries[i], direntries[j]
return nums[a.Name()] < nums[b.Name()]
})
filenames := make([]string, 0)
for i := 0; i < len(direntries); i++ {
e := direntries[i]
ts1 := nums[e.Name()]
if from <= ts1 || i == len(direntries)-1 {
filenames = append(filenames, e.Name())
continue
}
enext := direntries[i+1]
ts2 := nums[enext.Name()]
if ts1 < from && from < ts2 {
filenames = append(filenames, e.Name())
}
}
return filenames, nil
}

View File

@ -1,16 +1,20 @@
{
"metrics": {
"node": {
"frequency": 3,
"metrics": ["load_five", "load_fifteen", "proc_total", "proc_run", "load_one"]
"load_one": { "frequency": 3, "aggregation": null, "scope": "node" },
"load_five": { "frequency": 3, "aggregation": null, "scope": "node" },
"load_fifteen": { "frequency": 3, "aggregation": null, "scope": "node" },
"proc_run": { "frequency": 3, "aggregation": null, "scope": "node" },
"proc_total": { "frequency": 3, "aggregation": null, "scope": "node" },
"power": { "frequency": 3, "aggregation": "sum", "scope": "socket" },
"mem_bw": { "frequency": 3, "aggregation": "sum", "scope": "socket" },
"flops_sp": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"flops_dp": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"flops_any": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
"clock": { "frequency": 3, "aggregation": "avg", "scope": "cpu" },
"cpi": { "frequency": 3, "aggregation": "avg", "scope": "cpu" }
},
"socket": {
"frequency": 3,
"metrics": ["power", "mem_bw"]
},
"cpu": {
"frequency": 3,
"metrics": ["flops_sp", "flops_dp", "flops_any", "clock", "cpi"]
}
}
"restore-last-hours": 20,
"checkpoint-interval-hours": 600,
"archive-root": "./archive",
"nats": "nats://localhost:4222"
}

View File

@ -22,6 +22,8 @@ var bufferPool sync.Pool = sync.Pool{
// Each metric on each level has it's own buffer.
// This is where the actual values go.
// If `cap(data)` is reached, a new buffer is created and
// becomes the new head of a buffer list.
type buffer struct {
frequency int64 // Time between two "slots"
start int64 // Timestamp of when `data[0]` was written.
@ -41,6 +43,8 @@ func newBuffer(ts, freq int64) *buffer {
// If a new buffer was created, the new head is returnd.
// Otherwise, the existing buffer is returnd.
// Normaly, only "newer" data should be written, but if the value would
// end up in the same buffer anyways it is allowed.
func (b *buffer) write(ts int64, value Float) (*buffer, error) {
if ts < b.start {
return nil, errors.New("cannot write value to buffer from past")
@ -74,6 +78,8 @@ func (b *buffer) write(ts int64, value Float) (*buffer, error) {
// represented by NaN. If values at the start or end are missing,
// instead of NaN values, the second and thrid return values contain
// the actual `from`/`to`.
// This function goes back the buffer chain if `from` is older than the
// currents buffer start.
func (b *buffer) read(from, to int64) ([]Float, int64, int64, error) {
if from < b.start {
if b.prev != nil {
@ -107,7 +113,7 @@ func (b *buffer) read(from, to int64) ([]Float, int64, int64, error) {
// Could also be called "node" as this forms a node in a tree structure.
// Called level because "node" might be confusing here.
// Can be both a leaf or a inner node. In this structue, inner nodes can
// Can be both a leaf or a inner node. In this tree structue, inner nodes can
// also hold data (in `metrics`).
type level struct {
lock sync.Mutex // There is performance to be gained by having different locks for `metrics` and `children` (Spinlock?).
@ -144,8 +150,10 @@ func (l *level) findLevelOrCreate(selector []string) *level {
// a lot of short-lived allocations and copies if this is
// not the "native" level for the requested metric. There
// is a lot of optimization potential here!
// If this level does not have data for the requested metric, the data
// is aggregated timestep-wise from all the children (recursively).
// Optimization suggestion: Pass a buffer as argument onto which the values should be added.
func (l *level) read(metric string, from, to int64, accumulation string) ([]Float, int64, int64, error) {
func (l *level) read(metric string, from, to int64, aggregation string) ([]Float, int64, int64, error) {
if b, ok := l.metrics[metric]; ok {
// Whoo, this is the "native" level of this metric:
return b.read(from, to)
@ -158,7 +166,7 @@ func (l *level) read(metric string, from, to int64, accumulation string) ([]Floa
if len(l.children) == 1 {
for _, child := range l.children {
child.lock.Lock()
data, from, to, err := child.read(metric, from, to, accumulation)
data, from, to, err := child.read(metric, from, to, aggregation)
child.lock.Unlock()
return data, from, to, err
}
@ -168,7 +176,7 @@ func (l *level) read(metric string, from, to int64, accumulation string) ([]Floa
var data []Float = nil
for _, child := range l.children {
child.lock.Lock()
cdata, cfrom, cto, err := child.read(metric, from, to, accumulation)
cdata, cfrom, cto, err := child.read(metric, from, to, aggregation)
child.lock.Unlock()
if err != nil {
@ -197,7 +205,7 @@ func (l *level) read(metric string, from, to int64, accumulation string) ([]Floa
}
}
switch accumulation {
switch aggregation {
case "sum":
return data, from, to, nil
case "avg":
@ -207,7 +215,7 @@ func (l *level) read(metric string, from, to int64, accumulation string) ([]Floa
}
return data, from, to, nil
default:
return nil, 0, 0, errors.New("invalid accumulation strategy: " + accumulation)
return nil, 0, 0, errors.New("invalid aggregation strategy: " + aggregation)
}
}

321
memstore_test.go Normal file
View File

@ -0,0 +1,321 @@
package main
import (
"fmt"
"math"
"sync"
"testing"
)
func TestMemoryStoreBasics(t *testing.T) {
frequency := int64(3)
count := int64(5000)
store := NewMemoryStore(map[string]MetricConfig{
"a": {Frequency: frequency},
"b": {Frequency: frequency * 2},
})
for i := int64(0); i < count; i++ {
err := store.Write([]string{"testhost"}, i*frequency, []Metric{
{Name: "a", Value: Float(i)},
{Name: "b", Value: Float(i) * 0.5},
})
if err != nil {
t.Error(err)
return
}
}
adata, from, to, err := store.Read([]string{"testhost"}, "a", 0, count*frequency)
if err != nil || from != 0 || to != count*frequency {
t.Error(err)
return
}
bdata, _, _, err := store.Read([]string{"testhost"}, "b", 0, count*frequency)
if err != nil {
t.Error(err)
return
}
if len(adata) != int(count) || len(bdata) != int(count/2) {
t.Error("unexpected count of returned values")
return
}
for i := 0; i < int(count); i++ {
if adata[i] != Float(i) {
t.Errorf("incorrect value for metric a (%f vs. %f)", adata[i], Float(i))
return
}
}
for i := 0; i < int(count/2); i++ {
expected := Float(i) + 0.5
if bdata[i] != expected {
t.Errorf("incorrect value for metric b (%f vs. %f)", bdata[i], expected)
return
}
}
}
func TestMemoryStoreMissingDatapoints(t *testing.T) {
count := 3000
store := NewMemoryStore(map[string]MetricConfig{
"a": {Frequency: 1},
})
for i := 0; i < count; i++ {
if i%3 != 0 {
continue
}
err := store.Write([]string{"testhost"}, int64(i), []Metric{
{Name: "a", Value: Float(i)},
})
if err != nil {
t.Error(err)
return
}
}
adata, _, _, err := store.Read([]string{"testhost"}, "a", 0, int64(count))
if err != nil {
t.Error(err)
return
}
if len(adata) != count {
t.Error("unexpected len")
return
}
for i := 0; i < count; i++ {
if i%3 == 0 {
if adata[i] != Float(i) {
t.Error("unexpected value")
return
}
} else {
if !math.IsNaN(float64(adata[i])) {
t.Errorf("NaN expected (i = %d, value = %f)\n", i, adata[i])
return
}
}
}
}
func TestMemoryStoreAggregation(t *testing.T) {
count := 3000
store := NewMemoryStore(map[string]MetricConfig{
"a": {Frequency: 1, Aggregation: "sum"},
"b": {Frequency: 2, Aggregation: "avg"},
})
for i := 0; i < count; i++ {
err := store.Write([]string{"host0", "cpu0"}, int64(i), []Metric{
{Name: "a", Value: Float(i) / 2.},
{Name: "b", Value: Float(i) * 2.},
})
if err != nil {
t.Error(err)
return
}
err = store.Write([]string{"host0", "cpu1"}, int64(i), []Metric{
{Name: "a", Value: Float(i) * 2.},
{Name: "b", Value: Float(i) / 2.},
})
if err != nil {
t.Error(err)
return
}
}
adata, from, to, err := store.Read([]string{"host0"}, "a", int64(0), int64(count))
if err != nil {
t.Error(err)
return
}
if len(adata) != count || from != 0 || to != int64(count) {
t.Error("unexpected length or time range of returned data")
return
}
for i := 0; i < count; i++ {
expected := Float(i)/2. + Float(i)*2.
if adata[i] != expected {
t.Errorf("expected: %f, got: %f", expected, adata[i])
return
}
}
bdata, from, to, err := store.Read([]string{"host0"}, "b", int64(0), int64(count))
if err != nil {
t.Error(err)
return
}
if len(bdata) != count/2 || from != 0 || to != int64(count) {
t.Error("unexpected length or time range of returned data")
return
}
for i := 0; i < count/2; i++ {
j := (i * 2) + 1
expected := (Float(j)*2. + Float(j)*0.5) / 2.
if bdata[i] != expected {
t.Errorf("expected: %f, got: %f", expected, bdata[i])
return
}
}
}
func TestMemoryStoreArchive(t *testing.T) {
store1 := NewMemoryStore(map[string]MetricConfig{
"a": {Frequency: 1},
"b": {Frequency: 1},
})
count := 2000
for i := 0; i < count; i++ {
err := store1.Write([]string{"cluster", "host", "cpu0"}, 100+int64(i), []Metric{
{Name: "a", Value: Float(i)},
{Name: "b", Value: Float(i * 2)},
})
if err != nil {
t.Error(err)
return
}
}
archiveRoot := t.TempDir()
_, err := store1.ToArchive(archiveRoot, 100, 100+int64(count/2))
if err != nil {
t.Error(err)
return
}
_, err = store1.ToArchive(archiveRoot, 100+int64(count/2), 100+int64(count))
if err != nil {
t.Error(err)
return
}
store2 := NewMemoryStore(map[string]MetricConfig{
"a": {Frequency: 1},
"b": {Frequency: 1},
})
n, err := store2.FromArchive(archiveRoot, 100)
if err != nil {
t.Error(err)
return
}
adata, from, to, err := store2.Read([]string{"cluster", "host", "cpu0"}, "a", 100, int64(100+count))
if err != nil {
t.Error(err)
return
}
if n != 2 || len(adata) != count || from != 100 || to != int64(100+count) {
t.Errorf("unexpected: n=%d, len=%d, from=%d, to=%d\n", n, len(adata), from, to)
return
}
for i := 0; i < count; i++ {
expected := Float(i)
if adata[i] != expected {
t.Errorf("expected: %f, got: %f", expected, adata[i])
}
}
}
func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) {
frequency := int64(5)
count := b.N
goroutines := 4
store := NewMemoryStore(map[string]MetricConfig{
"a": {Frequency: frequency},
})
var wg sync.WaitGroup
wg.Add(goroutines)
for g := 0; g < goroutines; g++ {
go func(g int) {
host := fmt.Sprintf("host%d", g)
for i := 0; i < count; i++ {
store.Write([]string{"cluster", host, "cpu0"}, int64(i)*frequency, []Metric{
{Name: "a", Value: Float(i)},
})
}
wg.Done()
}(g)
}
wg.Wait()
b.StopTimer()
for g := 0; g < goroutines; g++ {
host := fmt.Sprintf("host%d", g)
adata, _, _, err := store.Read([]string{"cluster", host, "cpu0"}, "a", 0, int64(count)*frequency)
if err != nil {
b.Error(err)
return
}
if len(adata) != count {
b.Error("unexpected count")
return
}
for i := 0; i < count; i++ {
expected := Float(i)
if adata[i] != expected {
b.Error("incorrect value for metric a")
return
}
}
}
}
func BenchmarkMemoryStoreAggregation(b *testing.B) {
b.StopTimer()
count := 2000
store := NewMemoryStore(map[string]MetricConfig{
"flops_any": {Frequency: 1, Aggregation: "avg"},
})
sel := []string{"testcluster", "host123", "cpu0"}
for i := 0; i < count; i++ {
sel[2] = "cpu0"
err := store.Write(sel, int64(i), []Metric{
{Name: "flops_any", Value: Float(i)},
})
if err != nil {
b.Fatal(err)
}
sel[2] = "cpu1"
err = store.Write(sel, int64(i), []Metric{
{Name: "flops_any", Value: Float(i)},
})
if err != nil {
b.Fatal(err)
}
}
b.StartTimer()
for n := 0; n < b.N; n++ {
data, from, to, err := store.Read(sel[0:2], "flops_any", 0, int64(count))
if err != nil {
b.Fatal(err)
}
if len(data) != count || from != 0 || to != int64(count) {
b.Fatal()
}
}
}

View File

@ -2,34 +2,33 @@ package main
import (
"encoding/json"
"errors"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/ClusterCockpit/cc-metric-store/lineprotocol"
"time"
)
type MetricStore interface {
AddMetrics(key string, ts int64, metrics []lineprotocol.Metric) error
GetMetric(key string, metric string, from int64, to int64) ([]lineprotocol.Float, int64, error)
Reduce(key, metric string, from, to int64, f func(t int64, sum, x lineprotocol.Float) lineprotocol.Float, initialX lineprotocol.Float) (lineprotocol.Float, error)
Peak(prefix string) map[string]map[string]lineprotocol.Float
type MetricConfig struct {
Frequency int64 `json:"frequency"`
Aggregation string `json:"aggregation"`
Scope string `json:"scope"`
}
type Config struct {
MetricClasses map[string]struct {
Frequency int `json:"frequency"`
Metrics []string `json:"metrics"`
} `json:"metrics"`
Metrics map[string]MetricConfig `json:"metrics"`
RestoreLastHours int `json:"restore-last-hours"`
CheckpointIntervalHours int `json:"checkpoint-interval-hours"`
ArchiveRoot string `json:"archive-root"`
Nats string `json:"nats"`
}
var conf Config
const KEY_SEPERATOR string = "."
var metricStores map[string]MetricStore = map[string]MetricStore{}
var conf Config
var memoryStore *MemoryStore = nil
func loadConfiguration(file string) Config {
var config Config
@ -43,82 +42,112 @@ func loadConfiguration(file string) Config {
return config
}
// TODO: Change MetricStore API so that we do not have to do string concat?
// Nested hashmaps could be an alternative.
func buildKey(line *lineprotocol.Line) (string, error) {
func handleLine(line *Line) {
cluster, ok := line.Tags["cluster"]
if !ok {
return "", errors.New("missing cluster tag")
log.Println("'cluster' tag missing")
return
}
host, ok := line.Tags["host"]
if !ok {
return "", errors.New("missing host tag")
}
socket, ok := line.Tags["socket"]
if ok {
return cluster + ":" + host + ":s" + socket, nil
}
cpu, ok := line.Tags["cpu"]
if ok {
return cluster + ":" + host + ":c" + cpu, nil
}
return cluster + ":" + host, nil
}
func handleLine(line *lineprotocol.Line) {
store, ok := metricStores[line.Measurement]
if !ok {
log.Printf("unkown class: '%s'\n", line.Measurement)
log.Println("'host' tag missing")
return
}
key, err := buildKey(line)
if err != nil {
log.Println(err)
return
selector := []string{cluster, host}
if id, ok := line.Tags[line.Measurement]; ok {
selector = append(selector, line.Measurement, id)
}
// log.Printf("t=%d, key='%s', values=%v\n", line.Ts.Unix(), key, line.Fields)
log.Printf("new data: t=%d, key='%s'", line.Ts.Unix(), key)
err = store.AddMetrics(key, line.Ts.Unix(), line.Fields)
ts := line.Ts.Unix()
log.Printf("ts=%d, tags=%v\n", ts, selector)
err := memoryStore.Write(selector, ts, line.Fields)
if err != nil {
log.Println(err)
log.Printf("error: %s\n", err.Error())
}
}
func main() {
startupTime := time.Now()
conf = loadConfiguration("config.json")
for class, info := range conf.MetricClasses {
metricStores[class] = newMemoryStore(info.Metrics, 1000, info.Frequency)
memoryStore = NewMemoryStore(conf.Metrics)
if conf.ArchiveRoot != "" && conf.RestoreLastHours > 0 {
d := time.Duration(conf.RestoreLastHours) * time.Hour
from := startupTime.Add(-d).Unix()
log.Printf("Restoring data since %d from '%s'...\n", from, conf.ArchiveRoot)
files, err := memoryStore.FromArchive(conf.ArchiveRoot, from)
if err != nil {
log.Printf("Loading archive failed: %s\n", err.Error())
} else {
log.Printf("Archive loaded (%d files)\n", files)
}
}
var wg sync.WaitGroup
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
_ = <-sigs
log.Println("Shuting down...")
done <- true
close(done)
log.Println("shuting down")
}()
var wg sync.WaitGroup
wg.Add(1)
lastCheckpoint := startupTime
if conf.ArchiveRoot != "" && conf.CheckpointIntervalHours > 0 {
wg.Add(3)
go func() {
d := time.Duration(conf.CheckpointIntervalHours) * time.Hour
ticks := time.Tick(d)
for {
select {
case _, _ = <-done:
wg.Done()
return
case <-ticks:
log.Println("Start making checkpoint...")
_, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), time.Now().Unix())
if err != nil {
log.Printf("Making checkpoint failed: %s\n", err.Error())
} else {
log.Println("Checkpoint successfull!")
}
lastCheckpoint = time.Now()
}
}
}()
} else {
wg.Add(2)
}
go func() {
StartApiServer(":8080", done)
wg.Done()
}()
err := lineprotocol.ReceiveNats("nats://localhost:4222", handleLine, done)
err := StartApiServer(":8080", done)
if err != nil {
log.Fatal(err)
}
wg.Done()
}()
go func() {
err := ReceiveNats(conf.Nats, handleLine, done)
if err != nil {
log.Fatal(err)
}
wg.Done()
}()
wg.Wait()
if conf.ArchiveRoot != "" {
log.Printf("Writing to '%s'...\n", conf.ArchiveRoot)
files, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), time.Now().Unix())
if err != nil {
log.Printf("Writing to archive failed: %s\n", err.Error())
}
log.Printf("Done! (%d files written)\n", files)
}
}