From c50ab3047014144d7be73ac858cfb17c920d8f5f Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Tue, 7 Sep 2021 09:28:41 +0200 Subject: [PATCH] Introduce retention field in config.json --- TODO.md | 1 - config.json | 3 ++- lineprotocol.go | 52 ++++++++++++++++++++++++++++++++++++++++--------- metric-store.go | 28 ++++++++++++++++++-------- 4 files changed, 65 insertions(+), 19 deletions(-) diff --git a/TODO.md b/TODO.md index 8349e5f..6f985b5 100644 --- a/TODO.md +++ b/TODO.md @@ -14,6 +14,5 @@ - Calculate averages buffer-wise, average weighted by length of buffer - Only the head-buffer needs to be fully traversed - Implement basic support for query of most recent value for every metric on every host -- Optimize horizontal aggregations - All metrics are known in advance, including the level: Use this to replace `level.metrics` hashmap by slice? - ... diff --git a/config.json b/config.json index 9a5879e..0c007aa 100644 --- a/config.json +++ b/config.json @@ -13,8 +13,9 @@ "clock": { "frequency": 3, "aggregation": "avg", "scope": "cpu" }, "cpi": { "frequency": 3, "aggregation": "avg", "scope": "cpu" } }, + "retention-hours": 20, "restore-last-hours": 20, - "checkpoint-interval-hours": 600, + "checkpoint-interval-hours": 10, "archive-root": "./archive", "nats": "nats://localhost:4222" } diff --git a/lineprotocol.go b/lineprotocol.go index 7970550..b23330b 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -9,6 +9,7 @@ import ( "net" "strconv" "strings" + "sync" "time" "github.com/nats-io/nats.go" @@ -174,29 +175,62 @@ func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) err // Connect to a nats server and subscribe to "updates". This is a blocking // function. handleLine will be called for each line recieved via nats. // Send `true` through the done channel for gracefull termination. -func ReceiveNats(address string, handleLine func(line *Line), done chan bool) error { +func ReceiveNats(address string, handleLine func(line *Line), workers int, done chan bool) error { nc, err := nats.Connect(nats.DefaultURL) if err != nil { return err } defer nc.Close() - // Subscribe - if _, err := nc.Subscribe("updates", func(m *nats.Msg) { - line, err := Parse(string(m.Data)) - if err != nil { - log.Printf("parsing line failed: %s\n", err.Error()) - return + var sub *nats.Subscription + + if workers < 2 { + sub, err = nc.Subscribe("updates", func(m *nats.Msg) { + line, err := Parse(string(m.Data)) + if err != nil { + log.Printf("parsing line failed: %s\n", err.Error()) + return + } + + handleLine(line) + }) + } else { + msgs := make(chan *nats.Msg, 16) + var wg sync.WaitGroup + wg.Add(workers) + + for i := 0; i < workers; i++ { + go func() { + for msg := range msgs { + line, err := Parse(string(msg.Data)) + if err != nil { + log.Printf("parsing line failed: %s\n", err.Error()) + return + } + + handleLine(line) + } + }() } - handleLine(line) - }); err != nil { + sub, err = nc.Subscribe("updates", func(m *nats.Msg) { + msgs <- m + }) + + _ = <-done + close(msgs) + wg.Wait() + } + + if err != nil { return err } log.Printf("NATS subscription to 'updates' on '%s' established\n", address) for { _ = <-done + sub.Unsubscribe() + nc.Close() log.Println("NATS connection closed") return nil } diff --git a/metric-store.go b/metric-store.go index a5f266b..7eb5397 100644 --- a/metric-store.go +++ b/metric-store.go @@ -6,6 +6,7 @@ import ( "log" "os" "os/signal" + "runtime" "sync" "syscall" "time" @@ -19,16 +20,16 @@ type MetricConfig struct { type Config struct { Metrics map[string]MetricConfig `json:"metrics"` + RetentionHours int `json:"retention-hours"` RestoreLastHours int `json:"restore-last-hours"` CheckpointIntervalHours int `json:"checkpoint-interval-hours"` ArchiveRoot string `json:"archive-root"` Nats string `json:"nats"` } -const KEY_SEPERATOR string = "." - var conf Config var memoryStore *MemoryStore = nil +var lastCheckpoint time.Time func loadConfiguration(file string) Config { var config Config @@ -97,7 +98,7 @@ func main() { close(done) }() - lastCheckpoint := startupTime + lastCheckpoint = startupTime if conf.ArchiveRoot != "" && conf.CheckpointIntervalHours > 0 { wg.Add(3) go func() { @@ -105,18 +106,29 @@ func main() { ticks := time.Tick(d) for { select { - case _, _ = <-done: + case <-done: wg.Done() return case <-ticks: log.Println("Start making checkpoint...") - _, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), time.Now().Unix()) + now := time.Now() + n, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), now.Unix()) if err != nil { log.Printf("Making checkpoint failed: %s\n", err.Error()) } else { - log.Println("Checkpoint successfull!") + log.Printf("Checkpoint successfull (%d files written)\n", n) + } + lastCheckpoint = now + + if conf.RetentionHours > 0 { + log.Println("Freeing up memory...") + t := now.Add(-time.Duration(conf.RetentionHours) * time.Hour) + freed, err := memoryStore.Free([]string{}, t.Unix()) + if err != nil { + log.Printf("Freeing up memory failed: %s\n", err.Error()) + } + log.Printf("%d values freed\n", freed) } - lastCheckpoint = time.Now() } } }() @@ -133,7 +145,7 @@ func main() { }() go func() { - err := ReceiveNats(conf.Nats, handleLine, done) + err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, done) if err != nil { log.Fatal(err) }