diff --git a/api.go b/api.go index 20f93eb..5da75d9 100644 --- a/api.go +++ b/api.go @@ -4,44 +4,30 @@ import ( "context" "encoding/json" "log" - "math" "net/http" "strconv" "time" - "github.com/ClusterCockpit/cc-metric-store/lineprotocol" "github.com/gorilla/mux" ) -type HostData struct { - Host string `json:"host"` - Start int64 `json:"start"` - Data []lineprotocol.Float `json:"data"` +// Example: +// [ +// { "selector": ["emmy", "host123"], "metrics": ["load_one"] } +// ] +type ApiRequestBody []struct { + Selector []string `json:"selector"` + Metrics []string `json:"metrics"` } -type MetricData struct { - Hosts []HostData `json:"hosts"` +type ApiMetricData struct { + From int64 `json:"from"` + To int64 `json:"to"` + Data []Float `json:"data"` } -type TimeseriesResponse map[string]MetricData - -type HostStats struct { - Host string `json:"host"` - Sampels int `json:"sampels"` - Avg lineprotocol.Float `json:"avg"` - Min lineprotocol.Float `json:"min"` - Max lineprotocol.Float `json:"max"` -} - -type MetricStats struct { - Hosts []HostStats `json:"hosts"` -} - -type StatsResponse map[string]MetricStats - func handleTimeseries(rw http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - cluster := vars["cluster"] from, err := strconv.ParseInt(vars["from"], 10, 64) if err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) @@ -53,132 +39,40 @@ func handleTimeseries(rw http.ResponseWriter, r *http.Request) { return } - values := r.URL.Query() - hosts := values["host"] - metrics := values["metric"] - if len(hosts) < 1 || len(metrics) < 1 { - http.Error(rw, "no hosts or metrics specified", http.StatusBadRequest) + if r.Method != http.MethodPost { + http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) return } - response := TimeseriesResponse{} - store, ok := metricStores[vars["class"]] - if !ok { - http.Error(rw, "invalid class", http.StatusInternalServerError) - return - } - - for _, metric := range metrics { - hostsdata := []HostData{} - for _, host := range hosts { - key := cluster + ":" + host - data, start, err := store.GetMetric(key, metric, from, to) - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - - hostsdata = append(hostsdata, HostData{ - Host: host, - Start: start, - Data: data, - }) - } - response[metric] = MetricData{ - Hosts: hostsdata, - } - } - - rw.Header().Set("Content-Type", "application/json") - err = json.NewEncoder(rw).Encode(response) - if err != nil { - log.Println(err.Error()) - } -} - -func handleStats(rw http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - cluster := vars["cluster"] - from, err := strconv.ParseInt(vars["from"], 10, 64) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - to, err := strconv.ParseInt(vars["to"], 10, 64) + bodyDec := json.NewDecoder(r.Body) + var reqBody ApiRequestBody + err = bodyDec.Decode(&reqBody) if err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) return } - values := r.URL.Query() - hosts := values["host"] - metrics := values["metric"] - if len(hosts) < 1 || len(metrics) < 1 { - http.Error(rw, "no hosts or metrics specified", http.StatusBadRequest) - return - } - - response := StatsResponse{} - store, ok := metricStores[vars["class"]] - if !ok { - http.Error(rw, "invalid class", http.StatusInternalServerError) - return - } - - for _, metric := range metrics { - hoststats := []HostStats{} - for _, host := range hosts { - key := cluster + ":" + host - min, max := math.MaxFloat64, -math.MaxFloat64 - samples 
:= 0 - - sum, err := store.Reduce(key, metric, from, to, func(t int64, sum, x lineprotocol.Float) lineprotocol.Float { - if math.IsNaN(float64(x)) { - return sum - } - - samples += 1 - min = math.Min(min, float64(x)) - max = math.Max(max, float64(x)) - return sum + x - }, 0.) + res := make([]map[string]ApiMetricData, 0, len(reqBody)) + for _, req := range reqBody { + metrics := make(map[string]ApiMetricData) + for _, metric := range req.Metrics { + data, f, t, err := memoryStore.Read(req.Selector, metric, from, to) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return } - hoststats = append(hoststats, HostStats{ - Host: host, - Sampels: samples, - Avg: sum / lineprotocol.Float(samples), - Min: lineprotocol.Float(min), - Max: lineprotocol.Float(max), - }) - } - response[metric] = MetricStats{ - Hosts: hoststats, + metrics[metric] = ApiMetricData{ + From: f, + To: t, + Data: data, + } } + res = append(res, metrics) } rw.Header().Set("Content-Type", "application/json") - err = json.NewEncoder(rw).Encode(response) - if err != nil { - log.Println(err.Error()) - } -} - -func handlePeak(rw http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - cluster := vars["cluster"] - store, ok := metricStores[vars["class"]] - if !ok { - http.Error(rw, "invalid class", http.StatusInternalServerError) - return - } - - response := store.Peak(cluster + ":") - rw.Header().Set("Content-Type", "application/json") - err := json.NewEncoder(rw).Encode(response) + err = json.NewEncoder(rw).Encode(res) if err != nil { log.Println(err.Error()) } @@ -187,9 +81,7 @@ func handlePeak(rw http.ResponseWriter, r *http.Request) { func StartApiServer(address string, done chan bool) error { r := mux.NewRouter() - r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries) - r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats) - r.HandleFunc("/api/{cluster}/{class:(?:node|socket|cpu)}/peak", handlePeak) + r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries) server := &http.Server{ Handler: r, diff --git a/archive.go b/archive.go new file mode 100644 index 0000000..2b67a0c --- /dev/null +++ b/archive.go @@ -0,0 +1,264 @@ +package main + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io/fs" + "os" + "path" + "sort" + "strconv" + "strings" +) + +type ArchiveMetrics struct { + Frequency int64 `json:"frequency"` + Start int64 `json:"start"` + Data []Float `json:"data"` +} + +type ArchiveFile struct { + From int64 `json:"from"` + Metrics map[string]*ArchiveMetrics `json:"metrics"` + Children map[string]*ArchiveFile `json:"children"` +} + +// Metrics stored at the lowest 2 levels are not stored away (root and cluster)! +// On a per-host basis a new JSON file is created. I have no idea if this will scale. +// The good thing: Only a host at a time is locked, so this function can run +// in parallel to writes/reads. 
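+//
+// A minimal usage sketch (mirrors the checkpointing done in main()):
+//
+//	n, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), time.Now().Unix())
+//	if err != nil {
+//		log.Printf("checkpoint failed: %s\n", err.Error())
+//	} else {
+//		log.Printf("checkpoint covered %d hosts\n", n)
+//	}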
+func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error) { + levels := make([]*level, 0) + selectors := make([][]string, 0) + m.root.lock.Lock() + for sel1, l1 := range m.root.children { + l1.lock.Lock() + for sel2, l2 := range l1.children { + levels = append(levels, l2) + selectors = append(selectors, []string{sel1, sel2}) + } + l1.lock.Unlock() + } + m.root.lock.Unlock() + + for i := 0; i < len(levels); i++ { + dir := path.Join(archiveRoot, path.Join(selectors[i]...)) + err := levels[i].toArchive(dir, from, to) + if err != nil { + return i, err + } + } + + return len(levels), nil +} + +func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) { + l.lock.Lock() + defer l.lock.Unlock() + + retval := &ArchiveFile{ + From: from, + Metrics: make(map[string]*ArchiveMetrics), + Children: make(map[string]*ArchiveFile), + } + + for metric, b := range l.metrics { + data, start, _, err := b.read(from, to) + if err != nil { + return nil, err + } + + retval.Metrics[metric] = &ArchiveMetrics{ + Frequency: b.frequency, + Start: start, + Data: data, + } + } + + for name, child := range l.children { + val, err := child.toArchiveFile(from, to) + if err != nil { + return nil, err + } + + retval.Children[name] = val + } + + return retval, nil +} + +func (l *level) toArchive(dir string, from, to int64) error { + af, err := l.toArchiveFile(from, to) + if err != nil { + return err + } + + filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) + f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil && os.IsNotExist(err) { + err = os.MkdirAll(dir, 0755) + if err == nil { + f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0644) + } + } + if err != nil { + return err + } + defer f.Close() + bw := bufio.NewWriter(f) + + err = json.NewEncoder(bw).Encode(af) + if err != nil { + return err + } + + return bw.Flush() +} + +// Metrics stored at the lowest 2 levels are not loaded (root and cluster)! +// This function can only be called once and before the very first write or read. +// Unlike ToArchive, this function is NOT thread-safe. 
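+//
+// A minimal usage sketch at startup (mirrors main(); the restore window is just an example):
+//
+//	from := startupTime.Add(-20 * time.Hour).Unix()
+//	files, err := memoryStore.FromArchive(conf.ArchiveRoot, from)
+//	if err != nil {
+//		log.Printf("Loading archive failed: %s\n", err.Error())
+//	} else {
+//		log.Printf("Archive loaded (%d files)\n", files)
+//	}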
+func (m *MemoryStore) FromArchive(archiveRoot string, from int64) (int, error) { + return m.root.fromArchive(archiveRoot, from) +} + +func (l *level) loadFile(af *ArchiveFile) error { + for name, metric := range af.Metrics { + n := len(metric.Data) + b := &buffer{ + frequency: metric.Frequency, + start: metric.Start, + data: metric.Data[0:n:n], // Space is wasted here :( + prev: nil, + next: nil, + } + + prev, ok := l.metrics[name] + if !ok { + l.metrics[name] = b + } else { + if prev.start > b.start { + return errors.New("wooops") + } + + b.prev = prev + prev.next = b + } + l.metrics[name] = b + } + + for sel, childAf := range af.Children { + child, ok := l.children[sel] + if !ok { + child = &level{ + metrics: make(map[string]*buffer), + children: make(map[string]*level), + } + l.children[sel] = child + } + + err := child.loadFile(childAf) + if err != nil { + return err + } + } + + return nil +} + +func (l *level) fromArchive(dir string, from int64) (int, error) { + direntries, err := os.ReadDir(dir) + if err != nil { + return 0, err + } + + jsonFiles := make([]fs.DirEntry, 0) + filesLoaded := 0 + for _, e := range direntries { + if e.IsDir() { + child := &level{ + metrics: make(map[string]*buffer), + children: make(map[string]*level), + } + + files, err := child.fromArchive(path.Join(dir, e.Name()), from) + filesLoaded += files + if err != nil { + return filesLoaded, err + } + + l.children[e.Name()] = child + } else if strings.HasSuffix(e.Name(), ".json") { + jsonFiles = append(jsonFiles, e) + } else { + return filesLoaded, errors.New("unexpected file in archive") + } + } + + files, err := findFiles(jsonFiles, from) + if err != nil { + return filesLoaded, err + } + + for _, filename := range files { + f, err := os.Open(path.Join(dir, filename)) + if err != nil { + return filesLoaded, err + } + + af := &ArchiveFile{} + err = json.NewDecoder(bufio.NewReader(f)).Decode(af) + if err != nil { + return filesLoaded, err + } + + err = l.loadFile(af) + if err != nil { + return filesLoaded, err + } + + filesLoaded += 1 + } + + return filesLoaded, nil +} + +// This will probably get very slow over time! +// A solution could be some sort of an index file in which all other files +// and the timespan they contain is listed. 
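+//
+// Illustrative example: given 0.json, 1000.json and 2000.json with from=1500,
+// findFiles returns 1000.json and 2000.json, because 1000.json may still
+// contain samples at or after t=1500.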
+func findFiles(direntries []fs.DirEntry, from int64) ([]string, error) {
+	nums := map[string]int64{}
+	for _, e := range direntries {
+		ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64)
+		if err != nil {
+			return nil, err
+		}
+		nums[e.Name()] = ts
+	}
+
+	sort.Slice(direntries, func(i, j int) bool {
+		a, b := direntries[i], direntries[j]
+		return nums[a.Name()] < nums[b.Name()]
+	})
+
+	filenames := make([]string, 0)
+	for i := 0; i < len(direntries); i++ {
+		e := direntries[i]
+		ts1 := nums[e.Name()]
+
+		if from <= ts1 || i == len(direntries)-1 {
+			filenames = append(filenames, e.Name())
+			continue
+		}
+
+		enext := direntries[i+1]
+		ts2 := nums[enext.Name()]
+		if ts1 < from && from < ts2 {
+			filenames = append(filenames, e.Name())
+		}
+	}
+
+	return filenames, nil
+}
diff --git a/config.json b/config.json
index 644ec7c..9a5879e 100644
--- a/config.json
+++ b/config.json
@@ -1,16 +1,20 @@
 {
     "metrics": {
-        "node": {
-            "frequency": 3,
-            "metrics": ["load_five", "load_fifteen", "proc_total", "proc_run", "load_one"]
-        },
-        "socket": {
-            "frequency": 3,
-            "metrics": ["power", "mem_bw"]
-        },
-        "cpu": {
-            "frequency": 3,
-            "metrics": ["flops_sp", "flops_dp", "flops_any", "clock", "cpi"]
-        }
-    }
+        "load_one": { "frequency": 3, "aggregation": null, "scope": "node" },
+        "load_five": { "frequency": 3, "aggregation": null, "scope": "node" },
+        "load_fifteen": { "frequency": 3, "aggregation": null, "scope": "node" },
+        "proc_run": { "frequency": 3, "aggregation": null, "scope": "node" },
+        "proc_total": { "frequency": 3, "aggregation": null, "scope": "node" },
+        "power": { "frequency": 3, "aggregation": "sum", "scope": "socket" },
+        "mem_bw": { "frequency": 3, "aggregation": "sum", "scope": "socket" },
+        "flops_sp": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
+        "flops_dp": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
+        "flops_any": { "frequency": 3, "aggregation": "sum", "scope": "cpu" },
+        "clock": { "frequency": 3, "aggregation": "avg", "scope": "cpu" },
+        "cpi": { "frequency": 3, "aggregation": "avg", "scope": "cpu" }
+    },
+    "restore-last-hours": 20,
+    "checkpoint-interval-hours": 600,
+    "archive-root": "./archive",
+    "nats": "nats://localhost:4222"
 }
diff --git a/memstore.go b/memstore.go
index 7ce88eb..9e37ea7 100644
--- a/memstore.go
+++ b/memstore.go
@@ -22,6 +22,8 @@ var bufferPool sync.Pool = sync.Pool{
 
 // Each metric on each level has it's own buffer.
 // This is where the actual values go.
+// If `cap(data)` is reached, a new buffer is created and
+// becomes the new head of a buffer list.
 type buffer struct {
 	frequency int64 // Time between two "slots"
 	start     int64 // Timestamp of when `data[0]` was written.
@@ -41,6 +43,8 @@ func newBuffer(ts, freq int64) *buffer {
 
 // If a new buffer was created, the new head is returnd.
 // Otherwise, the existing buffer is returnd.
+// Normally, only "newer" data should be written, but if the value would
+// end up in the same buffer anyway it is allowed.
 func (b *buffer) write(ts int64, value Float) (*buffer, error) {
 	if ts < b.start {
 		return nil, errors.New("cannot write value to buffer from past")
@@ -74,6 +78,8 @@ func (b *buffer) write(ts int64, value Float) (*buffer, error) {
 // represented by NaN. If values at the start or end are missing,
 // instead of NaN values, the second and thrid return values contain
 // the actual `from`/`to`.
+// This function walks back the buffer chain if `from` is older than the
+// current buffer's start.
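+// Example: if this buffer starts at t=100 and has no previous buffer,
+// read(70, 130) behaves like read(100, 130) and reports 100 as the
+// second return value, so callers know where the data actually begins.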
 func (b *buffer) read(from, to int64) ([]Float, int64, int64, error) {
 	if from < b.start {
 		if b.prev != nil {
@@ -107,7 +113,7 @@ func (b *buffer) read(from, to int64) ([]Float, int64, int64, error) {
 
 // Could also be called "node" as this forms a node in a tree structure.
 // Called level because "node" might be confusing here.
-// Can be both a leaf or a inner node. In this structue, inner nodes can
+// Can be both a leaf or an inner node. In this tree structure, inner nodes can
 // also hold data (in `metrics`).
 type level struct {
 	lock     sync.Mutex // There is performance to be gained by having different locks for `metrics` and `children` (Spinlock?).
@@ -144,8 +150,10 @@ func (l *level) findLevelOrCreate(selector []string) *level {
 // a lot of short-lived allocations and copies if this is
 // not the "native" level for the requested metric. There
 // is a lot of optimization potential here!
+// If this level does not have data for the requested metric, the data
+// is aggregated timestep-wise from all the children (recursively).
 // Optimization suggestion: Pass a buffer as argument onto which the values should be added.
-func (l *level) read(metric string, from, to int64, accumulation string) ([]Float, int64, int64, error) {
+func (l *level) read(metric string, from, to int64, aggregation string) ([]Float, int64, int64, error) {
 	if b, ok := l.metrics[metric]; ok {
 		// Whoo, this is the "native" level of this metric:
 		return b.read(from, to)
@@ -158,7 +166,7 @@ func (l *level) read(metric string, from, to int64, accumulation string) ([]Floa
 	if len(l.children) == 1 {
 		for _, child := range l.children {
 			child.lock.Lock()
-			data, from, to, err := child.read(metric, from, to, accumulation)
+			data, from, to, err := child.read(metric, from, to, aggregation)
 			child.lock.Unlock()
 			return data, from, to, err
 		}
@@ -168,7 +176,7 @@ func (l *level) read(metric string, from, to int64, accumulation string) ([]Floa
 	var data []Float = nil
 	for _, child := range l.children {
 		child.lock.Lock()
-		cdata, cfrom, cto, err := child.read(metric, from, to, accumulation)
+		cdata, cfrom, cto, err := child.read(metric, from, to, aggregation)
 		child.lock.Unlock()
 
 		if err != nil {
@@ -197,7 +205,7 @@ func (l *level) read(metric string, from, to int64, accumulation string) ([]Floa
 		}
 	}
 
-	switch accumulation {
+	switch aggregation {
 	case "sum":
 		return data, from, to, nil
 	case "avg":
@@ -207,7 +215,7 @@ func (l *level) read(metric string, from, to int64, accumulation string) ([]Floa
 		}
 		return data, from, to, nil
 	default:
-		return nil, 0, 0, errors.New("invalid accumulation strategy: " + accumulation)
+		return nil, 0, 0, errors.New("invalid aggregation strategy: " + aggregation)
 	}
 }
 
diff --git a/memstore_test.go b/memstore_test.go
new file mode 100644
index 0000000..4326651
--- /dev/null
+++ b/memstore_test.go
@@ -0,0 +1,321 @@
+package main
+
+import (
+	"fmt"
+	"math"
+	"sync"
+	"testing"
+)
+
+func TestMemoryStoreBasics(t *testing.T) {
+	frequency := int64(3)
+	count := int64(5000)
+	store := NewMemoryStore(map[string]MetricConfig{
+		"a": {Frequency: frequency},
+		"b": {Frequency: frequency * 2},
+	})
+
+	for i := int64(0); i < count; i++ {
+		err := store.Write([]string{"testhost"}, i*frequency, []Metric{
+			{Name: "a", Value: Float(i)},
+			{Name: "b", Value: Float(i) * 0.5},
+		})
+		if err != nil {
+			t.Error(err)
+			return
+		}
+	}
+
+	adata, from, to, err := store.Read([]string{"testhost"}, "a", 0, count*frequency)
+	if err != nil || from != 0 || to != count*frequency {
+		t.Error(err)
+		return
+	}
+	bdata, _, _, err :=
store.Read([]string{"testhost"}, "b", 0, count*frequency) + if err != nil { + t.Error(err) + return + } + + if len(adata) != int(count) || len(bdata) != int(count/2) { + t.Error("unexpected count of returned values") + return + } + + for i := 0; i < int(count); i++ { + if adata[i] != Float(i) { + t.Errorf("incorrect value for metric a (%f vs. %f)", adata[i], Float(i)) + return + } + } + + for i := 0; i < int(count/2); i++ { + expected := Float(i) + 0.5 + if bdata[i] != expected { + t.Errorf("incorrect value for metric b (%f vs. %f)", bdata[i], expected) + return + } + } + +} + +func TestMemoryStoreMissingDatapoints(t *testing.T) { + count := 3000 + store := NewMemoryStore(map[string]MetricConfig{ + "a": {Frequency: 1}, + }) + + for i := 0; i < count; i++ { + if i%3 != 0 { + continue + } + + err := store.Write([]string{"testhost"}, int64(i), []Metric{ + {Name: "a", Value: Float(i)}, + }) + if err != nil { + t.Error(err) + return + } + } + + adata, _, _, err := store.Read([]string{"testhost"}, "a", 0, int64(count)) + if err != nil { + t.Error(err) + return + } + + if len(adata) != count { + t.Error("unexpected len") + return + } + + for i := 0; i < count; i++ { + if i%3 == 0 { + if adata[i] != Float(i) { + t.Error("unexpected value") + return + } + } else { + if !math.IsNaN(float64(adata[i])) { + t.Errorf("NaN expected (i = %d, value = %f)\n", i, adata[i]) + return + } + } + } +} + +func TestMemoryStoreAggregation(t *testing.T) { + count := 3000 + store := NewMemoryStore(map[string]MetricConfig{ + "a": {Frequency: 1, Aggregation: "sum"}, + "b": {Frequency: 2, Aggregation: "avg"}, + }) + + for i := 0; i < count; i++ { + err := store.Write([]string{"host0", "cpu0"}, int64(i), []Metric{ + {Name: "a", Value: Float(i) / 2.}, + {Name: "b", Value: Float(i) * 2.}, + }) + if err != nil { + t.Error(err) + return + } + + err = store.Write([]string{"host0", "cpu1"}, int64(i), []Metric{ + {Name: "a", Value: Float(i) * 2.}, + {Name: "b", Value: Float(i) / 2.}, + }) + if err != nil { + t.Error(err) + return + } + } + + adata, from, to, err := store.Read([]string{"host0"}, "a", int64(0), int64(count)) + if err != nil { + t.Error(err) + return + } + + if len(adata) != count || from != 0 || to != int64(count) { + t.Error("unexpected length or time range of returned data") + return + } + + for i := 0; i < count; i++ { + expected := Float(i)/2. + Float(i)*2. + if adata[i] != expected { + t.Errorf("expected: %f, got: %f", expected, adata[i]) + return + } + } + + bdata, from, to, err := store.Read([]string{"host0"}, "b", int64(0), int64(count)) + if err != nil { + t.Error(err) + return + } + + if len(bdata) != count/2 || from != 0 || to != int64(count) { + t.Error("unexpected length or time range of returned data") + return + } + + for i := 0; i < count/2; i++ { + j := (i * 2) + 1 + expected := (Float(j)*2. + Float(j)*0.5) / 2. 
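+		// The later write in each 2-step window (t = j) wins the shared slot of the
+		// frequency-2 metric, and "avg" over cpu0 (2j) and cpu1 (j/2) gives the value above.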
+ if bdata[i] != expected { + t.Errorf("expected: %f, got: %f", expected, bdata[i]) + return + } + } +} + +func TestMemoryStoreArchive(t *testing.T) { + store1 := NewMemoryStore(map[string]MetricConfig{ + "a": {Frequency: 1}, + "b": {Frequency: 1}, + }) + + count := 2000 + for i := 0; i < count; i++ { + err := store1.Write([]string{"cluster", "host", "cpu0"}, 100+int64(i), []Metric{ + {Name: "a", Value: Float(i)}, + {Name: "b", Value: Float(i * 2)}, + }) + if err != nil { + t.Error(err) + return + } + } + + archiveRoot := t.TempDir() + _, err := store1.ToArchive(archiveRoot, 100, 100+int64(count/2)) + if err != nil { + t.Error(err) + return + } + + _, err = store1.ToArchive(archiveRoot, 100+int64(count/2), 100+int64(count)) + if err != nil { + t.Error(err) + return + } + + store2 := NewMemoryStore(map[string]MetricConfig{ + "a": {Frequency: 1}, + "b": {Frequency: 1}, + }) + n, err := store2.FromArchive(archiveRoot, 100) + if err != nil { + t.Error(err) + return + } + + adata, from, to, err := store2.Read([]string{"cluster", "host", "cpu0"}, "a", 100, int64(100+count)) + if err != nil { + t.Error(err) + return + } + + if n != 2 || len(adata) != count || from != 100 || to != int64(100+count) { + t.Errorf("unexpected: n=%d, len=%d, from=%d, to=%d\n", n, len(adata), from, to) + return + } + + for i := 0; i < count; i++ { + expected := Float(i) + if adata[i] != expected { + t.Errorf("expected: %f, got: %f", expected, adata[i]) + } + } +} + +func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) { + frequency := int64(5) + count := b.N + goroutines := 4 + store := NewMemoryStore(map[string]MetricConfig{ + "a": {Frequency: frequency}, + }) + + var wg sync.WaitGroup + wg.Add(goroutines) + + for g := 0; g < goroutines; g++ { + go func(g int) { + host := fmt.Sprintf("host%d", g) + for i := 0; i < count; i++ { + store.Write([]string{"cluster", host, "cpu0"}, int64(i)*frequency, []Metric{ + {Name: "a", Value: Float(i)}, + }) + } + wg.Done() + }(g) + } + + wg.Wait() + b.StopTimer() + + for g := 0; g < goroutines; g++ { + host := fmt.Sprintf("host%d", g) + adata, _, _, err := store.Read([]string{"cluster", host, "cpu0"}, "a", 0, int64(count)*frequency) + if err != nil { + b.Error(err) + return + } + + if len(adata) != count { + b.Error("unexpected count") + return + } + + for i := 0; i < count; i++ { + expected := Float(i) + if adata[i] != expected { + b.Error("incorrect value for metric a") + return + } + } + } +} + +func BenchmarkMemoryStoreAggregation(b *testing.B) { + b.StopTimer() + count := 2000 + store := NewMemoryStore(map[string]MetricConfig{ + "flops_any": {Frequency: 1, Aggregation: "avg"}, + }) + + sel := []string{"testcluster", "host123", "cpu0"} + for i := 0; i < count; i++ { + sel[2] = "cpu0" + err := store.Write(sel, int64(i), []Metric{ + {Name: "flops_any", Value: Float(i)}, + }) + if err != nil { + b.Fatal(err) + } + + sel[2] = "cpu1" + err = store.Write(sel, int64(i), []Metric{ + {Name: "flops_any", Value: Float(i)}, + }) + if err != nil { + b.Fatal(err) + } + } + + b.StartTimer() + for n := 0; n < b.N; n++ { + data, from, to, err := store.Read(sel[0:2], "flops_any", 0, int64(count)) + if err != nil { + b.Fatal(err) + } + + if len(data) != count || from != 0 || to != int64(count) { + b.Fatal() + } + } +} diff --git a/metric-store.go b/metric-store.go index 38a3564..a5f266b 100644 --- a/metric-store.go +++ b/metric-store.go @@ -2,34 +2,33 @@ package main import ( "encoding/json" - "errors" "fmt" "log" "os" "os/signal" "sync" "syscall" - - 
"github.com/ClusterCockpit/cc-metric-store/lineprotocol" + "time" ) -type MetricStore interface { - AddMetrics(key string, ts int64, metrics []lineprotocol.Metric) error - GetMetric(key string, metric string, from int64, to int64) ([]lineprotocol.Float, int64, error) - Reduce(key, metric string, from, to int64, f func(t int64, sum, x lineprotocol.Float) lineprotocol.Float, initialX lineprotocol.Float) (lineprotocol.Float, error) - Peak(prefix string) map[string]map[string]lineprotocol.Float +type MetricConfig struct { + Frequency int64 `json:"frequency"` + Aggregation string `json:"aggregation"` + Scope string `json:"scope"` } type Config struct { - MetricClasses map[string]struct { - Frequency int `json:"frequency"` - Metrics []string `json:"metrics"` - } `json:"metrics"` + Metrics map[string]MetricConfig `json:"metrics"` + RestoreLastHours int `json:"restore-last-hours"` + CheckpointIntervalHours int `json:"checkpoint-interval-hours"` + ArchiveRoot string `json:"archive-root"` + Nats string `json:"nats"` } -var conf Config +const KEY_SEPERATOR string = "." -var metricStores map[string]MetricStore = map[string]MetricStore{} +var conf Config +var memoryStore *MemoryStore = nil func loadConfiguration(file string) Config { var config Config @@ -43,82 +42,112 @@ func loadConfiguration(file string) Config { return config } -// TODO: Change MetricStore API so that we do not have to do string concat? -// Nested hashmaps could be an alternative. -func buildKey(line *lineprotocol.Line) (string, error) { +func handleLine(line *Line) { cluster, ok := line.Tags["cluster"] if !ok { - return "", errors.New("missing cluster tag") + log.Println("'cluster' tag missing") + return } host, ok := line.Tags["host"] if !ok { - return "", errors.New("missing host tag") - } - - socket, ok := line.Tags["socket"] - if ok { - return cluster + ":" + host + ":s" + socket, nil - } - - cpu, ok := line.Tags["cpu"] - if ok { - return cluster + ":" + host + ":c" + cpu, nil - } - - return cluster + ":" + host, nil -} - -func handleLine(line *lineprotocol.Line) { - store, ok := metricStores[line.Measurement] - if !ok { - log.Printf("unkown class: '%s'\n", line.Measurement) + log.Println("'host' tag missing") return } - key, err := buildKey(line) - if err != nil { - log.Println(err) - return + selector := []string{cluster, host} + if id, ok := line.Tags[line.Measurement]; ok { + selector = append(selector, line.Measurement, id) } - // log.Printf("t=%d, key='%s', values=%v\n", line.Ts.Unix(), key, line.Fields) - log.Printf("new data: t=%d, key='%s'", line.Ts.Unix(), key) - err = store.AddMetrics(key, line.Ts.Unix(), line.Fields) + ts := line.Ts.Unix() + log.Printf("ts=%d, tags=%v\n", ts, selector) + err := memoryStore.Write(selector, ts, line.Fields) if err != nil { - log.Println(err) + log.Printf("error: %s\n", err.Error()) } } func main() { + startupTime := time.Now() conf = loadConfiguration("config.json") - for class, info := range conf.MetricClasses { - metricStores[class] = newMemoryStore(info.Metrics, 1000, info.Frequency) + memoryStore = NewMemoryStore(conf.Metrics) + + if conf.ArchiveRoot != "" && conf.RestoreLastHours > 0 { + d := time.Duration(conf.RestoreLastHours) * time.Hour + from := startupTime.Add(-d).Unix() + log.Printf("Restoring data since %d from '%s'...\n", from, conf.ArchiveRoot) + files, err := memoryStore.FromArchive(conf.ArchiveRoot, from) + if err != nil { + log.Printf("Loading archive failed: %s\n", err.Error()) + } else { + log.Printf("Archive loaded (%d files)\n", files) + } } + var wg 
sync.WaitGroup
 	sigs := make(chan os.Signal, 1)
 	done := make(chan bool, 1)
 	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
 	go func() {
 		_ = <-sigs
+		log.Println("Shutting down...")
 		done <- true
 		close(done)
-		log.Println("shuting down")
 	}()
 
-	var wg sync.WaitGroup
-	wg.Add(1)
+	lastCheckpoint := startupTime
+	if conf.ArchiveRoot != "" && conf.CheckpointIntervalHours > 0 {
+		wg.Add(3)
+		go func() {
+			d := time.Duration(conf.CheckpointIntervalHours) * time.Hour
+			ticks := time.Tick(d)
+			for {
+				select {
+				case _, _ = <-done:
+					wg.Done()
+					return
+				case <-ticks:
+					log.Println("Start making checkpoint...")
+					_, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), time.Now().Unix())
+					if err != nil {
+						log.Printf("Making checkpoint failed: %s\n", err.Error())
+					} else {
+						log.Println("Checkpoint successful!")
+					}
+					lastCheckpoint = time.Now()
+				}
+			}
+		}()
+	} else {
+		wg.Add(2)
+	}
 
 	go func() {
-		StartApiServer(":8080", done)
+		err := StartApiServer(":8080", done)
+		if err != nil {
+			log.Fatal(err)
+		}
 		wg.Done()
 	}()
 
-	err := lineprotocol.ReceiveNats("nats://localhost:4222", handleLine, done)
-	if err != nil {
-		log.Fatal(err)
-	}
+	go func() {
+		err := ReceiveNats(conf.Nats, handleLine, done)
+		if err != nil {
+			log.Fatal(err)
+		}
+		wg.Done()
+	}()
 
 	wg.Wait()
+
+	if conf.ArchiveRoot != "" {
+		log.Printf("Writing to '%s'...\n", conf.ArchiveRoot)
+		files, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), time.Now().Unix())
+		if err != nil {
+			log.Printf("Writing to archive failed: %s\n", err.Error())
+		} else {
+			log.Printf("Done! (%d files written)\n", files)
+		}
+	}
 }
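
For reference, a minimal client-side sketch of querying the new endpoint (not part of the patch): it assumes the server started by StartApiServer(":8080", ...) is reachable on localhost, and the cluster/host names ("emmy", "host123"), the metric "load_one" and the one-hour window are taken from the example in api.go purely for illustration. How missing samples (NaN on the server side) serialize depends on Float's JSON marshaling, which this patch does not show.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"
)

func main() {
	to := time.Now().Unix()
	from := to - 3600

	// One entry per selector, matching ApiRequestBody.
	reqBody := []map[string]interface{}{
		{"selector": []string{"emmy", "host123"}, "metrics": []string{"load_one"}},
	}
	buf, err := json.Marshal(reqBody)
	if err != nil {
		log.Fatal(err)
	}

	url := fmt.Sprintf("http://localhost:8080/api/%d/%d/timeseries", from, to)
	resp, err := http.Post(url, "application/json", bytes.NewReader(buf))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// The response mirrors the request: one map per requested selector,
	// keyed by metric name, with each value shaped like ApiMetricData.
	var res []map[string]struct {
		From int64     `json:"from"`
		To   int64     `json:"to"`
		Data []float64 `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
		log.Fatal(err)
	}
	if len(res) == 0 {
		log.Fatal("empty response")
	}

	d := res[0]["load_one"]
	fmt.Printf("load_one: %d samples covering [%d, %d]\n", len(d.Data), d.From, d.To)
}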