Introduce Retention and compression

This commit is contained in:
2023-05-09 16:33:26 +02:00
parent 72f178a088
commit 538427d59b
7 changed files with 159 additions and 34 deletions

View File

@@ -11,6 +11,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
@@ -38,17 +39,26 @@ func checkFileExists(filePath string) bool {
return !errors.Is(err, os.ErrNotExist)
}
func getDirectory(
job *schema.Job,
rootPath string,
) string {
lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
return filepath.Join(
rootPath,
job.Cluster,
lvl1, lvl2,
strconv.FormatInt(job.StartTime.Unix(), 10))
}
func getPath(
job *schema.Job,
rootPath string,
file string) string {
lvl1, lvl2 := fmt.Sprintf("%d", job.JobID/1000), fmt.Sprintf("%03d", job.JobID%1000)
return filepath.Join(
rootPath,
job.Cluster,
lvl1, lvl2,
strconv.FormatInt(job.StartTime.Unix(), 10), file)
getDirectory(job, rootPath), file)
}
func loadJobMeta(filename string) (*schema.JobMeta, error) {
@@ -147,12 +157,51 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
return version, nil
}
func (fsa *FsArchive) CleanUp() error {
func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
for _, job := range jobs {
dir := getDirectory(job, fsa.path)
if err := os.RemoveAll(dir); err != nil {
log.Errorf("JobArchive Cleanup() error: %v", err)
}
}
}
func (fsa *FsArchive) Compress(jobs []*schema.Job) {
for _, job := range jobs {
fileIn := getPath(job, fsa.path, "data.json")
if !checkFileExists(fileIn) {
originalFile, err := os.Open(fileIn)
if err != nil {
log.Errorf("JobArchive Compress() error: %v", err)
}
defer originalFile.Close()
fileOut := getPath(job, fsa.path, "data.json.gz")
gzippedFile, err := os.Create(fileOut)
if err != nil {
log.Errorf("JobArchive Compress() error: %v", err)
}
defer gzippedFile.Close()
gzipWriter := gzip.NewWriter(gzippedFile)
defer gzipWriter.Close()
_, err = io.Copy(gzipWriter, originalFile)
if err != nil {
log.Errorf("JobArchive Compress() error: %v", err)
}
gzipWriter.Flush()
if err := os.Remove(fileIn); err != nil {
log.Errorf("JobArchive Compress() error: %v", err)
}
}
}
}
func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
var isCompressed bool = true
var isCompressed bool
filename := getPath(job, fsa.path, "data.json.gz")
if !checkFileExists(filename) {
filename = getPath(job, fsa.path, "data.json")
@@ -227,7 +276,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
}
if loadMetricData {
var isCompressed bool = true
var isCompressed bool
filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz")
if !checkFileExists(filename) {