diff --git a/TODO.md b/TODO.md index 9b71ac6..a8f37a4 100644 --- a/TODO.md +++ b/TODO.md @@ -1,11 +1,12 @@ # TODO -- Delete this file and create more GitHub issues instead? +- Improve checkpoints/archives + - Store information in each buffer if already archived + - Do not create new checkpoint if all buffers already archived - Missing Testcases: - - Port at least all blackbox tests from the "old" `MemoryStore` to the new implementation + - General tests - Check for corner cases that should fail gracefully - Write a more realistic `ToArchive`/`FromArchive` tests - - Test edgecases for horizontal aggregations - Optimization: Once a buffer is full, calculate min, max and avg - Calculate averages buffer-wise, average weighted by length of buffer - Only the head-buffer needs to be fully traversed diff --git a/api.go b/api.go index 016ce7d..954cf34 100644 --- a/api.go +++ b/api.go @@ -222,8 +222,7 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) { return } - reader := bufio.NewReader(r.Body) - dec := lineprotocol.NewDecoder(reader) + dec := lineprotocol.NewDecoder(bufio.NewReader(r.Body)) // Unlike the name suggests, handleLine can handle multiple lines if err := handleLine(dec); err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) @@ -305,6 +304,34 @@ func handleAllNodes(rw http.ResponseWriter, r *http.Request) { } } +// func handleCheckpoint(rw http.ResponseWriter, r *http.Request) { +// vars := mux.Vars(r) +// from, err := strconv.ParseInt(vars["from"], 10, 64) +// if err != nil { +// http.Error(rw, err.Error(), http.StatusBadRequest) +// return +// } +// to, err := strconv.ParseInt(vars["to"], 10, 64) +// if err != nil { +// http.Error(rw, err.Error(), http.StatusBadRequest) +// return +// } + +// log.Println("Checkpoint creation started...") +// n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, from, to) +// if err != nil { +// log.Printf("Checkpoint creation failed: %s\n", err.Error()) +// rw.WriteHeader(http.StatusInternalServerError) +// return +// } else { +// log.Printf("Checkpoint finished (%d files)\n", n) +// } + +// memoryStore.FreeAll() + +// rw.WriteHeader(http.StatusOK) +// } + func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { authheader := r.Header.Get("Authorization") @@ -343,6 +370,7 @@ func StartApiServer(address string, ctx context.Context) error { r.HandleFunc("/api/{cluster}/peek", handlePeek) r.HandleFunc("/api/{cluster}/{from:[0-9]+}/{to:[0-9]+}/all-nodes", handleAllNodes) r.HandleFunc("/api/write", handleWrite) + // r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/checkpoint", handleCheckpoint) server := &http.Server{ Handler: r, diff --git a/archive.go b/archive.go index 5e32d49..bcd24cf 100644 --- a/archive.go +++ b/archive.go @@ -28,6 +28,8 @@ type CheckpointFile struct { Children map[string]*CheckpointFile `json:"children"` } +var ErrNoNewData error = errors.New("all data already archived") + // Metrics stored at the lowest 2 levels are not stored away (root and cluster)! // On a per-host basis a new JSON file is created. I have no idea if this will scale. // The good thing: Only a host at a time is locked, so this function can run @@ -46,15 +48,22 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { } m.root.lock.RUnlock() + n := 0 for i := 0; i < len(levels); i++ { dir := path.Join(dir, path.Join(selectors[i]...)) err := levels[i].toCheckpoint(dir, from, to, m) if err != nil { + if err == ErrNoNewData { + continue + } + return i, err } + + n += 1 } - return len(levels), nil + return 0, nil } func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) { @@ -73,6 +82,18 @@ func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil continue } + allArchived := true + b.iterFromTo(from, to, func(b *buffer) error { + if !b.archived { + allArchived = false + } + return nil + }) + + if allArchived { + continue + } + data := make([]Float, (to-from)/b.frequency+1) data, start, end, err := b.read(from, to, data) if err != nil { @@ -96,7 +117,13 @@ func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil return nil, err } - retval.Children[name] = val + if val != nil { + retval.Children[name] = val + } + } + + if len(retval.Children) == 0 && len(retval.Metrics) == 0 { + return nil, nil } return retval, nil @@ -108,6 +135,10 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { return err } + if cf == nil { + return ErrNoNewData + } + filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0644) if err != nil && os.IsNotExist(err) { @@ -146,6 +177,7 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { data: metric.Data[0:n:n], // Space is wasted here :( prev: nil, next: nil, + archived: true, } minfo, ok := m.metrics[name] diff --git a/debug.go b/debug.go index a0b5888..81bd940 100644 --- a/debug.go +++ b/debug.go @@ -16,7 +16,7 @@ func (b *buffer) debugDump(w *bufio.Writer) { } to := b.start + b.frequency*int64(len(b.data)) - fmt.Fprintf(w, "buffer(from=%d, len=%d, to=%d)%s", b.start, len(b.data), to, end) + fmt.Fprintf(w, "buffer(from=%d, len=%d, to=%d, archived=%v)%s", b.start, len(b.data), to, b.archived, end) } func (l *level) debugDump(w *bufio.Writer, m *MemoryStore, indent string) error { @@ -32,7 +32,7 @@ func (l *level) debugDump(w *bufio.Writer, m *MemoryStore, indent string) error } } - if l.children != nil { + if l.children != nil && len(l.children) > 0 { fmt.Fprintf(w, "%schildren:\n", indent) for name, lvl := range l.children { fmt.Fprintf(w, "%s'%s':\n", indent, name) diff --git a/memstore.go b/memstore.go index a6e4a97..778b3c4 100644 --- a/memstore.go +++ b/memstore.go @@ -36,6 +36,7 @@ type buffer struct { start int64 // Timestamp of when `data[0]` was written. data []Float // The slice should never reallocacte as `cap(data)` is respected. prev, next *buffer // `prev` contains older data, `next` newer data. + archived bool // If true, this buffer is already archived } func newBuffer(ts, freq int64) *buffer { @@ -155,6 +156,24 @@ func (b *buffer) free(t int64) (int, error) { return 0, nil } +// Call `callback` on every buffer that contains data in the range from `from` to `to`. +func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error { + if b == nil { + return nil + } + + if err := b.prev.iterFromTo(from, to, callback); err != nil { + return err + } + + end := b.start + int64(len(b.data))*b.frequency + if from <= b.start && end <= to { + return callback(b) + } + + return nil +} + // Could also be called "node" as this forms a node in a tree structure. // Called level because "node" might be confusing here. // Can be both a leaf or a inner node. In this tree structue, inner nodes can @@ -370,3 +389,11 @@ func (m *MemoryStore) Free(selector Selector, t int64) (int, error) { }) return n, err } + +func (m *MemoryStore) FreeAll() error { + for k := range m.root.children { + delete(m.root.children, k) + } + + return nil +} diff --git a/metric-store.go b/metric-store.go index 6ed25df..2618807 100644 --- a/metric-store.go +++ b/metric-store.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "context" "encoding/json" "fmt" @@ -120,6 +121,7 @@ func handleLine(dec *lineprotocol.Decoder) error { return err } + // log.Printf("write: %s (%v) -> %v\n", string(measurement), selector, value) if err := memoryStore.Write(selector, t.Unix(), []Metric{ {Name: string(measurement), Value: value}, }); err != nil { @@ -217,18 +219,25 @@ func main() { if err != nil { log.Fatalf("Loading checkpoints failed: %s\n", err.Error()) } else { - log.Printf("Checkpoints loaded (%d files)\n", files) + log.Printf("Checkpoints loaded (%d files, from %s on)\n", files, restoreFrom.Format(time.RFC3339)) } ctx, shutdown := context.WithCancel(context.Background()) var wg sync.WaitGroup sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1) go func() { - <-sigs - log.Println("Shuting down...") - shutdown() + for { + sig := <-sigs + if sig == syscall.SIGUSR1 { + memoryStore.DebugDump(bufio.NewWriter(os.Stdout)) + continue + } + + log.Println("Shuting down...") + shutdown() + } }() intervals(&wg, ctx)