diff --git a/api.go b/api.go index 045a4fa..b9403ee 100644 --- a/api.go +++ b/api.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "fmt" "log" "net/http" "strconv" @@ -143,11 +144,55 @@ func handleStats(rw http.ResponseWriter, r *http.Request) { } } +func handleFree(rw http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + to, err := strconv.ParseInt(vars["to"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + // TODO: lastCheckpoint might be modified by different go-routines. + // Load it using the sync/atomic package? + freeUpTo := lastCheckpoint.Unix() + if to < freeUpTo { + freeUpTo = to + } + + if r.Method != http.MethodPost { + http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + bodyDec := json.NewDecoder(r.Body) + var selectors [][]string + err = bodyDec.Decode(&selectors) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + n := 0 + for _, sel := range selectors { + bn, err := memoryStore.Free(sel, freeUpTo) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + n += bn + } + + rw.WriteHeader(http.StatusOK) + rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n))) +} + func StartApiServer(address string, done chan bool) error { r := mux.NewRouter() r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries) r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats) + r.HandleFunc("/api/{to:[0-9]+}/free", handleFree) server := &http.Server{ Handler: r, diff --git a/archive.go b/archive.go index 6a18f8d..22ae86a 100644 --- a/archive.go +++ b/archive.go @@ -65,11 +65,16 @@ func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) { } for metric, b := range l.metrics { - data, start, _, err := b.read(from, to) + data := make([]Float, (to-from)/b.frequency) + data, start, end, err := b.read(from, to, data) if err != nil { return nil, err } + for i := int((end - start) / b.frequency); i < len(data); i++ { + data[i] = NaN + } + retval.Metrics[metric] = &ArchiveMetrics{ Frequency: b.frequency, Start: start, diff --git a/stats.go b/stats.go index 1d2b10b..e47f74d 100644 --- a/stats.go +++ b/stats.go @@ -13,7 +13,7 @@ type Stats struct { } // Return `Stats` by value for less allocations/GC? -func (b *buffer) stats(from, to int64) (*Stats, int64, int64, error) { +func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) { if from < b.start { if b.prev != nil { return b.prev.stats(from, to) @@ -50,7 +50,7 @@ func (b *buffer) stats(from, to int64) (*Stats, int64, int64, error) { max = math.Max(max, xf) } - return &Stats{ + return Stats{ Samples: samples, Avg: Float(sum) / Float(samples), Min: Float(min), @@ -61,59 +61,56 @@ func (b *buffer) stats(from, to int64) (*Stats, int64, int64, error) { // This function assmumes that `l.lock` is LOCKED! // It basically works just like level.read but calculates min/max/avg for that data level.read would return. // TODO: Make this DRY? -func (l *level) stats(metric string, from, to int64, aggregation string) (*Stats, int64, int64, error) { +func (l *level) stats(metric string, from, to int64, aggregation string) (Stats, int64, int64, error) { if b, ok := l.metrics[metric]; ok { return b.stats(from, to) } if len(l.children) == 0 { - return nil, 0, 0, errors.New("no data for that metric/level") - } - - if len(l.children) == 1 { - for _, child := range l.children { - child.lock.Lock() - stats, from, to, err := child.stats(metric, from, to, aggregation) - child.lock.Unlock() - return stats, from, to, err - } + return Stats{}, 0, 0, ErrNoData } + n := 0 samples := 0 - avgSum, min, max := Float(0), Float(math.MaxFloat32), Float(-math.MaxFloat32) + avg, min, max := Float(0), Float(math.MaxFloat32), Float(-math.MaxFloat32) for _, child := range l.children { - child.lock.Lock() + child.lock.RLock() stats, cfrom, cto, err := child.stats(metric, from, to, aggregation) - child.lock.Unlock() + child.lock.RUnlock() + + if err == ErrNoData { + continue + } if err != nil { - return nil, 0, 0, err + return Stats{}, 0, 0, err } - if cfrom != from || cto != to { - // See level.read for more on this - if samples == 0 { - from = cfrom - to = cto - } else { - return nil, 0, 0, errors.New("data for metrics at child levels does not align") - } + if n == 0 { + from = cfrom + to = cto + } else if cfrom != from || cto != to { + return Stats{}, 0, 0, ErrDataDoesNotAlign } samples += stats.Samples - avgSum += stats.Avg + avg += stats.Avg min = Float(math.Min(float64(min), float64(stats.Min))) max = Float(math.Max(float64(max), float64(stats.Max))) + n += 1 + } + + if n == 0 { + return Stats{}, 0, 0, ErrNoData } - avg := avgSum if aggregation == "avg" { - avg /= Float(len(l.children)) + avg /= Float(n) } else if aggregation != "sum" { - return nil, 0, 0, errors.New("invalid aggregation strategy: " + aggregation) + return Stats{}, 0, 0, errors.New("invalid aggregation strategy: " + aggregation) } - return &Stats{ + return Stats{ Samples: samples, Avg: avg, Min: min, @@ -135,5 +132,6 @@ func (m *MemoryStore) Stats(selector []string, metric string, from, to int64) (* return nil, 0, 0, errors.New("unkown metric: " + metric) } - return l.stats(metric, from, to, minfo.Aggregation) + stats, from, to, err := l.stats(metric, from, to, minfo.Aggregation) + return &stats, from, to, err }