Introduce retention field in config.json

This commit is contained in:
Lou Knauer 2021-09-07 09:28:41 +02:00
parent 930974a8df
commit c50ab30470
4 changed files with 65 additions and 19 deletions

View File

@ -14,6 +14,5 @@
- Calculate averages buffer-wise, average weighted by length of buffer - Calculate averages buffer-wise, average weighted by length of buffer
- Only the head-buffer needs to be fully traversed - Only the head-buffer needs to be fully traversed
- Implement basic support for query of most recent value for every metric on every host - Implement basic support for query of most recent value for every metric on every host
- Optimize horizontal aggregations
- All metrics are known in advance, including the level: Use this to replace `level.metrics` hashmap by slice? - All metrics are known in advance, including the level: Use this to replace `level.metrics` hashmap by slice?
- ... - ...

View File

@ -13,8 +13,9 @@
"clock": { "frequency": 3, "aggregation": "avg", "scope": "cpu" }, "clock": { "frequency": 3, "aggregation": "avg", "scope": "cpu" },
"cpi": { "frequency": 3, "aggregation": "avg", "scope": "cpu" } "cpi": { "frequency": 3, "aggregation": "avg", "scope": "cpu" }
}, },
"retention-hours": 20,
"restore-last-hours": 20, "restore-last-hours": 20,
"checkpoint-interval-hours": 600, "checkpoint-interval-hours": 10,
"archive-root": "./archive", "archive-root": "./archive",
"nats": "nats://localhost:4222" "nats": "nats://localhost:4222"
} }

View File

@ -9,6 +9,7 @@ import (
"net" "net"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
@ -174,15 +175,17 @@ func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) err
// Connect to a nats server and subscribe to "updates". This is a blocking // Connect to a nats server and subscribe to "updates". This is a blocking
// function. handleLine will be called for each line received via nats. // function. handleLine will be called for each line received via nats.
// Send `true` through the done channel for graceful termination. // Send `true` through the done channel for graceful termination.
func ReceiveNats(address string, handleLine func(line *Line), done chan bool) error { func ReceiveNats(address string, handleLine func(line *Line), workers int, done chan bool) error {
nc, err := nats.Connect(nats.DefaultURL) nc, err := nats.Connect(nats.DefaultURL)
if err != nil { if err != nil {
return err return err
} }
defer nc.Close() defer nc.Close()
// Subscribe var sub *nats.Subscription
if _, err := nc.Subscribe("updates", func(m *nats.Msg) {
if workers < 2 {
sub, err = nc.Subscribe("updates", func(m *nats.Msg) {
line, err := Parse(string(m.Data)) line, err := Parse(string(m.Data))
if err != nil { if err != nil {
log.Printf("parsing line failed: %s\n", err.Error()) log.Printf("parsing line failed: %s\n", err.Error())
@ -190,13 +193,44 @@ func ReceiveNats(address string, handleLine func(line *Line), done chan bool) er
} }
handleLine(line) handleLine(line)
}); err != nil { })
} else {
msgs := make(chan *nats.Msg, 16)
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
for msg := range msgs {
line, err := Parse(string(msg.Data))
if err != nil {
log.Printf("parsing line failed: %s\n", err.Error())
return
}
handleLine(line)
}
}()
}
sub, err = nc.Subscribe("updates", func(m *nats.Msg) {
msgs <- m
})
_ = <-done
close(msgs)
wg.Wait()
}
if err != nil {
return err return err
} }
log.Printf("NATS subscription to 'updates' on '%s' established\n", address) log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
for { for {
_ = <-done _ = <-done
sub.Unsubscribe()
nc.Close()
log.Println("NATS connection closed") log.Println("NATS connection closed")
return nil return nil
} }

View File

@ -6,6 +6,7 @@ import (
"log" "log"
"os" "os"
"os/signal" "os/signal"
"runtime"
"sync" "sync"
"syscall" "syscall"
"time" "time"
@ -19,16 +20,16 @@ type MetricConfig struct {
type Config struct { type Config struct {
Metrics map[string]MetricConfig `json:"metrics"` Metrics map[string]MetricConfig `json:"metrics"`
RetentionHours int `json:"retention-hours"`
RestoreLastHours int `json:"restore-last-hours"` RestoreLastHours int `json:"restore-last-hours"`
CheckpointIntervalHours int `json:"checkpoint-interval-hours"` CheckpointIntervalHours int `json:"checkpoint-interval-hours"`
ArchiveRoot string `json:"archive-root"` ArchiveRoot string `json:"archive-root"`
Nats string `json:"nats"` Nats string `json:"nats"`
} }
const KEY_SEPERATOR string = "."
var conf Config var conf Config
var memoryStore *MemoryStore = nil var memoryStore *MemoryStore = nil
var lastCheckpoint time.Time
func loadConfiguration(file string) Config { func loadConfiguration(file string) Config {
var config Config var config Config
@ -97,7 +98,7 @@ func main() {
close(done) close(done)
}() }()
lastCheckpoint := startupTime lastCheckpoint = startupTime
if conf.ArchiveRoot != "" && conf.CheckpointIntervalHours > 0 { if conf.ArchiveRoot != "" && conf.CheckpointIntervalHours > 0 {
wg.Add(3) wg.Add(3)
go func() { go func() {
@ -105,18 +106,29 @@ func main() {
ticks := time.Tick(d) ticks := time.Tick(d)
for { for {
select { select {
case _, _ = <-done: case <-done:
wg.Done() wg.Done()
return return
case <-ticks: case <-ticks:
log.Println("Start making checkpoint...") log.Println("Start making checkpoint...")
_, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), time.Now().Unix()) now := time.Now()
n, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), now.Unix())
if err != nil { if err != nil {
log.Printf("Making checkpoint failed: %s\n", err.Error()) log.Printf("Making checkpoint failed: %s\n", err.Error())
} else { } else {
log.Println("Checkpoint successfull!") log.Printf("Checkpoint successfull (%d files written)\n", n)
}
lastCheckpoint = now
if conf.RetentionHours > 0 {
log.Println("Freeing up memory...")
t := now.Add(-time.Duration(conf.RetentionHours) * time.Hour)
freed, err := memoryStore.Free([]string{}, t.Unix())
if err != nil {
log.Printf("Freeing up memory failed: %s\n", err.Error())
}
log.Printf("%d values freed\n", freed)
} }
lastCheckpoint = time.Now()
} }
} }
}() }()
@ -133,7 +145,7 @@ func main() {
}() }()
go func() { go func() {
err := ReceiveNats(conf.Nats, handleLine, done) err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, done)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }