Introduce compression in job archive.

This commit is contained in:
Jan Eitzinger 2023-03-27 11:11:14 +02:00
parent 465d5c0186
commit a4a90074ee
6 changed files with 50 additions and 835948 deletions

View File

@ -7,7 +7,9 @@ package archive
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"compress/gzip"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"os" "os"
"path" "path"
@ -29,6 +31,11 @@ type FsArchive struct {
clusters []string clusters []string
} }
// checkFileExists reports whether filePath is present according to os.Stat.
// Only an error matching os.ErrNotExist counts as "absent"; any other Stat
// outcome, including other errors, is reported as true.
func checkFileExists(filePath string) bool {
	if _, statErr := os.Stat(filePath); errors.Is(statErr, os.ErrNotExist) {
		return false
	}
	return true
}
func getPath( func getPath(
job *schema.Job, job *schema.Job,
rootPath string, rootPath string,
@ -82,16 +89,31 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
} }
func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
var isCompressed bool = true
filename := getPath(job, fsa.path, "data.json.gz")
filename := getPath(job, fsa.path, "data.json") if !checkFileExists(filename) {
filename = getPath(job, fsa.path, "data.json")
isCompressed = false
}
f, err := os.Open(filename) f, err := os.Open(filename)
if err != nil { if err != nil {
log.Errorf("fsBackend LoadJobData()- %v", err) log.Errorf("fsBackend LoadJobData()- %v", err)
return nil, err return nil, err
} }
defer f.Close() defer f.Close()
return DecodeJobData(bufio.NewReader(f), filename) if isCompressed {
r, err := gzip.NewReader(f)
if err != nil {
log.Errorf(" %v", err)
return nil, err
}
return DecodeJobData(r, filename)
} else {
return DecodeJobData(bufio.NewReader(f), filename)
}
} }
func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) {
@ -217,12 +239,32 @@ func (fsa *FsArchive) ImportJob(
return err return err
} }
// var isCompressed bool = true
// // TODO Use shortJob Config for check
// if jobMeta.Duration < 300 {
// isCompressed = false
// f, err = os.Create(path.Join(dir, "data.json"))
// } else {
// f, err = os.Create(path.Join(dir, "data.json.gz"))
// }
// if err != nil {
// return err
// }
//
// if isCompressed {
// if err := EncodeJobData(gzip.NewWriter(f), jobData); err != nil {
// return err
// }
// } else {
// if err := EncodeJobData(f, jobData); err != nil {
// return err
// }
// }
f, err = os.Create(path.Join(dir, "data.json")) f, err = os.Create(path.Join(dir, "data.json"))
if err != nil {
return err
}
if err := EncodeJobData(f, jobData); err != nil { if err := EncodeJobData(f, jobData); err != nil {
return err return err
} }
return f.Close() return f.Close()
} }

File diff suppressed because it is too large Load Diff

Binary file not shown.

File diff suppressed because it is too large Load Diff

Binary file not shown.

View File

@ -468,7 +468,7 @@ func TestRestApi(t *testing.T) {
} }
const stopJobBody string = `{ const stopJobBody string = `{
"jobId": 123, "jobId": 123,
"startTime": 123456789, "startTime": 123456789,
"cluster": "testcluster", "cluster": "testcluster",
@ -551,7 +551,7 @@ func TestRestApi(t *testing.T) {
func subtestLetJobFail(t *testing.T, restapi *api.RestApi, r *mux.Router) { func subtestLetJobFail(t *testing.T, restapi *api.RestApi, r *mux.Router) {
const startJobBody string = `{ const startJobBody string = `{
"jobId": 12345, "jobId": 12345,
"user": "testuser", "user": "testuser",
"project": "testproj", "project": "testproj",
"cluster": "testcluster", "cluster": "testcluster",
@ -588,7 +588,7 @@ func subtestLetJobFail(t *testing.T, restapi *api.RestApi, r *mux.Router) {
} }
const stopJobBody string = `{ const stopJobBody string = `{
"jobId": 12345, "jobId": 12345,
"cluster": "testcluster", "cluster": "testcluster",
"jobState": "failed", "jobState": "failed",