add config flag; fix bug in archive

This commit is contained in:
Lou Knauer 2021-12-15 10:58:03 +01:00
parent ac7e981321
commit 50731e43a8
2 changed files with 18 additions and 6 deletions

View File

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"io" "io"
"io/fs" "io/fs"
"log"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
@ -27,6 +28,7 @@ type CheckpointMetrics struct {
type CheckpointFile struct { type CheckpointFile struct {
From int64 `json:"from"` From int64 `json:"from"`
To int64 `json:"to"`
Metrics map[string]*CheckpointMetrics `json:"metrics"` Metrics map[string]*CheckpointMetrics `json:"metrics"`
Children map[string]*CheckpointFile `json:"children"` Children map[string]*CheckpointFile `json:"children"`
} }
@ -75,6 +77,7 @@ func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil
retval := &CheckpointFile{ retval := &CheckpointFile{
From: from, From: from,
To: to,
Metrics: make(map[string]*CheckpointMetrics), Metrics: make(map[string]*CheckpointMetrics),
Children: make(map[string]*CheckpointFile), Children: make(map[string]*CheckpointFile),
} }
@ -168,8 +171,8 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
// This function can only be called once and before the very first write or read. // This function can only be called once and before the very first write or read.
// Different host's data is loaded to memory in parallel. // Different host's data is loaded to memory in parallel.
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
workers := runtime.NumCPU() / 2 workers := (runtime.NumCPU() / 2) + 1
work := make(chan [2]string, workers) work := make(chan [2]string, workers*2)
errs := make(chan error, 1) errs := make(chan error, 1)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(workers + 1) wg.Add(workers + 1)
@ -216,8 +219,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
lvl := m.root.findLevelOrCreate(host[:], len(m.metrics)) lvl := m.root.findLevelOrCreate(host[:], len(m.metrics))
n, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m) n, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m)
if err != nil { if err != nil {
errs <- err log.Fatalf("error while loading checkpoints: %s", err.Error())
return
} }
atomic.AddInt32(&loadedFiles, int32(n)) atomic.AddInt32(&loadedFiles, int32(n))
} }
@ -249,7 +251,8 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
minfo, ok := m.metrics[name] minfo, ok := m.metrics[name]
if !ok { if !ok {
return errors.New("Unkown metric: " + name) continue
// return errors.New("Unkown metric: " + name)
} }
prev := l.metrics[minfo.offset] prev := l.metrics[minfo.offset]
@ -338,6 +341,10 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
return filesLoaded, err return filesLoaded, err
} }
if cf.To != 0 && cf.To < from {
continue
}
if err = l.loadFile(cf, m); err != nil { if err = l.loadFile(cf, m); err != nil {
return filesLoaded, err return filesLoaded, err
} }

View File

@ -4,6 +4,7 @@ import (
"bufio" "bufio"
"context" "context"
"encoding/json" "encoding/json"
"flag"
"fmt" "fmt"
"log" "log"
"os" "os"
@ -212,8 +213,12 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) {
} }
func main() { func main() {
var configFile string
flag.StringVar(&configFile, "config", "./config.json", "configuration file")
flag.Parse()
startupTime := time.Now() startupTime := time.Now()
conf = loadConfiguration("config.json") conf = loadConfiguration(configFile)
memoryStore = NewMemoryStore(conf.Metrics) memoryStore = NewMemoryStore(conf.Metrics)
restoreFrom := startupTime.Add(-time.Duration(conf.Checkpoints.Restore) * time.Second) restoreFrom := startupTime.Add(-time.Duration(conf.Checkpoints.Restore) * time.Second)