diff --git a/Makefile b/Makefile index dfd833b..3a03b45 100644 --- a/Makefile +++ b/Makefile @@ -49,6 +49,7 @@ distclean: test: $(info ===> TESTING) + @go clean -testcache @go build ./... @go vet ./... @go test ./... diff --git a/docs/config.json b/docs/config.json index d18c072..f48d224 100644 --- a/docs/config.json +++ b/docs/config.json @@ -1,6 +1,6 @@ { "addr": "127.0.0.1:8080", - "job-archive": { + "archive": { "kind": "file", "path": "./var/job-archive" }, diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index c347551..51d95d3 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -18,6 +18,8 @@ const Version uint64 = 1 type ArchiveBackend interface { Init(rawConfig json.RawMessage) (uint64, error) + Exists(job *schema.Job) bool + LoadJobMeta(job *schema.Job) (*schema.JobMeta, error) LoadJobData(job *schema.Job) (schema.JobData, error) diff --git a/pkg/archive/archive_test.go b/pkg/archive/archive_test.go new file mode 100644 index 0000000..19d0a06 --- /dev/null +++ b/pkg/archive/archive_test.go @@ -0,0 +1,168 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package archive_test + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +func CopyFile(src, dst string) (err error) { + in, err := os.Open(src) + if err != nil { + return + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return + } + defer func() { + if e := out.Close(); e != nil { + err = e + } + }() + + _, err = io.Copy(out, in) + if err != nil { + return + } + + err = out.Sync() + if err != nil { + return + } + + si, err := os.Stat(src) + if err != nil { + return + } + err = os.Chmod(dst, si.Mode()) + if err != nil { + return + } + + return +} + +// CopyDir recursively copies a directory tree, attempting to preserve permissions. +// Source directory must exist, destination directory must *not* exist. +// Symlinks are ignored and skipped. +func CopyDir(src string, dst string) (err error) { + src = filepath.Clean(src) + dst = filepath.Clean(dst) + + si, err := os.Stat(src) + if err != nil { + return err + } + if !si.IsDir() { + return fmt.Errorf("source is not a directory") + } + + _, err = os.Stat(dst) + if err != nil && !os.IsNotExist(err) { + return + } + if err == nil { + return fmt.Errorf("destination already exists") + } + + err = os.MkdirAll(dst, si.Mode()) + if err != nil { + return + } + + entries, err := ioutil.ReadDir(src) + if err != nil { + return + } + + for _, entry := range entries { + srcPath := filepath.Join(src, entry.Name()) + dstPath := filepath.Join(dst, entry.Name()) + + if entry.IsDir() { + err = CopyDir(srcPath, dstPath) + if err != nil { + return + } + } else { + // Skip symlinks. + if entry.Mode()&os.ModeSymlink != 0 { + continue + } + + err = CopyFile(srcPath, dstPath) + if err != nil { + return + } + } + } + + return +} + +var jobs []*schema.Job + +func setup(t *testing.T) archive.ArchiveBackend { + tmpdir := t.TempDir() + jobarchive := filepath.Join(tmpdir, "job-archive") + CopyDir("./testdata/archive/", jobarchive) + archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive) + + if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil { + t.Fatal(err) + } + + jobs = make([]*schema.Job, 2) + jobs[0] = &schema.Job{} + jobs[0].JobID = 1403244 + jobs[0].Cluster = "emmy" + jobs[0].StartTime = time.Unix(1608923076, 0) + + jobs[1] = &schema.Job{} + jobs[0].JobID = 1404397 + jobs[0].Cluster = "emmy" + jobs[0].StartTime = time.Unix(1609300556, 0) + + return archive.GetHandle() +} + +func TestCleanUp(t *testing.T) { + a := setup(t) + if !a.Exists(jobs[0]) { + t.Error("Job does not exist") + } + + a.CleanUp(jobs) + + if a.Exists(jobs[0]) || a.Exists(jobs[1]) { + t.Error("Jobs still exist") + } +} + +// func TestCompress(t *testing.T) { +// a := setup(t) +// if !a.Exists(jobs[0]) { +// t.Error("Job does not exist") +// } +// +// a.Compress(jobs) +// +// if a.Exists(jobs[0]) || a.Exists(jobs[1]) { +// t.Error("Jobs still exist") +// } +// } diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 1c52672..41d172d 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -84,6 +84,7 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) { log.Errorf("fsBackend LoadJobData()- %v", err) return nil, err } + defer f.Close() if isCompressed { r, err := gzip.NewReader(f) @@ -101,7 +102,6 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) { return DecodeJobData(r, filename) } else { - defer f.Close() if config.Keys.Validate { if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil { return schema.JobData{}, fmt.Errorf("validate job data: %v", err) @@ -157,6 +157,12 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { return version, nil } +func (fsa *FsArchive) Exists(job *schema.Job) bool { + dir := getDirectory(job, fsa.path) + _, err := os.Stat(dir) + return !errors.Is(err, os.ErrNotExist) +} + func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { for _, job := range jobs { dir := getDirectory(job, fsa.path) @@ -169,7 +175,7 @@ func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { func (fsa *FsArchive) Compress(jobs []*schema.Job) { for _, job := range jobs { fileIn := getPath(job, fsa.path, "data.json") - if !checkFileExists(fileIn) { + if !checkFileExists(fileIn) && (job.Duration > 600 || job.NumNodes > 4) { originalFile, err := os.Open(fileIn) if err != nil { @@ -201,7 +207,7 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) { } func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { - var isCompressed bool + var isCompressed bool = true filename := getPath(job, fsa.path, "data.json.gz") if !checkFileExists(filename) { filename = getPath(job, fsa.path, "data.json") @@ -276,7 +282,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer { } if loadMetricData { - var isCompressed bool + var isCompressed bool = true filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz") if !checkFileExists(filename) { diff --git a/pkg/archive/fsBackend_test.go b/pkg/archive/fsBackend_test.go index 95d94cf..8e16e1b 100644 --- a/pkg/archive/fsBackend_test.go +++ b/pkg/archive/fsBackend_test.go @@ -10,14 +10,9 @@ import ( "testing" "time" - "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" ) -func init() { - log.Init("info", true) -} - func TestInitEmptyPath(t *testing.T) { var fsa FsArchive _, err := fsa.Init(json.RawMessage("{\"kind\":\"testdata/archive\"}"))