diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index d14ae7d..2c6f21f 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -27,8 +27,6 @@ type FsArchive struct { clusters []string } -// For a given job, return the path of the `data.json`/`meta.json` file. -// TODO: Implement Issue ClusterCockpit/ClusterCockpit#97 func getPath( job *schema.Job, rootPath string, @@ -46,6 +44,7 @@ func loadJobMeta(filename string) (schema.JobMeta, error) { f, err := os.Open(filename) if err != nil { + log.Errorf("fsBackend loadJobMeta()- %v", err) return schema.JobMeta{}, err } defer f.Close() @@ -57,13 +56,20 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) error { var config FsArchiveConfig if err := json.Unmarshal(rawConfig, &config); err != nil { - return fmt.Errorf("fsBackend Init()- %w", err) + log.Errorf("fsBackend Init()- %v", err) + return err + } + if config.Path == "" { + err := fmt.Errorf("fsBackend Init()- empty path") + log.Errorf("fsBackend Init()- %v", err) + return err } fsa.path = config.Path entries, err := os.ReadDir(fsa.path) if err != nil { - return fmt.Errorf("fsBackend Init()- Cannot read dir %s: %w", fsa.path, err) + log.Errorf("fsBackend Init()- %v", err) + return err } for _, de := range entries { @@ -76,9 +82,9 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) error { func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { filename := getPath(job, fsa.path, "data.json") - f, err := os.Open(filename) if err != nil { + log.Errorf("fsBackend LoadJobData()- %v", err) return nil, err } defer f.Close() @@ -89,21 +95,15 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (schema.JobMeta, error) { filename := getPath(job, fsa.path, "meta.json") - - f, err := os.Open(filename) - if err != nil { - return schema.JobMeta{}, err - } - defer f.Close() - - return DecodeJobMeta(bufio.NewReader(f)) + return loadJobMeta(filename) } func (fsa *FsArchive) LoadClusterCfg(name string) (schema.Cluster, error) { f, err := os.Open(filepath.Join(fsa.path, name, "cluster.json")) if err != nil { - return schema.Cluster{}, fmt.Errorf("fsBackend LoadClusterCfg()- Cannot open %s: %w", name, err) + log.Errorf("fsBackend LoadClusterCfg()- %v", err) + return schema.Cluster{}, err } defer f.Close() @@ -143,12 +143,9 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta { log.Fatalf("Reading jobs failed: %s", err.Error()) } - // For compability with the old job-archive directory structure where - // there was no start time directory. for _, startTimeDir := range startTimeDirs { if startTimeDir.IsDir() { - job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name())) - + job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json")) if err != nil { log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error()) } else { @@ -159,6 +156,7 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta { } } } + close(ch) }() return ch } diff --git a/pkg/archive/fsBackend_test.go b/pkg/archive/fsBackend_test.go new file mode 100644 index 0000000..962aa23 --- /dev/null +++ b/pkg/archive/fsBackend_test.go @@ -0,0 +1,163 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package archive + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/ClusterCockpit/cc-backend/pkg/schema" +) + +func TestInitEmptyPath(t *testing.T) { + var fsa FsArchive + err := fsa.Init(json.RawMessage("{\"kind\":\"../../test/archive\"}")) + if err == nil { + t.Fatal(err) + } +} + +func TestInitNoJson(t *testing.T) { + var fsa FsArchive + err := fsa.Init(json.RawMessage("\"path\":\"../../test/archive\"}")) + if err == nil { + t.Fatal(err) + } +} +func TestInitNotExists(t *testing.T) { + var fsa FsArchive + err := fsa.Init(json.RawMessage("{\"path\":\"../../test/job-archive\"}")) + if err == nil { + t.Fatal(err) + } +} + +func TestInit(t *testing.T) { + var fsa FsArchive + err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}")) + if err != nil { + t.Fatal(err) + } + + if fsa.path != "../../test/archive" { + t.Fail() + } + + if len(fsa.clusters) != 1 || fsa.clusters[0] != "emmy" { + t.Fail() + } +} + +func TestLoadJobMetaInternal(t *testing.T) { + var fsa FsArchive + err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}")) + if err != nil { + t.Fatal(err) + } + + job, err := loadJobMeta("../../test/archive/emmy/1404/397/1609300556/meta.json") + if err != nil { + t.Fatal(err) + } + + if job.JobID != 1404397 { + t.Fail() + } + if int(job.NumNodes) != len(job.Resources) { + t.Fail() + } + if job.StartTime != 1609300556 { + t.Fail() + } +} + +func TestLoadJobMeta(t *testing.T) { + var fsa FsArchive + err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}")) + if err != nil { + t.Fatal(err) + } + + jobIn := schema.Job{BaseJob: schema.JobDefaults} + jobIn.StartTime = time.Unix(1608923076, 0) + jobIn.JobID = 1403244 + jobIn.Cluster = "emmy" + + job, err := fsa.LoadJobMeta(&jobIn) + if err != nil { + t.Fatal(err) + } + + if job.JobID != 1403244 { + t.Fail() + } + if int(job.NumNodes) != len(job.Resources) { + t.Fail() + } + if job.StartTime != 1608923076 { + t.Fail() + } +} + +func TestLoadJobData(t *testing.T) { + var fsa FsArchive + err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}")) + if err != nil { + t.Fatal(err) + } + + jobIn := schema.Job{BaseJob: schema.JobDefaults} + jobIn.StartTime = time.Unix(1608923076, 0) + jobIn.JobID = 1403244 + jobIn.Cluster = "emmy" + + data, err := fsa.LoadJobData(&jobIn) + if err != nil { + t.Fatal(err) + } + + for name, scopes := range data { + fmt.Printf("Metric name: %s\n", name) + + if _, exists := scopes[schema.MetricScopeNode]; !exists { + t.Fail() + } + } +} + +func TestLoadCluster(t *testing.T) { + var fsa FsArchive + err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}")) + if err != nil { + t.Fatal(err) + } + + cfg, err := fsa.LoadClusterCfg("emmy") + if err != nil { + t.Fatal(err) + } + + if cfg.SubClusters[0].CoresPerSocket != 10 { + t.Fail() + } +} + +func TestIter(t *testing.T) { + var fsa FsArchive + err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}")) + if err != nil { + t.Fatal(err) + } + + for job := range fsa.Iter() { + fmt.Printf("Job %d\n", job.JobID) + + if job.Cluster != "emmy" { + t.Fail() + } + } +}