From 50731e43a84f60ef200e86679e06b1c0b468b4e5 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Wed, 15 Dec 2021 10:58:03 +0100 Subject: [PATCH] add config flag; fix bug in archive --- archive.go | 17 ++++++++++++----- metric-store.go | 7 ++++++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/archive.go b/archive.go index 1cf1fd9..2770963 100644 --- a/archive.go +++ b/archive.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "io/fs" + "log" "os" "path" "path/filepath" @@ -27,6 +28,7 @@ type CheckpointMetrics struct { type CheckpointFile struct { From int64 `json:"from"` + To int64 `json:"to"` Metrics map[string]*CheckpointMetrics `json:"metrics"` Children map[string]*CheckpointFile `json:"children"` } @@ -75,6 +77,7 @@ func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil retval := &CheckpointFile{ From: from, + To: to, Metrics: make(map[string]*CheckpointMetrics), Children: make(map[string]*CheckpointFile), } @@ -168,8 +171,8 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { // This function can only be called once and before the very first write or read. // Different host's data is loaded to memory in parallel. func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { - workers := runtime.NumCPU() / 2 - work := make(chan [2]string, workers) + workers := (runtime.NumCPU() / 2) + 1 + work := make(chan [2]string, workers*2) errs := make(chan error, 1) var wg sync.WaitGroup wg.Add(workers + 1) @@ -216,8 +219,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { lvl := m.root.findLevelOrCreate(host[:], len(m.metrics)) n, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m) if err != nil { - errs <- err - return + log.Fatalf("error while loading checkpoints: %s", err.Error()) } atomic.AddInt32(&loadedFiles, int32(n)) } @@ -249,7 +251,8 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { minfo, ok := m.metrics[name] if !ok { - return errors.New("Unkown metric: " + name) + continue + // return errors.New("Unkown metric: " + name) } prev := l.metrics[minfo.offset] @@ -338,6 +341,10 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err return filesLoaded, err } + if cf.To != 0 && cf.To < from { + continue + } + if err = l.loadFile(cf, m); err != nil { return filesLoaded, err } diff --git a/metric-store.go b/metric-store.go index eb7ab17..e82a289 100644 --- a/metric-store.go +++ b/metric-store.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "encoding/json" + "flag" "fmt" "log" "os" @@ -212,8 +213,12 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { } func main() { + var configFile string + flag.StringVar(&configFile, "config", "./config.json", "configuration file") + flag.Parse() + startupTime := time.Now() - conf = loadConfiguration("config.json") + conf = loadConfiguration(configFile) memoryStore = NewMemoryStore(conf.Metrics) restoreFrom := startupTime.Add(-time.Duration(conf.Checkpoints.Restore) * time.Second)