diff --git a/archive.go b/archive.go index 18f8c32..cb6295d 100644 --- a/archive.go +++ b/archive.go @@ -13,23 +13,23 @@ import ( "strings" ) -type ArchiveMetrics struct { +type CheckpointMetrics struct { Frequency int64 `json:"frequency"` Start int64 `json:"start"` Data []Float `json:"data"` } -type ArchiveFile struct { - From int64 `json:"from"` - Metrics map[string]*ArchiveMetrics `json:"metrics"` - Children map[string]*ArchiveFile `json:"children"` +type CheckpointFile struct { + From int64 `json:"from"` + Metrics map[string]*CheckpointMetrics `json:"metrics"` + Children map[string]*CheckpointFile `json:"children"` } // Metrics stored at the lowest 2 levels are not stored away (root and cluster)! // On a per-host basis a new JSON file is created. I have no idea if this will scale. // The good thing: Only a host at a time is locked, so this function can run // in parallel to writes/reads. -func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error) { +func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { levels := make([]*level, 0) selectors := make([][]string, 0) m.root.lock.RLock() @@ -44,8 +44,8 @@ func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error) m.root.lock.RUnlock() for i := 0; i < len(levels); i++ { - dir := path.Join(archiveRoot, path.Join(selectors[i]...)) - err := levels[i].toArchive(dir, from, to, m) + dir := path.Join(dir, path.Join(selectors[i]...)) + err := levels[i].toCheckpoint(dir, from, to, m) if err != nil { return i, err } @@ -54,14 +54,14 @@ func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error) return len(levels), nil } -func (l *level) toArchiveFile(from, to int64, m *MemoryStore) (*ArchiveFile, error) { +func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) { l.lock.RLock() defer l.lock.RUnlock() - retval := &ArchiveFile{ + retval := &CheckpointFile{ From: from, - Metrics: make(map[string]*ArchiveMetrics), - Children: make(map[string]*ArchiveFile), + Metrics: make(map[string]*CheckpointMetrics), + Children: make(map[string]*CheckpointFile), } for metric, minfo := range m.metrics { @@ -80,7 +80,7 @@ func (l *level) toArchiveFile(from, to int64, m *MemoryStore) (*ArchiveFile, err data[i] = NaN } - retval.Metrics[metric] = &ArchiveMetrics{ + retval.Metrics[metric] = &CheckpointMetrics{ Frequency: b.frequency, Start: start, Data: data, @@ -88,7 +88,7 @@ func (l *level) toArchiveFile(from, to int64, m *MemoryStore) (*ArchiveFile, err } for name, child := range l.children { - val, err := child.toArchiveFile(from, to, m) + val, err := child.toCheckpointFile(from, to, m) if err != nil { return nil, err } @@ -99,8 +99,8 @@ func (l *level) toArchiveFile(from, to int64, m *MemoryStore) (*ArchiveFile, err return retval, nil } -func (l *level) toArchive(dir string, from, to int64, m *MemoryStore) error { - af, err := l.toArchiveFile(from, to, m) +func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { + cf, err := l.toCheckpointFile(from, to, m) if err != nil { return err } @@ -117,10 +117,10 @@ func (l *level) toArchive(dir string, from, to int64, m *MemoryStore) error { return err } defer f.Close() + bw := bufio.NewWriter(f) - err = json.NewEncoder(bw).Encode(af) - if err != nil { + if err = json.NewEncoder(bw).Encode(cf); err != nil { return err } @@ -129,13 +129,13 @@ func (l *level) toArchive(dir string, from, to int64, m *MemoryStore) error { // Metrics stored at the lowest 2 levels are not loaded (root and cluster)! // This function can only be called once and before the very first write or read. -// Unlike ToArchive, this function is NOT thread-safe. -func (m *MemoryStore) FromArchive(archiveRoot string, from int64) (int, error) { - return m.root.fromArchive(archiveRoot, from, m) +// Unlike ToCheckpoint, this function is NOT thread-safe. +func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { + return m.root.fromCheckpoint(dir, from, m) } -func (l *level) loadFile(af *ArchiveFile, m *MemoryStore) error { - for name, metric := range af.Metrics { +func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { + for name, metric := range cf.Metrics { n := len(metric.Data) b := &buffer{ frequency: metric.Frequency, @@ -164,18 +164,21 @@ func (l *level) loadFile(af *ArchiveFile, m *MemoryStore) error { l.metrics[minfo.offset] = b } - for sel, childAf := range af.Children { + if len(cf.Children) > 0 && l.children == nil { + l.children = make(map[string]*level) + } + + for sel, childCf := range cf.Children { child, ok := l.children[sel] if !ok { child = &level{ metrics: make([]*buffer, len(m.metrics)), - children: make(map[string]*level), + children: nil, } l.children[sel] = child } - err := child.loadFile(childAf, m) - if err != nil { + if err := child.loadFile(childCf, m); err != nil { return err } } @@ -183,7 +186,7 @@ func (l *level) loadFile(af *ArchiveFile, m *MemoryStore) error { return nil } -func (l *level) fromArchive(dir string, from int64, m *MemoryStore) (int, error) { +func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) { direntries, err := os.ReadDir(dir) if err != nil { return 0, err @@ -198,7 +201,7 @@ func (l *level) fromArchive(dir string, from int64, m *MemoryStore) (int, error) children: make(map[string]*level), } - files, err := child.fromArchive(path.Join(dir, e.Name()), from, m) + files, err := child.fromCheckpoint(path.Join(dir, e.Name()), from, m) filesLoaded += files if err != nil { return filesLoaded, err @@ -208,7 +211,7 @@ func (l *level) fromArchive(dir string, from int64, m *MemoryStore) (int, error) } else if strings.HasSuffix(e.Name(), ".json") { jsonFiles = append(jsonFiles, e) } else { - return filesLoaded, errors.New("unexpected file in archive") + return filesLoaded, errors.New("unexpected file: " + dir + "/" + e.Name()) } } @@ -223,14 +226,17 @@ func (l *level) fromArchive(dir string, from int64, m *MemoryStore) (int, error) return filesLoaded, err } - af := &ArchiveFile{} - err = json.NewDecoder(bufio.NewReader(f)).Decode(af) - if err != nil { + br := bufio.NewReader(f) + cf := &CheckpointFile{} + if err = json.NewDecoder(br).Decode(cf); err != nil { return filesLoaded, err } - err = l.loadFile(af, m) - if err != nil { + if err = l.loadFile(cf, m); err != nil { + return filesLoaded, err + } + + if err = f.Close(); err != nil { return filesLoaded, err } diff --git a/config.json b/config.json index 0c007aa..db3f01f 100644 --- a/config.json +++ b/config.json @@ -13,9 +13,15 @@ "clock": { "frequency": 3, "aggregation": "avg", "scope": "cpu" }, "cpi": { "frequency": 3, "aggregation": "avg", "scope": "cpu" } }, - "retention-hours": 20, - "restore-last-hours": 20, - "checkpoint-interval-hours": 10, - "archive-root": "./archive", + "checkpoints": { + "interval": 21600, + "directory": "./var/checkpoints", + "restore": 43200 + }, + "archive": { + "interval": 86400, + "directory": "./var/archive" + }, + "retention-in-memory": 86400, "nats": "nats://localhost:4222" } diff --git a/memstore.go b/memstore.go index ed53e86..534a847 100644 --- a/memstore.go +++ b/memstore.go @@ -192,8 +192,6 @@ func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level { l.lock.Unlock() return child.findLevelOrCreate(selector[1:], nMetrics) } - } else { - l.children = make(map[string]*level) } child = &level{ @@ -201,11 +199,16 @@ func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level { children: nil, } - l.children[selector[0]] = child + if l.children != nil { + l.children[selector[0]] = child + } else { + l.children = map[string]*level{selector[0]: child} + } l.lock.Unlock() return child.findLevelOrCreate(selector[1:], nMetrics) } +// For aggregation over multiple values at different cpus/sockets/..., not time! type AggregationStrategy int const ( @@ -223,6 +226,8 @@ type MemoryStore struct { } } +// Return a new, initialized instance of a MemoryStore. +// Will panic if values in the metric configurations are invalid. func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { ms := make(map[string]struct { offset int @@ -273,7 +278,7 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error for _, metric := range metrics { minfo, ok := m.metrics[metric.Name] if !ok { - continue + return errors.New("Unknown metric: " + metric.Name) } b := l.metrics[minfo.offset] @@ -296,9 +301,10 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error return nil } -// Returns all values for metric `metric` from `from` to `to` for the selected level. +// Returns all values for metric `metric` from `from` to `to` for the selected level(s). // If the level does not hold the metric itself, the data will be aggregated recursively from the children. -// See `level.read` for more information. +// The second and third return value are the actual from/to for the data. Those can be different from +// the range asked for if no data was available. func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]Float, int64, int64, error) { if from > to { return nil, 0, 0, errors.New("invalid time range") diff --git a/memstore_test.go b/memstore_test.go index 50b7102..e8a0a94 100644 --- a/memstore_test.go +++ b/memstore_test.go @@ -263,13 +263,13 @@ func TestMemoryStoreArchive(t *testing.T) { } archiveRoot := t.TempDir() - _, err := store1.ToArchive(archiveRoot, 100, 100+int64(count/2)) + _, err := store1.ToCheckpoint(archiveRoot, 100, 100+int64(count/2)) if err != nil { t.Error(err) return } - _, err = store1.ToArchive(archiveRoot, 100+int64(count/2), 100+int64(count)) + _, err = store1.ToCheckpoint(archiveRoot, 100+int64(count/2), 100+int64(count)) if err != nil { t.Error(err) return @@ -279,7 +279,7 @@ func TestMemoryStoreArchive(t *testing.T) { "a": {Frequency: 1}, "b": {Frequency: 1}, }) - n, err := store2.FromArchive(archiveRoot, 100) + n, err := store2.FromCheckpoint(archiveRoot, 100) if err != nil { t.Error(err) return diff --git a/metric-store.go b/metric-store.go index b641736..9d49b98 100644 --- a/metric-store.go +++ b/metric-store.go @@ -7,7 +7,6 @@ import ( "log" "os" "os/signal" - "runtime" "sync" "syscall" "time" @@ -20,12 +19,18 @@ type MetricConfig struct { } type Config struct { - Metrics map[string]MetricConfig `json:"metrics"` - RetentionHours int `json:"retention-hours"` - RestoreLastHours int `json:"restore-last-hours"` - CheckpointIntervalHours int `json:"checkpoint-interval-hours"` - ArchiveRoot string `json:"archive-root"` - Nats string `json:"nats"` + Metrics map[string]MetricConfig `json:"metrics"` + RetentionInMemory int `json:"retention-in-memory"` + Nats string `json:"nats"` + Checkpoints struct { + Interval int `json:"interval"` + RootDir string `json:"directory"` + Restore int `json:"restore"` + } `json:"checkpoints"` + Archive struct { + Interval int `json:"interval"` + RootDir string `json:"directory"` + } `json:"archive"` } var conf Config @@ -70,22 +75,68 @@ func handleLine(line *Line) { } } +func intervals(wg *sync.WaitGroup, ctx context.Context) { + wg.Add(2) + go func() { + defer wg.Done() + d := time.Duration(conf.RetentionInMemory) * time.Second + ticks := time.Tick(d / 2) + for { + select { + case <-ctx.Done(): + return + case <-ticks: + log.Println("Freeing up memory...") + t := time.Now().Add(-d) + freed, err := memoryStore.Free(Selector{}, t.Unix()) + if err != nil { + log.Printf("Freeing up memory failed: %s\n", err.Error()) + } else { + log.Printf("%d buffers freed\n", freed) + } + } + } + }() + + lastCheckpoint = time.Now() + go func() { + defer wg.Done() + d := time.Duration(conf.Checkpoints.Interval) * time.Second + ticks := time.Tick(d) + for { + select { + case <-ctx.Done(): + return + case <-ticks: + log.Printf("Checkpoint creation started...") + now := time.Now() + n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, + lastCheckpoint.Unix(), now.Unix()) + if err != nil { + log.Printf("Checkpoint creation failed: %s\n", err.Error()) + } else { + log.Printf("Checkpoint finished (%d files)\n", n) + lastCheckpoint = now + } + } + } + }() + + // TODO: Implement Archive-Stuff: + // Zip multiple checkpoints together, write to archive, delete from checkpoints +} + func main() { startupTime := time.Now() conf = loadConfiguration("config.json") - memoryStore = NewMemoryStore(conf.Metrics) - if conf.ArchiveRoot != "" && conf.RestoreLastHours > 0 { - d := time.Duration(conf.RestoreLastHours) * time.Hour - from := startupTime.Add(-d).Unix() - log.Printf("Restoring data since %d from '%s'...\n", from, conf.ArchiveRoot) - files, err := memoryStore.FromArchive(conf.ArchiveRoot, from) - if err != nil { - log.Printf("Loading archive failed: %s\n", err.Error()) - } else { - log.Printf("Archive loaded (%d files)\n", files) - } + restoreFrom := startupTime.Add(-time.Duration(conf.Checkpoints.Restore)) + files, err := memoryStore.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix()) + if err != nil { + log.Fatalf("Loading checkpoints failed: %s\n", err.Error()) + } else { + log.Printf("Checkpoints loaded (%d files)\n", files) } ctx, shutdown := context.WithCancel(context.Background()) @@ -99,43 +150,9 @@ func main() { shutdown() }() - lastCheckpoint = startupTime - if conf.ArchiveRoot != "" && conf.CheckpointIntervalHours > 0 { - wg.Add(3) - go func() { - d := time.Duration(conf.CheckpointIntervalHours) * time.Hour - ticks := time.Tick(d) - for { - select { - case <-ctx.Done(): - wg.Done() - return - case <-ticks: - log.Println("Start making checkpoint...") - now := time.Now() - n, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), now.Unix()) - if err != nil { - log.Printf("Making checkpoint failed: %s\n", err.Error()) - } else { - log.Printf("Checkpoint successfull (%d files written)\n", n) - } - lastCheckpoint = now + intervals(&wg, ctx) - if conf.RetentionHours > 0 { - log.Println("Freeing up memory...") - t := now.Add(-time.Duration(conf.RetentionHours) * time.Hour) - freed, err := memoryStore.Free(Selector{}, t.Unix()) - if err != nil { - log.Printf("Freeing up memory failed: %s\n", err.Error()) - } - log.Printf("%d values freed\n", freed) - } - } - } - }() - } else { - wg.Add(2) - } + wg.Add(2) go func() { err := StartApiServer(":8080", ctx) @@ -146,7 +163,9 @@ func main() { }() go func() { - err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, ctx) + // err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, ctx) + err := ReceiveNats(conf.Nats, handleLine, 1, ctx) + if err != nil { log.Fatal(err) } @@ -155,12 +174,10 @@ func main() { wg.Wait() - if conf.ArchiveRoot != "" { - log.Printf("Writing to '%s'...\n", conf.ArchiveRoot) - files, err := memoryStore.ToArchive(conf.ArchiveRoot, lastCheckpoint.Unix(), time.Now().Unix()) - if err != nil { - log.Printf("Writing to archive failed: %s\n", err.Error()) - } - log.Printf("Done! (%d files written)\n", files) + log.Printf("Writing to '%s'...\n", conf.Checkpoints.RootDir) + files, err = memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) + if err != nil { + log.Printf("Writing checkpoint failed: %s\n", err.Error()) } + log.Printf("Done! (%d files written)\n", files) }