diff --git a/archive.go b/archive.go index cb6295d..b6f0087 100644 --- a/archive.go +++ b/archive.go @@ -1,13 +1,16 @@ package main import ( + "archive/zip" "bufio" "encoding/json" "errors" "fmt" + "io" "io/fs" "os" "path" + "path/filepath" "sort" "strconv" "strings" @@ -215,7 +218,7 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err } } - files, err := findFiles(jsonFiles, from) + files, err := findFiles(jsonFiles, from, true) if err != nil { return filesLoaded, err } @@ -249,7 +252,7 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err // This will probably get very slow over time! // A solution could be some sort of an index file in which all other files // and the timespan they contain is listed. -func findFiles(direntries []fs.DirEntry, from int64) ([]string, error) { +func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) { nums := map[string]int64{} for _, e := range direntries { ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64) @@ -269,17 +272,109 @@ func findFiles(direntries []fs.DirEntry, from int64) ([]string, error) { e := direntries[i] ts1 := nums[e.Name()] - if from <= ts1 || i == len(direntries)-1 { + if findMoreRecentFiles && t <= ts1 || i == len(direntries)-1 { filenames = append(filenames, e.Name()) continue } enext := direntries[i+1] ts2 := nums[enext.Name()] - if ts1 < from && from < ts2 { - filenames = append(filenames, e.Name()) + + if findMoreRecentFiles { + if ts1 < t && t < ts2 { + filenames = append(filenames, e.Name()) + } + } else { + if ts2 < t { + filenames = append(filenames, e.Name()) + } } } return filenames, nil } + +// ZIP all checkpoint files older than `from` together and write them to the `archiveDir`, +// deleting them from the `checkpointsDir`. 
+//
+// `checkpointsDir` is walked exactly two directory levels deep; every
+// second-level directory is handed to `archiveCheckpoints` together with the
+// directory of the same relative path below `archiveDir`. The first error
+// aborts the walk and is returned.
+func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64) error {
+	entries1, err := os.ReadDir(checkpointsDir)
+	if err != nil {
+		return err
+	}
+
+	for _, de1 := range entries1 {
+		entries2, err := os.ReadDir(filepath.Join(checkpointsDir, de1.Name()))
+		if err != nil {
+			return err
+		}
+
+		for _, de2 := range entries2 {
+			cdir := filepath.Join(checkpointsDir, de1.Name(), de2.Name())
+			adir := filepath.Join(archiveDir, de1.Name(), de2.Name())
+			if err := archiveCheckpoints(cdir, adir, from); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+// Helper function for `ArchiveCheckpoints`: zips the files in `dir` that
+// `findFiles(entries, from, false)` selects into `<archiveDir>/<from>.zip`
+// and removes them from `dir` afterwards.
+//
+// The checkpoint files are only removed once the archive has been written,
+// flushed and closed without error, so a failure while zipping never loses
+// checkpoints. If no file is selected, nothing is created or deleted.
+//
+// NOTE(review): `findFiles`, as far as it is visible in this patch, appends
+// the last directory entry even when `findMoreRecentFiles` is false, so the
+// newest checkpoint may end up archived as well -- confirm whether that is
+// intended.
+func archiveCheckpoints(dir string, archiveDir string, from int64) error {
+	entries, err := os.ReadDir(dir)
+	if err != nil {
+		return err
+	}
+
+	files, err := findFiles(entries, from, false)
+	if err != nil {
+		return err
+	}
+
+	// Nothing to archive: do not leave an empty ZIP file behind on every run.
+	if len(files) == 0 {
+		return nil
+	}
+
+	// MkdirAll is a no-op if the directory already exists.
+	if err := os.MkdirAll(archiveDir, 0755); err != nil {
+		return fmt.Errorf("creating archive directory %q: %w", archiveDir, err)
+	}
+
+	filename := filepath.Join(archiveDir, fmt.Sprintf("%d.zip", from))
+	if err := writeCheckpointZip(filename, dir, files); err != nil {
+		return fmt.Errorf("writing archive %q: %w", filename, err)
+	}
+
+	// The archive is complete on disk, only now is it safe to delete the sources.
+	for _, jsonFile := range files {
+		if err := os.Remove(filepath.Join(dir, jsonFile)); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// writeCheckpointZip creates (or truncates) the ZIP file `filename` and stores
+// every file of `files`, read from `dir`, in it under its plain file name.
+// An error from closing the archive file is reported if nothing failed earlier.
+func writeCheckpointZip(filename, dir string, files []string) (retErr error) {
+	// O_TRUNC: an already existing, longer file must not leave stale bytes
+	// behind the new archive.
+	f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if cerr := f.Close(); retErr == nil {
+			retErr = cerr
+		}
+	}()
+
+	bw := bufio.NewWriter(f)
+	zw := zip.NewWriter(bw)
+	for _, jsonFile := range files {
+		if err := addFileToZip(zw, filepath.Join(dir, jsonFile), jsonFile); err != nil {
+			return err
+		}
+	}
+
+	// Closing the zip writer emits the central directory into `bw`, so it has
+	// to happen before the final flush.
+	if err := zw.Close(); err != nil {
+		return err
+	}
+
+	return bw.Flush()
+}
+
+// addFileToZip copies the file at `src` into `zw` as the entry `name` and
+// closes the source file again before returning.
+func addFileToZip(zw *zip.Writer, src, name string) error {
+	r, err := os.Open(src)
+	if err != nil {
+		return err
+	}
+	defer r.Close()
+
+	w, err := zw.Create(name)
+	if err != nil {
+		return err
+	}
+
+	_, err = io.Copy(w, r)
+	return err
+}
diff --git a/metric-store.go b/metric-store.go index 9d49b98..f06538e 100644 --- a/metric-store.go +++ b/metric-store.go @@ -76,10 +76,13 @@ func handleLine(line *Line) { } func intervals(wg *sync.WaitGroup, ctx context.Context) { - wg.Add(2) + wg.Add(3) go func() {
defer wg.Done() d := time.Duration(conf.RetentionInMemory) * time.Second + if d <= 0 { + return + } ticks := time.Tick(d / 2) for { select { @@ -102,13 +105,16 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { go func() { defer wg.Done() d := time.Duration(conf.Checkpoints.Interval) * time.Second + if d <= 0 { + return + } ticks := time.Tick(d) for { select { case <-ctx.Done(): return case <-ticks: - log.Printf("Checkpoint creation started...") + log.Println("Checkpoint creation started...") now := time.Now() n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), now.Unix()) @@ -122,8 +128,29 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { } }() - // TODO: Implement Archive-Stuff: - // Zip multiple checkpoints together, write to archive, delete from checkpoints + go func() { + defer wg.Done() + d := time.Duration(conf.Archive.Interval) * time.Second + if d <= 0 { + return + } + ticks := time.Tick(d) + for { + select { + case <-ctx.Done(): + return + case <-ticks: + log.Println("Start zipping and deleting old checkpoints...") + t := time.Now().Add(-d) + err := ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix()) + if err != nil { + log.Printf("Archiving failed: %s\n", err.Error()) + } else { + log.Println("Archiving checkpoints completed!") + } + } + } + }() } func main() {