Rename archive to checkpoints; new config

This commit is contained in:
Lou Knauer 2021-09-13 12:28:33 +02:00
parent 4d17abdbc8
commit 372d07b454
5 changed files with 145 additions and 110 deletions

View File

@ -13,23 +13,23 @@ import (
"strings"
)
type ArchiveMetrics struct {
type CheckpointMetrics struct {
Frequency int64 `json:"frequency"`
Start int64 `json:"start"`
Data []Float `json:"data"`
}
type ArchiveFile struct {
type CheckpointFile struct {
From int64 `json:"from"`
Metrics map[string]*ArchiveMetrics `json:"metrics"`
Children map[string]*ArchiveFile `json:"children"`
Metrics map[string]*CheckpointMetrics `json:"metrics"`
Children map[string]*CheckpointFile `json:"children"`
}
// Metrics stored at the lowest 2 levels are not stored away (root and cluster)!
// On a per-host basis a new JSON file is created. I have no idea if this will scale.
// The good thing: Only a host at a time is locked, so this function can run
// in parallel to writes/reads.
func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error) {
func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
levels := make([]*level, 0)
selectors := make([][]string, 0)
m.root.lock.RLock()
@ -44,8 +44,8 @@ func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error)
m.root.lock.RUnlock()
for i := 0; i < len(levels); i++ {
dir := path.Join(archiveRoot, path.Join(selectors[i]...))
err := levels[i].toArchive(dir, from, to, m)
dir := path.Join(dir, path.Join(selectors[i]...))
err := levels[i].toCheckpoint(dir, from, to, m)
if err != nil {
return i, err
}
@ -54,14 +54,14 @@ func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error)
return len(levels), nil
}
func (l *level) toArchiveFile(from, to int64, m *MemoryStore) (*ArchiveFile, error) {
func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) {
l.lock.RLock()
defer l.lock.RUnlock()
retval := &ArchiveFile{
retval := &CheckpointFile{
From: from,
Metrics: make(map[string]*ArchiveMetrics),
Children: make(map[string]*ArchiveFile),
Metrics: make(map[string]*CheckpointMetrics),
Children: make(map[string]*CheckpointFile),
}
for metric, minfo := range m.metrics {
@ -80,7 +80,7 @@ func (l *level) toArchiveFile(from, to int64, m *MemoryStore) (*ArchiveFile, err
data[i] = NaN
}
retval.Metrics[metric] = &ArchiveMetrics{
retval.Metrics[metric] = &CheckpointMetrics{
Frequency: b.frequency,
Start: start,
Data: data,
@ -88,7 +88,7 @@ func (l *level) toArchiveFile(from, to int64, m *MemoryStore) (*ArchiveFile, err
}
for name, child := range l.children {
val, err := child.toArchiveFile(from, to, m)
val, err := child.toCheckpointFile(from, to, m)
if err != nil {
return nil, err
}
@ -99,8 +99,8 @@ func (l *level) toArchiveFile(from, to int64, m *MemoryStore) (*ArchiveFile, err
return retval, nil
}
func (l *level) toArchive(dir string, from, to int64, m *MemoryStore) error {
af, err := l.toArchiveFile(from, to, m)
func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
cf, err := l.toCheckpointFile(from, to, m)
if err != nil {
return err
}
@ -117,10 +117,10 @@ func (l *level) toArchive(dir string, from, to int64, m *MemoryStore) error {
return err
}
defer f.Close()
bw := bufio.NewWriter(f)
err = json.NewEncoder(bw).Encode(af)
if err != nil {
if err = json.NewEncoder(bw).Encode(cf); err != nil {
return err
}
@ -129,13 +129,13 @@ func (l *level) toArchive(dir string, from, to int64, m *MemoryStore) error {
// Metrics stored at the lowest 2 levels are not loaded (root and cluster)!
// This function can only be called once and before the very first write or read.
// Unlike ToArchive, this function is NOT thread-safe.
func (m *MemoryStore) FromArchive(archiveRoot string, from int64) (int, error) {
return m.root.fromArchive(archiveRoot, from, m)
// Unlike ToCheckpoint, this function is NOT thread-safe.
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
return m.root.fromCheckpoint(dir, from, m)
}
func (l *level) loadFile(af *ArchiveFile, m *MemoryStore) error {
for name, metric := range af.Metrics {
func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
for name, metric := range cf.Metrics {
n := len(metric.Data)
b := &buffer{
frequency: metric.Frequency,
@ -164,18 +164,21 @@ func (l *level) loadFile(af *ArchiveFile, m *MemoryStore) error {
l.metrics[minfo.offset] = b
}
for sel, childAf := range af.Children {
if len(cf.Children) > 0 && l.children == nil {
l.children = make(map[string]*level)
}
for sel, childCf := range cf.Children {
child, ok := l.children[sel]
if !ok {
child = &level{
metrics: make([]*buffer, len(m.metrics)),
children: make(map[string]*level),
children: nil,
}
l.children[sel] = child
}
err := child.loadFile(childAf, m)
if err != nil {
if err := child.loadFile(childCf, m); err != nil {
return err
}
}
@ -183,7 +186,7 @@ func (l *level) loadFile(af *ArchiveFile, m *MemoryStore) error {
return nil
}
func (l *level) fromArchive(dir string, from int64, m *MemoryStore) (int, error) {
func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) {
direntries, err := os.ReadDir(dir)
if err != nil {
return 0, err
@ -198,7 +201,7 @@ func (l *level) fromArchive(dir string, from int64, m *MemoryStore) (int, error)
children: make(map[string]*level),
}
files, err := child.fromArchive(path.Join(dir, e.Name()), from, m)
files, err := child.fromCheckpoint(path.Join(dir, e.Name()), from, m)
filesLoaded += files
if err != nil {
return filesLoaded, err
@ -208,7 +211,7 @@ func (l *level) fromArchive(dir string, from int64, m *MemoryStore) (int, error)
} else if strings.HasSuffix(e.Name(), ".json") {
jsonFiles = append(jsonFiles, e)
} else {
return filesLoaded, errors.New("unexpected file in archive")
return filesLoaded, errors.New("unexpected file: " + dir + "/" + e.Name())
}
}
@ -223,14 +226,17 @@ func (l *level) fromArchive(dir string, from int64, m *MemoryStore) (int, error)
return filesLoaded, err
}
af := &ArchiveFile{}
err = json.NewDecoder(bufio.NewReader(f)).Decode(af)
if err != nil {
br := bufio.NewReader(f)
cf := &CheckpointFile{}
if err = json.NewDecoder(br).Decode(cf); err != nil {
return filesLoaded, err
}
err = l.loadFile(af, m)
if err != nil {
if err = l.loadFile(cf, m); err != nil {
return filesLoaded, err
}
if err = f.Close(); err != nil {
return filesLoaded, err
}

View File

@ -13,9 +13,15 @@
"clock": { "frequency": 3, "aggregation": "avg", "scope": "cpu" },
"cpi": { "frequency": 3, "aggregation": "avg", "scope": "cpu" }
},
"retention-hours": 20,
"restore-last-hours": 20,
"checkpoint-interval-hours": 10,
"archive-root": "./archive",
"checkpoints": {
"interval": 21600,
"directory": "./var/checkpoints",
"restore": 43200
},
"archive": {
"interval": 86400,
"directory": "./var/archive"
},
"retention-in-memory": 86400,
"nats": "nats://localhost:4222"
}

View File

@ -192,8 +192,6 @@ func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level {
l.lock.Unlock()
return child.findLevelOrCreate(selector[1:], nMetrics)
}
} else {
l.children = make(map[string]*level)
}
child = &level{
@ -201,11 +199,16 @@ func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level {
children: nil,
}
if l.children != nil {
l.children[selector[0]] = child
} else {
l.children = map[string]*level{selector[0]: child}
}
l.lock.Unlock()
return child.findLevelOrCreate(selector[1:], nMetrics)
}
// For aggregation over multiple values at different cpus/sockets/..., not time!
type AggregationStrategy int
const (
@ -223,6 +226,8 @@ type MemoryStore struct {
}
}
// Return a new, initialized instance of a MemoryStore.
// Will panic if values in the metric configurations are invalid.
func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore {
ms := make(map[string]struct {
offset int
@ -273,7 +278,7 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
for _, metric := range metrics {
minfo, ok := m.metrics[metric.Name]
if !ok {
continue
return errors.New("Unknown metric: " + metric.Name)
}
b := l.metrics[minfo.offset]
@ -296,9 +301,10 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
return nil
}
// Returns all values for metric `metric` from `from` to `to` for the selected level.
// Returns all values for metric `metric` from `from` to `to` for the selected level(s).
// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
// See `level.read` for more information.
// The second and third return value are the actual from/to for the data. Those can be different from
// the range asked for if no data was available.
func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]Float, int64, int64, error) {
if from > to {
return nil, 0, 0, errors.New("invalid time range")

View File

@ -263,13 +263,13 @@ func TestMemoryStoreArchive(t *testing.T) {
}
archiveRoot := t.TempDir()
_, err := store1.ToArchive(archiveRoot, 100, 100+int64(count/2))
_, err := store1.ToCheckpoint(archiveRoot, 100, 100+int64(count/2))
if err != nil {
t.Error(err)
return
}
_, err = store1.ToArchive(archiveRoot, 100+int64(count/2), 100+int64(count))
_, err = store1.ToCheckpoint(archiveRoot, 100+int64(count/2), 100+int64(count))
if err != nil {
t.Error(err)
return
@ -279,7 +279,7 @@ func TestMemoryStoreArchive(t *testing.T) {
"a": {Frequency: 1},
"b": {Frequency: 1},
})
n, err := store2.FromArchive(archiveRoot, 100)
n, err := store2.FromCheckpoint(archiveRoot, 100)
if err != nil {
t.Error(err)
return

View File

@ -7,7 +7,6 @@ import (
"log"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
@ -21,11 +20,17 @@ type MetricConfig struct {
type Config struct {
Metrics map[string]MetricConfig `json:"metrics"`
RetentionHours int `json:"retention-hours"`
RestoreLastHours int `json:"restore-last-hours"`
CheckpointIntervalHours int `json:"checkpoint-interval-hours"`
ArchiveRoot string `json:"archive-root"`
RetentionInMemory int `json:"retention-in-memory"`
Nats string `json:"nats"`
Checkpoints struct {
Interval int `json:"interval"`
RootDir string `json:"directory"`
Restore int `json:"restore"`
} `json:"checkpoints"`
Archive struct {
Interval int `json:"interval"`
RootDir string `json:"directory"`
} `json:"archive"`
}
var conf Config
@ -70,22 +75,68 @@ func handleLine(line *Line) {
}
}
func intervals(wg *sync.WaitGroup, ctx context.Context) {
wg.Add(2)
go func() {
defer wg.Done()
d := time.Duration(conf.RetentionInMemory) * time.Second
ticks := time.Tick(d / 2)
for {
select {
case <-ctx.Done():
return
case <-ticks:
log.Println("Freeing up memory...")
t := time.Now().Add(-d)
freed, err := memoryStore.Free(Selector{}, t.Unix())
if err != nil {
log.Printf("Freeing up memory failed: %s\n", err.Error())
} else {
log.Printf("%d buffers freed\n", freed)
}
}
}
}()
lastCheckpoint = time.Now()
go func() {
defer wg.Done()
d := time.Duration(conf.Checkpoints.Interval) * time.Second
ticks := time.Tick(d)
for {
select {
case <-ctx.Done():
return
case <-ticks:
log.Printf("Checkpoint creation started...")
now := time.Now()
n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir,
lastCheckpoint.Unix(), now.Unix())
if err != nil {
log.Printf("Checkpoint creation failed: %s\n", err.Error())
} else {
log.Printf("Checkpoint finished (%d files)\n", n)
lastCheckpoint = now
}
}
}
}()
// TODO: Implement Archive-Stuff:
// Zip multiple checkpoints together, write to archive, delete from checkpoints
}
func main() {
startupTime := time.Now()
conf = loadConfiguration("config.json")
memoryStore = NewMemoryStore(conf.Metrics)
if conf.ArchiveRoot != "" && conf.RestoreLastHours > 0 {
d := time.Duration(conf.RestoreLastHours) * time.Hour
from := startupTime.Add(-d).Unix()
log.Printf("Restoring data since %d from '%s'...\n", from, conf.ArchiveRoot)
files, err := memoryStore.FromArchive(conf.ArchiveRoot, from)
restoreFrom := startupTime.Add(-time.Duration(conf.Checkpoints.Restore))
files, err := memoryStore.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix())
if err != nil {
log.Printf("Loading archive failed: %s\n", err.Error())
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
} else {
log.Printf("Archive loaded (%d files)\n", files)
}
log.Printf("Checkpoints loaded (%d files)\n", files)
}
ctx, shutdown := context.WithCancel(context.Background())
@ -99,43 +150,9 @@ func main() {
shutdown()
}()
lastCheckpoint = startupTime
if conf.ArchiveRoot != "" && conf.CheckpointIntervalHours > 0 {
wg.Add(3)
go func() {
d := time.Duration(conf.CheckpointIntervalHours) * time.Hour
ticks := time.Tick(d)
for {
select {
case <-ctx.Done():
wg.Done()
return
case <-ticks:
log.Println("Start making checkpoint...")
now := time.Now()
n, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), now.Unix())
if err != nil {
log.Printf("Making checkpoint failed: %s\n", err.Error())
} else {
log.Printf("Checkpoint successfull (%d files written)\n", n)
}
lastCheckpoint = now
intervals(&wg, ctx)
if conf.RetentionHours > 0 {
log.Println("Freeing up memory...")
t := now.Add(-time.Duration(conf.RetentionHours) * time.Hour)
freed, err := memoryStore.Free(Selector{}, t.Unix())
if err != nil {
log.Printf("Freeing up memory failed: %s\n", err.Error())
}
log.Printf("%d values freed\n", freed)
}
}
}
}()
} else {
wg.Add(2)
}
go func() {
err := StartApiServer(":8080", ctx)
@ -146,7 +163,9 @@ func main() {
}()
go func() {
err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, ctx)
// err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, ctx)
err := ReceiveNats(conf.Nats, handleLine, 1, ctx)
if err != nil {
log.Fatal(err)
}
@ -155,12 +174,10 @@ func main() {
wg.Wait()
if conf.ArchiveRoot != "" {
log.Printf("Writing to '%s'...\n", conf.ArchiveRoot)
files, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), time.Now().Unix())
log.Printf("Writing to '%s'...\n", conf.Checkpoints.RootDir)
files, err = memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
if err != nil {
log.Printf("Writing to archive failed: %s\n", err.Error())
log.Printf("Writing checkpoint failed: %s\n", err.Error())
}
log.Printf("Done! (%d files written)\n", files)
}
}