ZIP checkpoints and move to archive

This commit is contained in:
Lou Knauer 2021-09-13 13:40:39 +02:00
parent 372d07b454
commit 22de7da5e4
2 changed files with 131 additions and 9 deletions

View File

@ -1,13 +1,16 @@
package main package main
import ( import (
"archive/zip"
"bufio" "bufio"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"io/fs" "io/fs"
"os" "os"
"path" "path"
"path/filepath"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -215,7 +218,7 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
} }
} }
files, err := findFiles(jsonFiles, from) files, err := findFiles(jsonFiles, from, true)
if err != nil { if err != nil {
return filesLoaded, err return filesLoaded, err
} }
@ -249,7 +252,7 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
// This will probably get very slow over time! // This will probably get very slow over time!
// A solution could be some sort of an index file in which all other files // A solution could be some sort of an index file in which all other files
// and the timespan they contain is listed. // and the timespan they contain is listed.
func findFiles(direntries []fs.DirEntry, from int64) ([]string, error) { func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) {
nums := map[string]int64{} nums := map[string]int64{}
for _, e := range direntries { for _, e := range direntries {
ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64) ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64)
@ -269,17 +272,109 @@ func findFiles(direntries []fs.DirEntry, from int64) ([]string, error) {
e := direntries[i] e := direntries[i]
ts1 := nums[e.Name()] ts1 := nums[e.Name()]
if from <= ts1 || i == len(direntries)-1 { if findMoreRecentFiles && t <= ts1 || i == len(direntries)-1 {
filenames = append(filenames, e.Name()) filenames = append(filenames, e.Name())
continue continue
} }
enext := direntries[i+1] enext := direntries[i+1]
ts2 := nums[enext.Name()] ts2 := nums[enext.Name()]
if ts1 < from && from < ts2 {
filenames = append(filenames, e.Name()) if findMoreRecentFiles {
if ts1 < t && t < ts2 {
filenames = append(filenames, e.Name())
}
} else {
if ts2 < t {
filenames = append(filenames, e.Name())
}
} }
} }
return filenames, nil return filenames, nil
} }
// ZIP all checkpoint files older than `from` together and write them to the `archiveDir`,
// deleting them from the `checkpointsDir`.
func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64) error {
entries1, err := os.ReadDir(checkpointsDir)
if err != nil {
return err
}
for _, de1 := range entries1 {
entries2, err := os.ReadDir(filepath.Join(checkpointsDir, de1.Name()))
if err != nil {
return err
}
for _, de2 := range entries2 {
cdir := filepath.Join(checkpointsDir, de1.Name(), de2.Name())
adir := filepath.Join(archiveDir, de1.Name(), de2.Name())
if err := archiveCheckpoints(cdir, adir, from); err != nil {
return err
}
}
}
return nil
}
// Helper function for `ArchiveCheckpoints`.
func archiveCheckpoints(dir string, archiveDir string, from int64) error {
entries, err := os.ReadDir(dir)
if err != nil {
return err
}
files, err := findFiles(entries, from, false)
if err != nil {
return err
}
filename := filepath.Join(archiveDir, fmt.Sprintf("%d.zip", from))
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil && os.IsNotExist(err) {
err = os.MkdirAll(archiveDir, 0755)
if err == nil {
f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644)
}
}
if err != nil {
return err
}
defer f.Close()
bw := bufio.NewWriter(f)
zw := zip.NewWriter(bw)
for _, jsonFile := range files {
filename := filepath.Join(dir, jsonFile)
r, err := os.Open(filename)
if err != nil {
return err
}
w, err := zw.Create(jsonFile)
if err != nil {
return err
}
if _, err = io.Copy(w, r); err != nil {
return err
}
if err = os.Remove(filename); err != nil {
return err
}
}
if err = zw.Close(); err != nil {
return err
}
if err = bw.Flush(); err != nil {
return err
}
return nil
}

View File

@ -76,10 +76,13 @@ func handleLine(line *Line) {
} }
func intervals(wg *sync.WaitGroup, ctx context.Context) { func intervals(wg *sync.WaitGroup, ctx context.Context) {
wg.Add(2) wg.Add(3)
go func() { go func() {
defer wg.Done() defer wg.Done()
d := time.Duration(conf.RetentionInMemory) * time.Second d := time.Duration(conf.RetentionInMemory) * time.Second
if d <= 0 {
return
}
ticks := time.Tick(d / 2) ticks := time.Tick(d / 2)
for { for {
select { select {
@ -102,13 +105,16 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) {
go func() { go func() {
defer wg.Done() defer wg.Done()
d := time.Duration(conf.Checkpoints.Interval) * time.Second d := time.Duration(conf.Checkpoints.Interval) * time.Second
if d <= 0 {
return
}
ticks := time.Tick(d) ticks := time.Tick(d)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-ticks: case <-ticks:
log.Printf("Checkpoint creation started...") log.Println("Checkpoint creation started...")
now := time.Now() now := time.Now()
n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir,
lastCheckpoint.Unix(), now.Unix()) lastCheckpoint.Unix(), now.Unix())
@ -122,8 +128,29 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) {
} }
}() }()
// TODO: Implement Archive-Stuff: go func() {
// Zip multiple checkpoints together, write to archive, delete from checkpoints defer wg.Done()
d := time.Duration(conf.Archive.Interval) * time.Second
if d <= 0 {
return
}
ticks := time.Tick(d)
for {
select {
case <-ctx.Done():
return
case <-ticks:
log.Println("Start zipping and deleting old checkpoints...")
t := time.Now().Add(-d)
err := ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix())
if err != nil {
log.Printf("Archiving failed: %s\n", err.Error())
} else {
log.Println("Archiving checkpoints completed!")
}
}
}
}()
} }
func main() { func main() {