diff --git a/internal/util/compress.go b/internal/util/compress.go new file mode 100644 index 0000000..a7a36bf --- /dev/null +++ b/internal/util/compress.go @@ -0,0 +1,73 @@ +package util + +import ( + "compress/gzip" + "io" + "os" + + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +func CompressFile(fileIn string, fileOut string) error { + originalFile, err := os.Open(fileIn) + if err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + defer originalFile.Close() + + gzippedFile, err := os.Create(fileOut) + + if err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + defer gzippedFile.Close() + + gzipWriter := gzip.NewWriter(gzippedFile) + defer gzipWriter.Close() + + _, err = io.Copy(gzipWriter, originalFile) + if err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + gzipWriter.Flush() + if err := os.Remove(fileIn); err != nil { + log.Errorf("CompressFile() error: %v", err) + return err + } + + return nil +} + +func UncompressFile(fileIn string, fileOut string) error { + gzippedFile, err := os.Open(fileIn) + if err != nil { + log.Errorf("UncompressFile() error: %v", err) + return err + } + defer gzippedFile.Close() + + gzipReader, _ := gzip.NewReader(gzippedFile) + defer gzipReader.Close() + + uncompressedFile, err := os.Create(fileOut) + if err != nil { + log.Errorf("UncompressFile() error: %v", err) + return err + } + defer uncompressedFile.Close() + + _, err = io.Copy(uncompressedFile, gzipReader) + if err != nil { + log.Errorf("UncompressFile() error: %v", err) + return err + } + if err := os.Remove(fileIn); err != nil { + log.Errorf("UncompressFile() error: %v", err) + return err + } + + return nil +} diff --git a/internal/util/copy.go b/internal/util/copy.go new file mode 100644 index 0000000..f66b976 --- /dev/null +++ b/internal/util/copy.go @@ -0,0 +1,106 @@ +package util + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" +) + +func CopyFile(src, dst string) (err error) { + in, err := os.Open(src) + if err != nil { + return + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return + } + defer func() { + if e := out.Close(); e != nil { + err = e + } + }() + + _, err = io.Copy(out, in) + if err != nil { + return + } + + err = out.Sync() + if err != nil { + return + } + + si, err := os.Stat(src) + if err != nil { + return + } + err = os.Chmod(dst, si.Mode()) + if err != nil { + return + } + + return +} + +// CopyDir recursively copies a directory tree, attempting to preserve permissions. +// Source directory must exist, destination directory must *not* exist. +// Symlinks are ignored and skipped. +func CopyDir(src string, dst string) (err error) { + src = filepath.Clean(src) + dst = filepath.Clean(dst) + + si, err := os.Stat(src) + if err != nil { + return err + } + if !si.IsDir() { + return fmt.Errorf("source is not a directory") + } + + _, err = os.Stat(dst) + if err != nil && !os.IsNotExist(err) { + return + } + if err == nil { + return fmt.Errorf("destination already exists") + } + + err = os.MkdirAll(dst, si.Mode()) + if err != nil { + return + } + + entries, err := ioutil.ReadDir(src) + if err != nil { + return + } + + for _, entry := range entries { + srcPath := filepath.Join(src, entry.Name()) + dstPath := filepath.Join(dst, entry.Name()) + + if entry.IsDir() { + err = CopyDir(srcPath, dstPath) + if err != nil { + return + } + } else { + // Skip symlinks. + if entry.Mode()&os.ModeSymlink != 0 { + continue + } + + err = CopyFile(srcPath, dstPath) + if err != nil { + return + } + } + } + + return +} diff --git a/internal/util/fstat.go b/internal/util/fstat.go new file mode 100644 index 0000000..60cd68e --- /dev/null +++ b/internal/util/fstat.go @@ -0,0 +1,30 @@ +package util + +import ( + "errors" + "os" + + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +func CheckFileExists(filePath string) bool { + _, err := os.Stat(filePath) + return !errors.Is(err, os.ErrNotExist) +} + +func GetFilesize(filePath string) int64 { + fileInfo, err := os.Stat(filePath) + if err != nil { + log.Errorf("Error on Stat %s: %v", filePath, err) + } + return fileInfo.Size() +} + +func GetFilecount(path string) int { + files, err := os.ReadDir(path) + if err != nil { + log.Errorf("Error on ReadDir %s: %v", path, err) + } + + return len(files) +} diff --git a/pkg/archive/archive_test.go b/pkg/archive/archive_test.go index 19d0a06..b41a033 100644 --- a/pkg/archive/archive_test.go +++ b/pkg/archive/archive_test.go @@ -7,120 +7,21 @@ package archive_test import ( "encoding/json" "fmt" - "io" - "io/ioutil" - "os" "path/filepath" "testing" "time" + "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) -func CopyFile(src, dst string) (err error) { - in, err := os.Open(src) - if err != nil { - return - } - defer in.Close() - - out, err := os.Create(dst) - if err != nil { - return - } - defer func() { - if e := out.Close(); e != nil { - err = e - } - }() - - _, err = io.Copy(out, in) - if err != nil { - return - } - - err = out.Sync() - if err != nil { - return - } - - si, err := os.Stat(src) - if err != nil { - return - } - err = os.Chmod(dst, si.Mode()) - if err != nil { - return - } - - return -} - -// CopyDir recursively copies a directory tree, attempting to preserve permissions. -// Source directory must exist, destination directory must *not* exist. -// Symlinks are ignored and skipped. -func CopyDir(src string, dst string) (err error) { - src = filepath.Clean(src) - dst = filepath.Clean(dst) - - si, err := os.Stat(src) - if err != nil { - return err - } - if !si.IsDir() { - return fmt.Errorf("source is not a directory") - } - - _, err = os.Stat(dst) - if err != nil && !os.IsNotExist(err) { - return - } - if err == nil { - return fmt.Errorf("destination already exists") - } - - err = os.MkdirAll(dst, si.Mode()) - if err != nil { - return - } - - entries, err := ioutil.ReadDir(src) - if err != nil { - return - } - - for _, entry := range entries { - srcPath := filepath.Join(src, entry.Name()) - dstPath := filepath.Join(dst, entry.Name()) - - if entry.IsDir() { - err = CopyDir(srcPath, dstPath) - if err != nil { - return - } - } else { - // Skip symlinks. - if entry.Mode()&os.ModeSymlink != 0 { - continue - } - - err = CopyFile(srcPath, dstPath) - if err != nil { - return - } - } - } - - return -} - var jobs []*schema.Job func setup(t *testing.T) archive.ArchiveBackend { tmpdir := t.TempDir() jobarchive := filepath.Join(tmpdir, "job-archive") - CopyDir("./testdata/archive/", jobarchive) + util.CopyDir("./testdata/archive/", jobarchive) archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive) if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil { diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 41d172d..227f37b 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -11,7 +11,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "os" "path" "path/filepath" @@ -20,6 +19,7 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/santhosh-tekuri/jsonschema/v5" @@ -34,11 +34,6 @@ type FsArchive struct { clusters []string } -func checkFileExists(filePath string) bool { - _, err := os.Stat(filePath) - return !errors.Is(err, os.ErrNotExist) -} - func getDirectory( job *schema.Job, rootPath string, @@ -169,39 +164,21 @@ func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { if err := os.RemoveAll(dir); err != nil { log.Errorf("JobArchive Cleanup() error: %v", err) } + + parent := filepath.Clean(filepath.Join(dir, "..")) + if util.GetFilecount(parent) == 0 { + if err := os.Remove(parent); err != nil { + log.Errorf("JobArchive Cleanup() error: %v", err) + } + } } } func (fsa *FsArchive) Compress(jobs []*schema.Job) { for _, job := range jobs { fileIn := getPath(job, fsa.path, "data.json") - if !checkFileExists(fileIn) && (job.Duration > 600 || job.NumNodes > 4) { - - originalFile, err := os.Open(fileIn) - if err != nil { - log.Errorf("JobArchive Compress() error: %v", err) - } - defer originalFile.Close() - - fileOut := getPath(job, fsa.path, "data.json.gz") - gzippedFile, err := os.Create(fileOut) - - if err != nil { - log.Errorf("JobArchive Compress() error: %v", err) - } - defer gzippedFile.Close() - - gzipWriter := gzip.NewWriter(gzippedFile) - defer gzipWriter.Close() - - _, err = io.Copy(gzipWriter, originalFile) - if err != nil { - log.Errorf("JobArchive Compress() error: %v", err) - } - gzipWriter.Flush() - if err := os.Remove(fileIn); err != nil { - log.Errorf("JobArchive Compress() error: %v", err) - } + if !util.CheckFileExists(fileIn) && util.GetFilesize(fileIn) > 2000 { + util.CompressFile(fileIn, getPath(job, fsa.path, "data.json.gz")) } } } @@ -209,7 +186,8 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) { func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { var isCompressed bool = true filename := getPath(job, fsa.path, "data.json.gz") - if !checkFileExists(filename) { + + if !util.CheckFileExists(filename) { filename = getPath(job, fsa.path, "data.json") isCompressed = false } @@ -218,7 +196,6 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { } func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) { - filename := getPath(job, fsa.path, "meta.json") return loadJobMeta(filename) } @@ -285,7 +262,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer { var isCompressed bool = true filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz") - if !checkFileExists(filename) { + if !util.CheckFileExists(filename) { filename = filepath.Join(dirpath, startTimeDir.Name(), "data.json") isCompressed = false }