cc-metric-store/internal/memorystore/archive.go

185 lines
3.9 KiB
Go
Raw Normal View History

package memorystore
import (
2021-09-13 13:40:39 +02:00
"archive/zip"
"bufio"
2024-05-06 14:20:43 +02:00
"context"
"errors"
"fmt"
2021-09-13 13:40:39 +02:00
"io"
2021-12-15 10:58:03 +01:00
"log"
"os"
2021-09-13 13:40:39 +02:00
"path/filepath"
"sync"
"sync/atomic"
2024-05-06 14:20:43 +02:00
"time"
2024-05-06 14:20:43 +02:00
"github.com/ClusterCockpit/cc-metric-store/internal/config"
)
2024-05-06 14:20:43 +02:00
func Archiving(wg *sync.WaitGroup, ctx context.Context) {
go func() {
defer wg.Done()
d, err := time.ParseDuration(config.Keys.Archive.Interval)
if err != nil {
log.Fatal(err)
2022-02-21 09:53:40 +01:00
}
2024-05-06 14:20:43 +02:00
if d <= 0 {
return
}
2022-02-17 11:00:30 +01:00
2024-05-06 14:20:43 +02:00
ticks := func() <-chan time.Time {
if d <= 0 {
return nil
}
2024-05-06 14:20:43 +02:00
return time.NewTicker(d).C
2022-02-17 11:00:30 +01:00
}()
2024-05-06 14:20:43 +02:00
for {
select {
case <-ctx.Done():
return
case <-ticks:
t := time.Now().Add(-d)
log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339))
n, err := ArchiveCheckpoints(config.Keys.Checkpoints.RootDir, config.Keys.Archive.RootDir, t.Unix(), config.Keys.Archive.DeleteInstead)
if err != nil {
2024-05-06 14:20:43 +02:00
log.Printf("archiving failed: %s\n", err.Error())
} else {
log.Printf("done: %d files zipped and moved to archive\n", n)
}
}
2021-12-15 10:58:03 +01:00
}
2024-05-06 14:20:43 +02:00
}()
}
2024-05-06 14:20:43 +02:00
// ErrNoNewData is a sentinel error carrying the message "all data already
// archived". It is not referenced in this part of the file; compare against it
// with errors.Is.
var ErrNoNewData = errors.New("all data already archived")
2021-09-13 13:40:39 +02:00
// ZIP all checkpoint files older than `from` together and write them to the `archiveDir`,
// deleting them from the `checkpointsDir`.
func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteInstead bool) (int, error) {
2021-09-13 13:40:39 +02:00
entries1, err := os.ReadDir(checkpointsDir)
if err != nil {
2021-12-15 10:23:21 +01:00
return 0, err
2021-09-13 13:40:39 +02:00
}
2022-02-21 09:53:40 +01:00
type workItem struct {
cdir, adir string
cluster, host string
}
var wg sync.WaitGroup
n, errs := int32(0), int32(0)
2022-02-21 12:29:46 +01:00
work := make(chan workItem, NumWorkers)
2022-02-21 09:53:40 +01:00
wg.Add(NumWorkers)
for worker := 0; worker < NumWorkers; worker++ {
go func() {
defer wg.Done()
for workItem := range work {
m, err := archiveCheckpoints(workItem.cdir, workItem.adir, from, deleteInstead)
2022-02-21 09:53:40 +01:00
if err != nil {
log.Printf("error while archiving %s/%s: %s", workItem.cluster, workItem.host, err.Error())
atomic.AddInt32(&errs, 1)
}
atomic.AddInt32(&n, int32(m))
}
}()
}
2021-09-13 13:40:39 +02:00
for _, de1 := range entries1 {
2022-02-21 09:53:40 +01:00
entries2, e := os.ReadDir(filepath.Join(checkpointsDir, de1.Name()))
if e != nil {
err = e
2021-09-13 13:40:39 +02:00
}
for _, de2 := range entries2 {
cdir := filepath.Join(checkpointsDir, de1.Name(), de2.Name())
adir := filepath.Join(archiveDir, de1.Name(), de2.Name())
2022-02-21 09:53:40 +01:00
work <- workItem{
adir: adir, cdir: cdir,
cluster: de1.Name(), host: de2.Name(),
2021-09-13 13:40:39 +02:00
}
}
}
2022-02-21 09:53:40 +01:00
close(work)
wg.Wait()
if err != nil {
return int(n), err
}
if errs > 0 {
return int(n), fmt.Errorf("%d errors happend while archiving (%d successes)", errs, n)
}
return int(n), nil
2021-09-13 13:40:39 +02:00
}
// Helper function for `ArchiveCheckpoints`.
func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead bool) (int, error) {
2021-09-13 13:40:39 +02:00
entries, err := os.ReadDir(dir)
if err != nil {
2021-12-15 10:23:21 +01:00
return 0, err
2021-09-13 13:40:39 +02:00
}
files, err := findFiles(entries, from, false)
if err != nil {
2021-12-15 10:23:21 +01:00
return 0, err
2021-09-13 13:40:39 +02:00
}
if deleteInstead {
n := 0
for _, checkpoint := range files {
filename := filepath.Join(dir, checkpoint)
if err = os.Remove(filename); err != nil {
return n, err
}
n += 1
}
return n, nil
}
2021-09-13 13:40:39 +02:00
filename := filepath.Join(archiveDir, fmt.Sprintf("%d.zip", from))
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0o644)
2021-09-13 13:40:39 +02:00
if err != nil && os.IsNotExist(err) {
err = os.MkdirAll(archiveDir, 0o755)
2021-09-13 13:40:39 +02:00
if err == nil {
f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0o644)
2021-09-13 13:40:39 +02:00
}
}
if err != nil {
2021-12-15 10:23:21 +01:00
return 0, err
2021-09-13 13:40:39 +02:00
}
defer f.Close()
bw := bufio.NewWriter(f)
2022-02-21 12:29:46 +01:00
defer bw.Flush()
2021-09-13 13:40:39 +02:00
zw := zip.NewWriter(bw)
2022-02-21 12:29:46 +01:00
defer zw.Close()
2021-09-13 13:40:39 +02:00
2021-12-15 10:23:21 +01:00
n := 0
for _, checkpoint := range files {
filename := filepath.Join(dir, checkpoint)
2021-09-13 13:40:39 +02:00
r, err := os.Open(filename)
if err != nil {
2021-12-15 10:23:21 +01:00
return n, err
2021-09-13 13:40:39 +02:00
}
2022-02-21 12:29:46 +01:00
defer r.Close()
2021-09-13 13:40:39 +02:00
w, err := zw.Create(checkpoint)
2021-09-13 13:40:39 +02:00
if err != nil {
2021-12-15 10:23:21 +01:00
return n, err
2021-09-13 13:40:39 +02:00
}
if _, err = io.Copy(w, r); err != nil {
2021-12-15 10:23:21 +01:00
return n, err
2021-09-13 13:40:39 +02:00
}
if err = os.Remove(filename); err != nil {
2021-12-15 10:23:21 +01:00
return n, err
2021-09-13 13:40:39 +02:00
}
2021-12-15 10:23:21 +01:00
n += 1
2021-09-13 13:40:39 +02:00
}
2021-12-15 10:23:21 +01:00
return n, nil
2021-09-13 13:40:39 +02:00
}