Improve error handling and implement tests

This commit is contained in:
Jan Eitzinger 2022-09-11 07:09:10 +02:00
parent 80583323c0
commit 48c1ba097a
2 changed files with 179 additions and 18 deletions

View File

@ -27,8 +27,6 @@ type FsArchive struct {
clusters []string clusters []string
} }
// For a given job, return the path of the `data.json`/`meta.json` file.
// TODO: Implement Issue ClusterCockpit/ClusterCockpit#97
func getPath( func getPath(
job *schema.Job, job *schema.Job,
rootPath string, rootPath string,
@ -46,6 +44,7 @@ func loadJobMeta(filename string) (schema.JobMeta, error) {
f, err := os.Open(filename) f, err := os.Open(filename)
if err != nil { if err != nil {
log.Errorf("fsBackend loadJobMeta()- %v", err)
return schema.JobMeta{}, err return schema.JobMeta{}, err
} }
defer f.Close() defer f.Close()
@ -57,13 +56,20 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
var config FsArchiveConfig var config FsArchiveConfig
if err := json.Unmarshal(rawConfig, &config); err != nil { if err := json.Unmarshal(rawConfig, &config); err != nil {
return fmt.Errorf("fsBackend Init()- %w", err) log.Errorf("fsBackend Init()- %v", err)
return err
}
if config.Path == "" {
err := fmt.Errorf("fsBackend Init()- empty path")
log.Errorf("fsBackend Init()- %v", err)
return err
} }
fsa.path = config.Path fsa.path = config.Path
entries, err := os.ReadDir(fsa.path) entries, err := os.ReadDir(fsa.path)
if err != nil { if err != nil {
return fmt.Errorf("fsBackend Init()- Cannot read dir %s: %w", fsa.path, err) log.Errorf("fsBackend Init()- %v", err)
return err
} }
for _, de := range entries { for _, de := range entries {
@ -76,9 +82,9 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) error {
func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
filename := getPath(job, fsa.path, "data.json") filename := getPath(job, fsa.path, "data.json")
f, err := os.Open(filename) f, err := os.Open(filename)
if err != nil { if err != nil {
log.Errorf("fsBackend LoadJobData()- %v", err)
return nil, err return nil, err
} }
defer f.Close() defer f.Close()
@ -89,21 +95,15 @@ func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (schema.JobMeta, error) { func (fsa *FsArchive) LoadJobMeta(job *schema.Job) (schema.JobMeta, error) {
filename := getPath(job, fsa.path, "meta.json") filename := getPath(job, fsa.path, "meta.json")
return loadJobMeta(filename)
f, err := os.Open(filename)
if err != nil {
return schema.JobMeta{}, err
}
defer f.Close()
return DecodeJobMeta(bufio.NewReader(f))
} }
func (fsa *FsArchive) LoadClusterCfg(name string) (schema.Cluster, error) { func (fsa *FsArchive) LoadClusterCfg(name string) (schema.Cluster, error) {
f, err := os.Open(filepath.Join(fsa.path, name, "cluster.json")) f, err := os.Open(filepath.Join(fsa.path, name, "cluster.json"))
if err != nil { if err != nil {
return schema.Cluster{}, fmt.Errorf("fsBackend LoadClusterCfg()- Cannot open %s: %w", name, err) log.Errorf("fsBackend LoadClusterCfg()- %v", err)
return schema.Cluster{}, err
} }
defer f.Close() defer f.Close()
@ -143,12 +143,9 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
log.Fatalf("Reading jobs failed: %s", err.Error()) log.Fatalf("Reading jobs failed: %s", err.Error())
} }
// For compability with the old job-archive directory structure where
// there was no start time directory.
for _, startTimeDir := range startTimeDirs { for _, startTimeDir := range startTimeDirs {
if startTimeDir.IsDir() { if startTimeDir.IsDir() {
job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name())) job, err := loadJobMeta(filepath.Join(dirpath, startTimeDir.Name(), "meta.json"))
if err != nil { if err != nil {
log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error()) log.Errorf("in %s: %s", filepath.Join(dirpath, startTimeDir.Name()), err.Error())
} else { } else {
@ -159,6 +156,7 @@ func (fsa *FsArchive) Iter() <-chan *schema.JobMeta {
} }
} }
} }
close(ch)
}() }()
return ch return ch
} }

View File

@ -0,0 +1,163 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archive
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
func TestInitEmptyPath(t *testing.T) {
var fsa FsArchive
err := fsa.Init(json.RawMessage("{\"kind\":\"../../test/archive\"}"))
if err == nil {
t.Fatal(err)
}
}
func TestInitNoJson(t *testing.T) {
var fsa FsArchive
err := fsa.Init(json.RawMessage("\"path\":\"../../test/archive\"}"))
if err == nil {
t.Fatal(err)
}
}
func TestInitNotExists(t *testing.T) {
var fsa FsArchive
err := fsa.Init(json.RawMessage("{\"path\":\"../../test/job-archive\"}"))
if err == nil {
t.Fatal(err)
}
}
func TestInit(t *testing.T) {
var fsa FsArchive
err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}"))
if err != nil {
t.Fatal(err)
}
if fsa.path != "../../test/archive" {
t.Fail()
}
if len(fsa.clusters) != 1 || fsa.clusters[0] != "emmy" {
t.Fail()
}
}
func TestLoadJobMetaInternal(t *testing.T) {
var fsa FsArchive
err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}"))
if err != nil {
t.Fatal(err)
}
job, err := loadJobMeta("../../test/archive/emmy/1404/397/1609300556/meta.json")
if err != nil {
t.Fatal(err)
}
if job.JobID != 1404397 {
t.Fail()
}
if int(job.NumNodes) != len(job.Resources) {
t.Fail()
}
if job.StartTime != 1609300556 {
t.Fail()
}
}
func TestLoadJobMeta(t *testing.T) {
var fsa FsArchive
err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}"))
if err != nil {
t.Fatal(err)
}
jobIn := schema.Job{BaseJob: schema.JobDefaults}
jobIn.StartTime = time.Unix(1608923076, 0)
jobIn.JobID = 1403244
jobIn.Cluster = "emmy"
job, err := fsa.LoadJobMeta(&jobIn)
if err != nil {
t.Fatal(err)
}
if job.JobID != 1403244 {
t.Fail()
}
if int(job.NumNodes) != len(job.Resources) {
t.Fail()
}
if job.StartTime != 1608923076 {
t.Fail()
}
}
func TestLoadJobData(t *testing.T) {
var fsa FsArchive
err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}"))
if err != nil {
t.Fatal(err)
}
jobIn := schema.Job{BaseJob: schema.JobDefaults}
jobIn.StartTime = time.Unix(1608923076, 0)
jobIn.JobID = 1403244
jobIn.Cluster = "emmy"
data, err := fsa.LoadJobData(&jobIn)
if err != nil {
t.Fatal(err)
}
for name, scopes := range data {
fmt.Printf("Metric name: %s\n", name)
if _, exists := scopes[schema.MetricScopeNode]; !exists {
t.Fail()
}
}
}
func TestLoadCluster(t *testing.T) {
var fsa FsArchive
err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}"))
if err != nil {
t.Fatal(err)
}
cfg, err := fsa.LoadClusterCfg("emmy")
if err != nil {
t.Fatal(err)
}
if cfg.SubClusters[0].CoresPerSocket != 10 {
t.Fail()
}
}
func TestIter(t *testing.T) {
var fsa FsArchive
err := fsa.Init(json.RawMessage("{\"path\":\"../../test/archive\"}"))
if err != nil {
t.Fatal(err)
}
for job := range fsa.Iter() {
fmt.Printf("Job %d\n", job.JobID)
if job.Cluster != "emmy" {
t.Fail()
}
}
}