Add archive test and fix fsBackend test

This commit is contained in:
Jan Eitzinger 2023-05-11 16:17:17 +02:00
parent 19e3ba7290
commit cfafd5aa08
6 changed files with 182 additions and 10 deletions

View File

@ -49,6 +49,7 @@ distclean:
test:
$(info ===> TESTING)
@go clean -testcache
@go build ./...
@go vet ./...
@go test ./...

View File

@ -1,6 +1,6 @@
{
"addr": "127.0.0.1:8080",
"job-archive": {
"archive": {
"kind": "file",
"path": "./var/job-archive"
},

View File

@ -18,6 +18,8 @@ const Version uint64 = 1
type ArchiveBackend interface {
Init(rawConfig json.RawMessage) (uint64, error)
Exists(job *schema.Job) bool
LoadJobMeta(job *schema.Job) (*schema.JobMeta, error)
LoadJobData(job *schema.Job) (schema.JobData, error)

168
pkg/archive/archive_test.go Normal file
View File

@ -0,0 +1,168 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archive_test
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
func CopyFile(src, dst string) (err error) {
in, err := os.Open(src)
if err != nil {
return
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return
}
defer func() {
if e := out.Close(); e != nil {
err = e
}
}()
_, err = io.Copy(out, in)
if err != nil {
return
}
err = out.Sync()
if err != nil {
return
}
si, err := os.Stat(src)
if err != nil {
return
}
err = os.Chmod(dst, si.Mode())
if err != nil {
return
}
return
}
// CopyDir recursively copies a directory tree, attempting to preserve permissions.
// Source directory must exist, destination directory must *not* exist.
// Symlinks are ignored and skipped.
func CopyDir(src string, dst string) (err error) {
src = filepath.Clean(src)
dst = filepath.Clean(dst)
si, err := os.Stat(src)
if err != nil {
return err
}
if !si.IsDir() {
return fmt.Errorf("source is not a directory")
}
_, err = os.Stat(dst)
if err != nil && !os.IsNotExist(err) {
return
}
if err == nil {
return fmt.Errorf("destination already exists")
}
err = os.MkdirAll(dst, si.Mode())
if err != nil {
return
}
entries, err := ioutil.ReadDir(src)
if err != nil {
return
}
for _, entry := range entries {
srcPath := filepath.Join(src, entry.Name())
dstPath := filepath.Join(dst, entry.Name())
if entry.IsDir() {
err = CopyDir(srcPath, dstPath)
if err != nil {
return
}
} else {
// Skip symlinks.
if entry.Mode()&os.ModeSymlink != 0 {
continue
}
err = CopyFile(srcPath, dstPath)
if err != nil {
return
}
}
}
return
}
var jobs []*schema.Job
func setup(t *testing.T) archive.ArchiveBackend {
tmpdir := t.TempDir()
jobarchive := filepath.Join(tmpdir, "job-archive")
CopyDir("./testdata/archive/", jobarchive)
archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive)
if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil {
t.Fatal(err)
}
jobs = make([]*schema.Job, 2)
jobs[0] = &schema.Job{}
jobs[0].JobID = 1403244
jobs[0].Cluster = "emmy"
jobs[0].StartTime = time.Unix(1608923076, 0)
jobs[1] = &schema.Job{}
jobs[0].JobID = 1404397
jobs[0].Cluster = "emmy"
jobs[0].StartTime = time.Unix(1609300556, 0)
return archive.GetHandle()
}
func TestCleanUp(t *testing.T) {
a := setup(t)
if !a.Exists(jobs[0]) {
t.Error("Job does not exist")
}
a.CleanUp(jobs)
if a.Exists(jobs[0]) || a.Exists(jobs[1]) {
t.Error("Jobs still exist")
}
}
// func TestCompress(t *testing.T) {
// a := setup(t)
// if !a.Exists(jobs[0]) {
// t.Error("Job does not exist")
// }
//
// a.Compress(jobs)
//
// if a.Exists(jobs[0]) || a.Exists(jobs[1]) {
// t.Error("Jobs still exist")
// }
// }

View File

@ -84,6 +84,7 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
log.Errorf("fsBackend LoadJobData()- %v", err)
return nil, err
}
defer f.Close()
if isCompressed {
r, err := gzip.NewReader(f)
@ -101,7 +102,6 @@ func loadJobData(filename string, isCompressed bool) (schema.JobData, error) {
return DecodeJobData(r, filename)
} else {
defer f.Close()
if config.Keys.Validate {
if err := schema.Validate(schema.Data, bufio.NewReader(f)); err != nil {
return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
@ -157,6 +157,12 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
return version, nil
}
func (fsa *FsArchive) Exists(job *schema.Job) bool {
dir := getDirectory(job, fsa.path)
_, err := os.Stat(dir)
return !errors.Is(err, os.ErrNotExist)
}
func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
for _, job := range jobs {
dir := getDirectory(job, fsa.path)
@ -169,7 +175,7 @@ func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
func (fsa *FsArchive) Compress(jobs []*schema.Job) {
for _, job := range jobs {
fileIn := getPath(job, fsa.path, "data.json")
if !checkFileExists(fileIn) {
if !checkFileExists(fileIn) && (job.Duration > 600 || job.NumNodes > 4) {
originalFile, err := os.Open(fileIn)
if err != nil {
@ -201,7 +207,7 @@ func (fsa *FsArchive) Compress(jobs []*schema.Job) {
}
func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
var isCompressed bool
var isCompressed bool = true
filename := getPath(job, fsa.path, "data.json.gz")
if !checkFileExists(filename) {
filename = getPath(job, fsa.path, "data.json")
@ -276,7 +282,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
}
if loadMetricData {
var isCompressed bool
var isCompressed bool = true
filename := filepath.Join(dirpath, startTimeDir.Name(), "data.json.gz")
if !checkFileExists(filename) {

View File

@ -10,14 +10,9 @@ import (
"testing"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
func init() {
log.Init("info", true)
}
func TestInitEmptyPath(t *testing.T) {
var fsa FsArchive
_, err := fsa.Init(json.RawMessage("{\"kind\":\"testdata/archive\"}"))